]> granicus.if.org Git - python/commitdiff
Issue #12091: simplify ApplyResult and MapResult with threading.Event
authorRichard Oudkerk <shibturn@gmail.com>
Fri, 25 May 2012 12:26:53 +0000 (13:26 +0100)
committerRichard Oudkerk <shibturn@gmail.com>
Fri, 25 May 2012 12:26:53 +0000 (13:26 +0100)
Patch by Charles-François Natali

Lib/multiprocessing/pool.py

index 66d7cc7d99097c5cba5ab8aafc2bef718906d928..149e32a0e1c6ae0b177f77dd9b023dc1bea28b33 100644 (file)
@@ -526,32 +526,26 @@ class Pool(object):
 class ApplyResult(object):
 
     def __init__(self, cache, callback, error_callback):
-        self._cond = threading.Condition(threading.Lock())
+        self._event = threading.Event()
         self._job = next(job_counter)
         self._cache = cache
-        self._ready = False
         self._callback = callback
         self._error_callback = error_callback
         cache[self._job] = self
 
     def ready(self):
-        return self._ready
+        return self._event.is_set()
 
     def successful(self):
-        assert self._ready
+        assert self.ready()
         return self._success
 
     def wait(self, timeout=None):
-        self._cond.acquire()
-        try:
-            if not self._ready:
-                self._cond.wait(timeout)
-        finally:
-            self._cond.release()
+        self._event.wait(timeout)
 
     def get(self, timeout=None):
         self.wait(timeout)
-        if not self._ready:
+        if not self.ready():
             raise TimeoutError
         if self._success:
             return self._value
@@ -564,12 +558,7 @@ class ApplyResult(object):
             self._callback(self._value)
         if self._error_callback and not self._success:
             self._error_callback(self._value)
-        self._cond.acquire()
-        try:
-            self._ready = True
-            self._cond.notify()
-        finally:
-            self._cond.release()
+        self._event.set()
         del self._cache[self._job]
 
 #
@@ -586,7 +575,7 @@ class MapResult(ApplyResult):
         self._chunksize = chunksize
         if chunksize <= 0:
             self._number_left = 0
-            self._ready = True
+            self._event.set()
         else:
             self._number_left = length//chunksize + bool(length % chunksize)
 
@@ -599,24 +588,14 @@ class MapResult(ApplyResult):
                 if self._callback:
                     self._callback(self._value)
                 del self._cache[self._job]
-                self._cond.acquire()
-                try:
-                    self._ready = True
-                    self._cond.notify()
-                finally:
-                    self._cond.release()
+                self._event.set()
         else:
             self._success = False
             self._value = result
             if self._error_callback:
                 self._error_callback(self._value)
             del self._cache[self._job]
-            self._cond.acquire()
-            try:
-                self._ready = True
-                self._cond.notify()
-            finally:
-                self._cond.release()
+            self._event.set()
 
 #
 # Class whose instances are returned by `Pool.imap()`