_cleanup()
self._popen = self._Popen(self)
self._sentinel = self._popen.sentinel
+ # Avoid a refcycle if the target function holds an indirect
+ # reference to the process object (see bpo-30775)
+ del self._target, self._args, self._kwargs
_children.add(self)
def terminate(self):
# Testcases
#
+class DummyCallable:
+ def __call__(self, q, c):
+ assert isinstance(c, DummyCallable)
+ q.put(5)
+
+
class _TestProcess(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=1))
+ def test_lose_target_ref(self):
+ c = DummyCallable()
+ wr = weakref.ref(c)
+ q = self.Queue()
+ p = self.Process(target=c, args=(q, c))
+ del c
+ p.start()
+ p.join()
+ self.assertIs(wr(), None)
+ self.assertEqual(q.get(), 5)
+
+
#
#
#