bpo-24484: Avoid race condition in multiprocessing cleanup (#2159)
author Antoine Pitrou <pitrou@free.fr>
Tue, 13 Jun 2017 15:10:39 +0000 (17:10 +0200)
committer GitHub <noreply@github.com>
Tue, 13 Jun 2017 15:10:39 +0000 (17:10 +0200)
* bpo-24484: Avoid race condition in multiprocessing cleanup

The finalizer registry can be mutated while inspected by multiprocessing
at process exit.

* Use test.support.start_threads()

* Add Misc/NEWS

Lib/multiprocessing/util.py
Lib/test/_test_multiprocessing.py
Misc/NEWS

index 0ce274ceca6057ae0f684a661ef476c0e62a3561..b490caa7e64333955ba1bb26058f5d2bb6697a0b 100644 (file)
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -241,20 +241,28 @@ def _run_finalizers(minpriority=None):
         return
 
     if minpriority is None:
-        f = lambda p : p[0][0] is not None
+        f = lambda p : p[0] is not None
     else:
-        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
-
-    items = [x for x in list(_finalizer_registry.items()) if f(x)]
-    items.sort(reverse=True)
-
-    for key, finalizer in items:
-        sub_debug('calling %s', finalizer)
-        try:
-            finalizer()
-        except Exception:
-            import traceback
-            traceback.print_exc()
+        f = lambda p : p[0] is not None and p[0] >= minpriority
+
+    # Careful: _finalizer_registry may be mutated while this function
+    # is running (either by a GC run or by another thread).
+
+    # list(_finalizer_registry) should be atomic, while
+    # list(_finalizer_registry.items()) is not.
+    keys = [key for key in list(_finalizer_registry) if f(key)]
+    keys.sort(reverse=True)
+
+    for key in keys:
+        finalizer = _finalizer_registry.get(key)
+        # key may have been removed from the registry
+        if finalizer is not None:
+            sub_debug('calling %s', finalizer)
+            try:
+                finalizer()
+            except Exception:
+                import traceback
+                traceback.print_exc()
 
     if minpriority is None:
         _finalizer_registry.clear()
index 70ecc54bfec2c531fd38efb6f6285287eafe4a43..d49e9c68c84bc7c3364959f28c0d119f457afb70 100644 (file)
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -3110,6 +3110,14 @@ class _TestFinalize(BaseTestCase):
 
     ALLOWED_TYPES = ('processes',)
 
+    def setUp(self):
+        self.registry_backup = util._finalizer_registry.copy()
+        util._finalizer_registry.clear()
+
+    def tearDown(self):
+        self.assertFalse(util._finalizer_registry)
+        util._finalizer_registry.update(self.registry_backup)
+
     @classmethod
     def _test_finalize(cls, conn):
         class Foo(object):
@@ -3159,6 +3167,61 @@ class _TestFinalize(BaseTestCase):
         result = [obj for obj in iter(conn.recv, 'STOP')]
         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
 
+    def test_thread_safety(self):
+        # bpo-24484: _run_finalizers() should be thread-safe
+        def cb():
+            pass
+
+        class Foo(object):
+            def __init__(self):
+                self.ref = self  # create reference cycle
+                # insert finalizer at random key
+                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
+
+        finish = False
+        exc = None
+
+        def run_finalizers():
+            nonlocal exc
+            while not finish:
+                time.sleep(random.random() * 1e-1)
+                try:
+                    # A GC run will eventually happen during this,
+                    # collecting stale Foo's and mutating the registry
+                    util._run_finalizers()
+                except Exception as e:
+                    exc = e
+
+        def make_finalizers():
+            nonlocal exc
+            d = {}
+            while not finish:
+                try:
+                    # Old Foo's get gradually replaced and later
+                    # collected by the GC (because of the cyclic ref)
+                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
+                except Exception as e:
+                    exc = e
+                    d.clear()
+
+        old_interval = sys.getswitchinterval()
+        old_threshold = gc.get_threshold()
+        try:
+            sys.setswitchinterval(1e-6)
+            gc.set_threshold(5, 5, 5)
+            threads = [threading.Thread(target=run_finalizers),
+                       threading.Thread(target=make_finalizers)]
+            with test.support.start_threads(threads):
+                time.sleep(4.0)  # Wait a bit to trigger race condition
+                finish = True
+            if exc is not None:
+                raise exc
+        finally:
+            sys.setswitchinterval(old_interval)
+            gc.set_threshold(*old_threshold)
+            gc.collect()  # Collect remaining Foo's
+
+
 #
 # Test that from ... import * works for each module
 #
index f9aa1630ebf1925f946b6c62264621d8a5dd98ae..f91695d3350d46a764a3c392a44981c577cde352 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -362,6 +362,8 @@ Extension Modules
 Library
 -------
 
+- bpo-24484: Avoid race condition in multiprocessing cleanup.
+
 - bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite
   of the signal number when the process is killed by a signal (instead
   of 255) when using the "forkserver" method.