]> granicus.if.org Git - python/commitdiff
Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
authorCharles-François Natali <neologix@free.fr>
Tue, 17 Apr 2012 16:45:57 +0000 (18:45 +0200)
committerCharles-François Natali <neologix@free.fr>
Tue, 17 Apr 2012 16:45:57 +0000 (18:45 +0200)
Doc/library/multiprocessing.rst
Lib/multiprocessing/managers.py
Lib/multiprocessing/synchronize.py
Lib/test/test_multiprocessing.py
Misc/NEWS

index 1c7b9b95b95ce33991fefa2fca85b8bc451b445e..b9dfd19b316c1b28fb50d4f0b34f2abb311fa057 100644 (file)
@@ -897,6 +897,9 @@ object -- see :ref:`multiprocessing-managers`.
    If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
    object from :mod:`multiprocessing`.
 
+   .. versionchanged:: 3.3
+      The :meth:`wait_for` method was added.
+
 .. class:: Event()
 
    A clone of :class:`threading.Event`.
@@ -1281,6 +1284,9 @@ their parent process exits.  The manager classes are defined in the
       If *lock* is supplied then it should be a proxy for a
       :class:`threading.Lock` or :class:`threading.RLock` object.
 
+      .. versionchanged:: 3.3
+         The :meth:`wait_for` method was added.
+
    .. method:: Event()
 
       Create a shared :class:`threading.Event` object and return a proxy for it.
index eaf912c12456b356435874872a63be2ffdbf0916..d1c9d4578ea11243df9feb0dca211fc44c4cfe7a 100644 (file)
@@ -48,6 +48,7 @@ from traceback import format_exc
 from multiprocessing import Process, current_process, active_children, Pool, util, connection
 from multiprocessing.process import AuthenticationString
 from multiprocessing.forking import exit, Popen, ForkingPickler
+from time import time as _time
 
 #
 # Register some things for pickling
@@ -996,6 +997,24 @@ class ConditionProxy(AcquirerProxy):
         return self._callmethod('notify')
     def notify_all(self):
         return self._callmethod('notify_all')
+    def wait_for(self, predicate, timeout=None):
+        result = predicate()
+        if result:
+            return result
+        if timeout is not None:
+            endtime = _time() + timeout
+        else:
+            endtime = None
+            waittime = None
+        while not result:
+            if endtime is not None:
+                waittime = endtime - _time()
+                if waittime <= 0:
+                    break
+            self.wait(waittime)
+            result = predicate()
+        return result
+
 
 class EventProxy(BaseProxy):
     _exposed_ = ('is_set', 'set', 'clear', 'wait')
index e35bbff185c204f05bf9a858ae4f2cdcf2fb1d3f..532ac5c1dd24855d831f1f14c7162c74637d2ab9 100644 (file)
@@ -43,6 +43,7 @@ import _multiprocessing
 from multiprocessing.process import current_process
 from multiprocessing.util import register_after_fork, debug
 from multiprocessing.forking import assert_spawning, Popen
+from time import time as _time
 
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
@@ -290,6 +291,24 @@ class Condition(object):
             while self._wait_semaphore.acquire(False):
                 pass
 
+    def wait_for(self, predicate, timeout=None):
+        result = predicate()
+        if result:
+            return result
+        if timeout is not None:
+            endtime = _time() + timeout
+        else:
+            endtime = None
+            waittime = None
+        while not result:
+            if endtime is not None:
+                waittime = endtime - _time()
+                if waittime <= 0:
+                    break
+            self.wait(waittime)
+            result = predicate()
+        return result
+
 #
 # Event
 #
index 2bcdb4e07c08d129f6dd26a17d6ef015e91b8776..bbde366e5dbb8bc8e1ba93f0c131ee7dc4df2a44 100644 (file)
@@ -887,6 +887,73 @@ class _TestCondition(BaseTestCase):
         self.assertEqual(res, False)
         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 
+    @classmethod
+    def _test_waitfor_f(cls, cond, state):
+        with cond:
+            state.value = 0
+            cond.notify()
+            result = cond.wait_for(lambda : state.value==4)
+            if not result or state.value != 4:
+                sys.exit(1)
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', -1)
+
+        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
+        p.daemon = True
+        p.start()
+
+        with cond:
+            result = cond.wait_for(lambda : state.value==0)
+            self.assertTrue(result)
+            self.assertEqual(state.value, 0)
+
+        for i in range(4):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        p.join(5)
+        self.assertFalse(p.is_alive())
+        self.assertEqual(p.exitcode, 0)
+
+    @classmethod
+    def _test_waitfor_timeout_f(cls, cond, state, success):
+        with cond:
+            expected = 0.1
+            dt = time.time()
+            result = cond.wait_for(lambda : state.value==4, timeout=expected)
+            dt = time.time() - dt
+            # borrow logic in assertTimeout() from test/lock_tests.py
+            if not result and expected * 0.6 < dt < expected * 10.0:
+                success.value = True
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor_timeout(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', 0)
+        success = self.Value('i', False)
+
+        p = self.Process(target=self._test_waitfor_timeout_f,
+                         args=(cond, state, success))
+        p.daemon = True
+        p.start()
+
+        # Only increment 3 times, so state == 4 is never reached.
+        for i in range(3):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        p.join(5)
+        self.assertTrue(success.value)
+
 
 class _TestEvent(BaseTestCase):
 
index 54da24dac11f408a630e3802197e5005757760a7..f1837e732cf930a4208e6d3ecacbe6072ba0c8d2 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -39,6 +39,8 @@ Core and Builtins
 Library
 -------
 
+- Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
+
 - Issue #14452: SysLogHandler no longer inserts a UTF-8 BOM into the message.
 
 - Issue #14386: Expose the dict_proxy internal type as types.MappingProxyType.