]> granicus.if.org Git - python/commitdiff
Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.
authorCharles-François Natali <cf.natali@gmail.com>
Mon, 25 Mar 2013 17:20:40 +0000 (18:20 +0100)
committerCharles-François Natali <cf.natali@gmail.com>
Mon, 25 Mar 2013 17:20:40 +0000 (18:20 +0100)
Lib/multiprocessing/queues.py
Misc/NEWS

index f6f02b66656bd453252b644389e6ffc0f6dbea9c..ec188ee4e14df2434f517ecb68152b0394c5b609 100644 (file)
@@ -22,7 +22,7 @@ import _multiprocessing
 from multiprocessing.connection import Pipe
 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
 from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@ class Queue(object):
         self._joincancelled = False
         self._closed = False
         self._close = None
-        self._send = self._writer.send
-        self._recv = self._reader.recv
+        self._send_bytes = self._writer.send_bytes
+        self._recv_bytes = self._reader.recv_bytes
         self._poll = self._reader.poll
 
     def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@ class Queue(object):
 
     def get(self, block=True, timeout=None):
         if block and timeout is None:
-            self._rlock.acquire()
-            try:
-                res = self._recv()
-                self._sem.release()
-                return res
-            finally:
-                self._rlock.release()
-
+            with self._rlock:
+                res = self._recv_bytes()
+            self._sem.release()
         else:
             if block:
                 deadline = time.time() + timeout
@@ -109,11 +104,12 @@ class Queue(object):
                         raise Empty
                 elif not self._poll():
                     raise Empty
-                res = self._recv()
+                res = self._recv_bytes()
                 self._sem.release()
-                return res
             finally:
                 self._rlock.release()
+        # unserialize the data after having released the lock
+        return ForkingPickler.loads(res)
 
     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -158,7 +154,7 @@ class Queue(object):
         self._buffer.clear()
         self._thread = threading.Thread(
             target=Queue._feed,
-            args=(self._buffer, self._notempty, self._send,
+            args=(self._buffer, self._notempty, self._send_bytes,
                   self._wlock, self._writer.close, self._ignore_epipe),
             name='QueueFeederThread'
             )
@@ -210,7 +206,7 @@ class Queue(object):
             notempty.release()
 
     @staticmethod
-    def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
         from .util import is_exiting
 
@@ -241,16 +237,14 @@ class Queue(object):
                             close()
                             return
 
+                        # serialize the data before acquiring the lock
+                        obj = ForkingPickler.dumps(obj)
                         if wacquire is None:
-                            send(obj)
-                            # Delete references to object. See issue16284
-                            del obj
+                            send_bytes(obj)
                         else:
                             wacquire()
                             try:
-                                send(obj)
-                                # Delete references to object. See issue16284
-                                del obj
+                                send_bytes(obj)
                             finally:
                                 wrelease()
                 except IndexError:
@@ -344,7 +338,6 @@ class SimpleQueue(object):
             self._wlock = None
         else:
             self._wlock = Lock()
-        self._make_methods()
 
     def empty(self):
         return not self._poll()
@@ -355,29 +348,19 @@ class SimpleQueue(object):
 
     def __setstate__(self, state):
         (self._reader, self._writer, self._rlock, self._wlock) = state
-        self._make_methods()
 
-    def _make_methods(self):
-        recv = self._reader.recv
-        racquire, rrelease = self._rlock.acquire, self._rlock.release
-        def get():
-            racquire()
-            try:
-                return recv()
-            finally:
-                rrelease()
-        self.get = get
+    def get(self):
+        with self._rlock:
+            res = self._reader.recv_bytes()
+        # unserialize the data after having released the lock
+        return ForkingPickler.loads(res)
 
+    def put(self, obj):
+        # serialize the data before acquiring the lock
+        obj = ForkingPickler.dumps(obj)
         if self._wlock is None:
             # writes to a message oriented win32 pipe are atomic
-            self.put = self._writer.send
+            self._writer.send_bytes(obj)
         else:
-            send = self._writer.send
-            wacquire, wrelease = self._wlock.acquire, self._wlock.release
-            def put(obj):
-                wacquire()
-                try:
-                    return send(obj)
-                finally:
-                    wrelease()
-            self.put = put
+            with self._wlock:
+                self._writer.send_bytes(obj)
index 46250081bb570342dcee9492c43ae43d7ff7aa3f..edda5412d6d8bb9555f3852223a4dfeca36778e8 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -294,6 +294,8 @@ Core and Builtins
 Library
 -------
 
+- Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.
+
 - Issue #17536: Add to webbrowser's browser list: www-browser, x-www-browser,
   iceweasel, iceape.