From ab74504346a6e2569b3255b7b621c589716888c4 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 18 Jan 2018 10:38:03 +0100 Subject: [PATCH] bpo-32576: use queue.SimpleQueue in critical places (#5216) Where a queue may be invoked from a weakref callback, we need to use the reentrant SimpleQueue. --- Lib/concurrent/futures/thread.py | 2 +- Lib/multiprocessing/pool.py | 19 +++++++++++-------- .../2018-01-17-13-04-16.bpo-32576.iDL09t.rst | 2 ++ 3 files changed, 14 insertions(+), 9 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2e7100bc35..6e22950a15 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -128,7 +128,7 @@ class ThreadPoolExecutor(_base.Executor): raise TypeError("initializer must be a callable") self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = queue.SimpleQueue() self._threads = set() self._broken = False self._shutdown = False diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b1ee725fac..3e9a0d6b48 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -156,7 +156,7 @@ class Pool(object): maxtasksperchild=None, context=None): self._ctx = context or get_context() self._setup_queues() - self._taskqueue = queue.Queue() + self._taskqueue = queue.SimpleQueue() self._cache = {} self._state = RUN self._maxtasksperchild = maxtasksperchild @@ -802,15 +802,18 @@ class ThreadPool(Pool): Pool.__init__(self, processes, initializer, initargs) def _setup_queues(self): - self._inqueue = queue.Queue() - self._outqueue = queue.Queue() + self._inqueue = queue.SimpleQueue() + self._outqueue = queue.SimpleQueue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get @staticmethod def _help_stuff_finish(inqueue, task_handler, size): - # put sentinels at head of inqueue to make workers finish - with inqueue.not_empty: - inqueue.queue.clear() - inqueue.queue.extend([None] * size) - inqueue.not_empty.notify_all() + # drain inqueue, and put sentinels at its head to make workers finish + try: + while True: + inqueue.get(block=False) + except queue.Empty: + pass + for i in range(size): + inqueue.put(None) diff --git a/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst new file mode 100644 index 0000000000..143a83e0fc --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst @@ -0,0 +1,2 @@ +Use queue.SimpleQueue() in places where it can be invoked from a weakref +callback. -- 2.40.0