]> granicus.if.org Git - python/commitdiff
bpo-29861: release references to multiprocessing Pool tasks (#743) (#803)
authorAntoine Pitrou <pitrou@free.fr>
Fri, 24 Mar 2017 15:03:46 +0000 (16:03 +0100)
committerGitHub <noreply@github.com>
Fri, 24 Mar 2017 15:03:46 +0000 (16:03 +0100)
* bpo-29861: release references to multiprocessing Pool tasks (#743)

* bpo-29861: release references to multiprocessing Pool tasks

Release references to tasks, their arguments and their results as soon
as they are finished, instead of keeping them alive until another task
arrives.

* Comments in test

(cherry picked from commit 8988945cdc27ffa86ba8c624e095b51c459f5154)

* Fix Misc/NEWS ?

Lib/multiprocessing/pool.py
Lib/test/test_multiprocessing.py
Misc/NEWS

index ceb93aab8624efa0d8029b8531cd5da9ef2acd87..a47cd0f58a05af04affbc7b37fe42337cd101d64 100644 (file)
@@ -120,6 +120,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
             debug("Possible encoding error while sending result: %s" % (
                 wrapped))
             put((job, i, (False, wrapped)))
+
+        task = job = result = func = args = kwds = None
         completed += 1
     debug('worker exiting after %d tasks' % completed)
 
@@ -362,10 +364,11 @@ class Pool(object):
                 if set_length:
                     debug('doing set_length()')
                     set_length(i+1)
+            finally:
+                task = taskseq = job = None
         else:
             debug('task handler got sentinel')
 
-
         try:
             # tell result handler to finish when cache is empty
             debug('task handler sending sentinel to result handler')
@@ -405,6 +408,7 @@ class Pool(object):
                 cache[job]._set(i, obj)
             except KeyError:
                 pass
+            task = job = obj = None
 
         while cache and thread._state != TERMINATE:
             try:
@@ -421,6 +425,7 @@ class Pool(object):
                 cache[job]._set(i, obj)
             except KeyError:
                 pass
+            task = job = obj = None
 
         if hasattr(outqueue, '_reader'):
             debug('ensuring that outqueue is not full')
index c66727cf3ae52e7c09de2cedafd7a6f45e15a17f..163c42f1d10a17ced98084cbe4d0601db3e704df 100644 (file)
@@ -14,6 +14,7 @@ import socket
 import random
 import logging
 import errno
+import weakref
 import test.script_helper
 from test import test_support
 from StringIO import StringIO
@@ -1123,6 +1124,19 @@ def sqr(x, wait=0.0):
     time.sleep(wait)
     return x*x
 
+def identity(x):
+    return x
+
+class CountedObject(object):
+    n_instances = 0
+
+    def __new__(cls):
+        cls.n_instances += 1
+        return object.__new__(cls)
+
+    def __del__(self):
+        type(self).n_instances -= 1
+
 class SayWhenError(ValueError): pass
 
 def exception_throwing_generator(total, when):
@@ -1268,6 +1282,20 @@ class _TestPool(BaseTestCase):
         p.close()
         p.join()
 
+    def test_release_task_refs(self):
+        # Issue #29861: task arguments and results should not be kept
+        # alive after we are done with them.
+        objs = list(CountedObject() for i in range(10))
+        refs = list(weakref.ref(o) for o in objs)
+        self.pool.map(identity, objs)
+
+        del objs
+        self.assertEqual(set(wr() for wr in refs), {None})
+        # With a process pool, copies of the objects are returned, check
+        # they were released too.
+        self.assertEqual(CountedObject.n_instances, 0)
+
+
 def unpickleable_result():
     return lambda: 42
 
index 2b492f406cb85a7f454d0ac0fe45b349b200b3b8..de5ef7dc484c37db35e363803e443b6704952109 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -39,6 +39,9 @@ Extension Modules
 Library
 -------
 
+- bpo-29861: Release references to tasks, their arguments and their results
+  as soon as they are finished in multiprocessing.Pool.
+
 - bpo-27880: Fixed integer overflow in cPickle when pickle large strings or
   too many objects.