]> granicus.if.org Git - python/commitdiff
Issue #11635: Don't use polling in worker threads and processes launched by
authorAntoine Pitrou <solipsis@pitrou.net>
Sat, 26 Mar 2011 18:29:44 +0000 (19:29 +0100)
committerAntoine Pitrou <solipsis@pitrou.net>
Sat, 26 Mar 2011 18:29:44 +0000 (19:29 +0100)
concurrent.futures.

Lib/concurrent/futures/process.py
Lib/concurrent/futures/thread.py
Misc/NEWS

index 79c60c3d105fcdb1c0a51a87b89351319485cfd9..a899d5fca87937b34b2921215bdfe62ddf350775 100644 (file)
@@ -66,28 +66,17 @@ import weakref
 # workers to exit when their work queues are empty and then waits until the
 # threads/processes finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    >>> ...    t = ThreadPoolExecutor(max_workers=5)
-    >>> ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
@@ -130,11 +119,15 @@ def _process_worker(call_queue, result_queue, shutdown):
     """
     while True:
         try:
-            call_item = call_queue.get(block=True, timeout=0.1)
+            call_item = call_queue.get(block=True)
         except queue.Empty:
             if shutdown.is_set():
                 return
         else:
+            if call_item is None:
+                # Wake up queue management thread
+                result_queue.put(None)
+                return
             try:
                 r = call_item.fn(*call_item.args, **call_item.kwargs)
             except BaseException as e:
@@ -209,40 +202,56 @@ def _queue_manangement_worker(executor_reference,
             process workers that they should exit when their work queue is
             empty.
     """
+    nb_shutdown_processes = 0
+    def shutdown_one_process():
+        """Tell a worker to terminate, which will in turn wake us again"""
+        nonlocal nb_shutdown_processes
+        call_queue.put(None)
+        nb_shutdown_processes += 1
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
 
         try:
-            result_item = result_queue.get(block=True, timeout=0.1)
+            result_item = result_queue.get(block=True)
         except queue.Empty:
-            executor = executor_reference()
-            # No more work items can be added if:
-            #   - The interpreter is shutting down OR
-            #   - The executor that owns this worker has been collected OR
-            #   - The executor that owns this worker has been shutdown.
-            if _shutdown or executor is None or executor._shutdown_thread:
-                # Since no new work items can be added, it is safe to shutdown
-                # this thread if there are no pending work items.
-                if not pending_work_items:
-                    shutdown_process_event.set()
-
-                    # If .join() is not called on the created processes then
-                    # some multiprocessing.Queue methods may deadlock on Mac OS
-                    # X.
-                    for p in processes:
-                        p.join()
-                    return
-            del executor
+            pass
         else:
-            work_item = pending_work_items[result_item.work_id]
-            del pending_work_items[result_item.work_id]
-
-            if result_item.exception:
-                work_item.future.set_exception(result_item.exception)
+            if result_item is not None:
+                work_item = pending_work_items[result_item.work_id]
+                del pending_work_items[result_item.work_id]
+
+                if result_item.exception:
+                    work_item.future.set_exception(result_item.exception)
+                else:
+                    work_item.future.set_result(result_item.result)
+                continue
+        # If we come here, we either got a timeout or were explicitly woken up.
+        # In either case, check whether we should start shutting down.
+        executor = executor_reference()
+        # No more work items can be added if:
+        #   - The interpreter is shutting down OR
+        #   - The executor that owns this worker has been collected OR
+        #   - The executor that owns this worker has been shutdown.
+        if _shutdown or executor is None or executor._shutdown_thread:
+            # Since no new work items can be added, it is safe to shutdown
+            # this thread if there are no pending work items.
+            if not pending_work_items:
+                shutdown_process_event.set()
+
+                while nb_shutdown_processes < len(processes):
+                    shutdown_one_process()
+                # If .join() is not called on the created processes then
+                # some multiprocessing.Queue methods may deadlock on Mac OS
+                # X.
+                for p in processes:
+                    p.join()
+                return
             else:
-                work_item.future.set_result(result_item.result)
+                # Start shutting down by telling a process it can exit.
+                shutdown_one_process()
+        del executor
 
 _system_limits_checked = False
 _system_limited = None
@@ -279,7 +288,6 @@ class ProcessPoolExecutor(_base.Executor):
                 worker processes will be created as the machine has processors.
         """
         _check_system_limits()
-        _remove_dead_thread_references()
 
         if max_workers is None:
             self._max_workers = multiprocessing.cpu_count()
@@ -304,10 +312,14 @@ class ProcessPoolExecutor(_base.Executor):
         self._pending_work_items = {}
 
     def _start_queue_management_thread(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the queue management thread.
+        def weakref_cb(_, q=self._result_queue):
+            q.put(None)
         if self._queue_management_thread is None:
             self._queue_management_thread = threading.Thread(
                     target=_queue_manangement_worker,
-                    args=(weakref.ref(self),
+                    args=(weakref.ref(self, weakref_cb),
                           self._processes,
                           self._pending_work_items,
                           self._work_ids,
@@ -316,7 +328,7 @@ class ProcessPoolExecutor(_base.Executor):
                           self._shutdown_process_event))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
-            _thread_references.add(weakref.ref(self._queue_management_thread))
+            _threads_queues[self._queue_management_thread] = self._result_queue
 
     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
@@ -339,6 +351,8 @@ class ProcessPoolExecutor(_base.Executor):
             self._pending_work_items[self._queue_count] = w
             self._work_ids.put(self._queue_count)
             self._queue_count += 1
+            # Wake up queue management thread
+            self._result_queue.put(None)
 
             self._start_queue_management_thread()
             self._adjust_process_count()
@@ -348,8 +362,10 @@ class ProcessPoolExecutor(_base.Executor):
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown_thread = True
-        if wait:
-            if self._queue_management_thread:
+        if self._queue_management_thread:
+            # Wake up queue management thread
+            self._result_queue.put(None)
+            if wait:
                 self._queue_management_thread.join()
         # To reduce the risk of openning too many files, remove references to
         # objects that use file descriptors.
index 15736daa5256882445f1da0889a64afceba13b7d..93b495f55a3b4f0273859b88fb63f6c2235c3c3f 100644 (file)
@@ -25,28 +25,17 @@ import weakref
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 atexit.register(_python_exit)
 
@@ -72,18 +61,23 @@ def _worker(executor_reference, work_queue):
     try:
         while True:
             try:
-                work_item = work_queue.get(block=True, timeout=0.1)
+                work_item = work_queue.get(block=True)
             except queue.Empty:
-                executor = executor_reference()
-                # Exit if:
-                #   - The interpreter is shutting down OR
-                #   - The executor that owns the worker has been collected OR
-                #   - The executor that owns the worker has been shutdown.
-                if _shutdown or executor is None or executor._shutdown:
-                    return
-                del executor
+                pass
             else:
-                work_item.run()
+                if work_item is not None:
+                    work_item.run()
+                    continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
     except BaseException as e:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
 
@@ -95,8 +89,6 @@ class ThreadPoolExecutor(_base.Executor):
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
         """
-        _remove_dead_thread_references()
-
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
@@ -117,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor):
     submit.__doc__ = _base.Executor.submit.__doc__
 
     def _adjust_thread_count(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self), self._work_queue))
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
             t.daemon = True
             t.start()
             self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
+            _threads_queues[t] = self._work_queue
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
+            self._work_queue.put(None)
         if wait:
             for t in self._threads:
                 t.join()
index 2fd4c39e9aacb6f14c3006ef938e54860ac412d7..f5ff837d76920c278236a22e6485efc96cfe240f 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -53,6 +53,9 @@ Core and Builtins
 Library
 -------
 
+- Issue #11635: Don't use polling in worker threads and processes launched by
+  concurrent.futures.
+
 - Issue #11628: cmp_to_key generated class should use __slots__
 
 - Issue #11666: let help() display named tuple attributes and methods