]> granicus.if.org Git - python/commitdiff
bpo-32576: use queue.SimpleQueue in critical places (#5216)
authorAntoine Pitrou <pitrou@free.fr>
Thu, 18 Jan 2018 09:38:03 +0000 (10:38 +0100)
committerGitHub <noreply@github.com>
Thu, 18 Jan 2018 09:38:03 +0000 (10:38 +0100)
Where a queue may be invoked from a weakref callback, we need
to use the reentrant SimpleQueue.

Lib/concurrent/futures/thread.py
Lib/multiprocessing/pool.py
Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst [new file with mode: 0644]

index 2e7100bc3529d4b44b99562e1ef89658edc380b1..6e22950a157db6c80aba15047aa4846544d152c9 100644 (file)
@@ -128,7 +128,7 @@ class ThreadPoolExecutor(_base.Executor):
             raise TypeError("initializer must be a callable")
 
         self._max_workers = max_workers
-        self._work_queue = queue.Queue()
+        self._work_queue = queue.SimpleQueue()
         self._threads = set()
         self._broken = False
         self._shutdown = False
index b1ee725fac6c25f1d9fe4dd24259b494ccc37b01..3e9a0d6b48679f778b193472ba0162cbbe119f5a 100644 (file)
@@ -156,7 +156,7 @@ class Pool(object):
                  maxtasksperchild=None, context=None):
         self._ctx = context or get_context()
         self._setup_queues()
-        self._taskqueue = queue.Queue()
+        self._taskqueue = queue.SimpleQueue()
         self._cache = {}
         self._state = RUN
         self._maxtasksperchild = maxtasksperchild
@@ -802,15 +802,18 @@ class ThreadPool(Pool):
         Pool.__init__(self, processes, initializer, initargs)
 
     def _setup_queues(self):
-        self._inqueue = queue.Queue()
-        self._outqueue = queue.Queue()
+        self._inqueue = queue.SimpleQueue()
+        self._outqueue = queue.SimpleQueue()
         self._quick_put = self._inqueue.put
         self._quick_get = self._outqueue.get
 
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
-        # put sentinels at head of inqueue to make workers finish
-        with inqueue.not_empty:
-            inqueue.queue.clear()
-            inqueue.queue.extend([None] * size)
-            inqueue.not_empty.notify_all()
+        # drain inqueue, and put sentinels at its head to make workers finish
+        try:
+            while True:
+                inqueue.get(block=False)
+        except queue.Empty:
+            pass
+        for i in range(size):
+            inqueue.put(None)
diff --git a/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst
new file mode 100644 (file)
index 0000000..143a83e
--- /dev/null
@@ -0,0 +1,2 @@
+Use queue.SimpleQueue() in places where it can be invoked from a weakref
+callback.