]> granicus.if.org Git - python/commitdiff
Use WeakSets rather than manual pruning to prevent unbounded growth of dead thread...
authorBrian Quinlan <brian@sweetapp.com>
Sun, 20 Mar 2011 02:11:11 +0000 (13:11 +1100)
committerBrian Quinlan <brian@sweetapp.com>
Sun, 20 Mar 2011 02:11:11 +0000 (13:11 +1100)
Lib/concurrent/futures/process.py
Lib/concurrent/futures/thread.py

index 79c60c3d105fcdb1c0a51a87b89351319485cfd9..44f8504ccab6cf071866343199bccf34a0e06a3f 100644 (file)
@@ -66,28 +66,14 @@ import weakref
 # workers to exit when their work queues are empty and then waits until the
 # threads/processes finish.
 
-_thread_references = set()
+_live_threads = weakref.WeakSet()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    >>> ...    t = ThreadPoolExecutor(max_workers=5)
-    >>> ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    for thread in _live_threads:
+        thread.join()
 
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
@@ -279,7 +265,6 @@ class ProcessPoolExecutor(_base.Executor):
                 worker processes will be created as the machine has processors.
         """
         _check_system_limits()
-        _remove_dead_thread_references()
 
         if max_workers is None:
             self._max_workers = multiprocessing.cpu_count()
@@ -316,7 +301,7 @@ class ProcessPoolExecutor(_base.Executor):
                           self._shutdown_process_event))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
-            _thread_references.add(weakref.ref(self._queue_management_thread))
+            _live_threads.add(self._queue_management_thread)
 
     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
index 15736daa5256882445f1da0889a64afceba13b7d..299b94a77ab82f0a957ce34d326dbb282d8a92ec 100644 (file)
@@ -25,29 +25,14 @@ import weakref
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.
 
-_thread_references = set()
+_live_threads = weakref.WeakSet()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
-
+    for thread in _live_threads:
+        thread.join()
 atexit.register(_python_exit)
 
 class _WorkItem(object):
@@ -95,8 +80,6 @@ class ThreadPoolExecutor(_base.Executor):
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
         """
-        _remove_dead_thread_references()
-
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
@@ -125,7 +108,7 @@ class ThreadPoolExecutor(_base.Executor):
             t.daemon = True
             t.start()
             self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
+            _live_threads.add(t)
 
     def shutdown(self, wait=True):
         with self._shutdown_lock: