]> granicus.if.org Git - python/commitdiff
[3.6] bpo-24484: Avoid race condition in multiprocessing cleanup (GH-2159) (#2166)
authorAntoine Pitrou <pitrou@free.fr>
Tue, 13 Jun 2017 15:51:26 +0000 (17:51 +0200)
committerGitHub <noreply@github.com>
Tue, 13 Jun 2017 15:51:26 +0000 (17:51 +0200)
* bpo-24484: Avoid race condition in multiprocessing cleanup

The finalizer registry can be mutated while inspected by multiprocessing
at process exit.

* Use test.support.start_threads()

* Add Misc/NEWS.
(cherry picked from commit 1eb6c0074d17f4fd425cacfdda893d65f5f77f0a)

Lib/multiprocessing/util.py
Lib/test/_test_multiprocessing.py
Misc/NEWS

index 0ce274ceca6057ae0f684a661ef476c0e62a3561..b490caa7e64333955ba1bb26058f5d2bb6697a0b 100644 (file)
@@ -241,20 +241,28 @@ def _run_finalizers(minpriority=None):
         return
 
     if minpriority is None:
-        f = lambda p : p[0][0] is not None
+        f = lambda p : p[0] is not None
     else:
-        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
-
-    items = [x for x in list(_finalizer_registry.items()) if f(x)]
-    items.sort(reverse=True)
-
-    for key, finalizer in items:
-        sub_debug('calling %s', finalizer)
-        try:
-            finalizer()
-        except Exception:
-            import traceback
-            traceback.print_exc()
+        f = lambda p : p[0] is not None and p[0] >= minpriority
+
+    # Careful: _finalizer_registry may be mutated while this function
+    # is running (either by a GC run or by another thread).
+
+    # list(_finalizer_registry) should be atomic, while
+    # list(_finalizer_registry.items()) is not.
+    keys = [key for key in list(_finalizer_registry) if f(key)]
+    keys.sort(reverse=True)
+
+    for key in keys:
+        finalizer = _finalizer_registry.get(key)
+        # key may have been removed from the registry
+        if finalizer is not None:
+            sub_debug('calling %s', finalizer)
+            try:
+                finalizer()
+            except Exception:
+                import traceback
+                traceback.print_exc()
 
     if minpriority is None:
         _finalizer_registry.clear()
index f1f93674935e7ab13d80aba6d8922355c867066c..cd2c8dbc487fadf6d759f14939896d6844c7c1ea 100644 (file)
@@ -3075,6 +3075,14 @@ class _TestFinalize(BaseTestCase):
 
     ALLOWED_TYPES = ('processes',)
 
+    def setUp(self):
+        self.registry_backup = util._finalizer_registry.copy()
+        util._finalizer_registry.clear()
+
+    def tearDown(self):
+        self.assertFalse(util._finalizer_registry)
+        util._finalizer_registry.update(self.registry_backup)
+
     @classmethod
     def _test_finalize(cls, conn):
         class Foo(object):
@@ -3124,6 +3132,61 @@ class _TestFinalize(BaseTestCase):
         result = [obj for obj in iter(conn.recv, 'STOP')]
         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
 
+    def test_thread_safety(self):
+        # bpo-24484: _run_finalizers() should be thread-safe
+        def cb():
+            pass
+
+        class Foo(object):
+            def __init__(self):
+                self.ref = self  # create reference cycle
+                # insert finalizer at random key
+                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
+
+        finish = False
+        exc = None
+
+        def run_finalizers():
+            nonlocal exc
+            while not finish:
+                time.sleep(random.random() * 1e-1)
+                try:
+                    # A GC run will eventually happen during this,
+                    # collecting stale Foo's and mutating the registry
+                    util._run_finalizers()
+                except Exception as e:
+                    exc = e
+
+        def make_finalizers():
+            nonlocal exc
+            d = {}
+            while not finish:
+                try:
+                    # Old Foo's get gradually replaced and later
+                    # collected by the GC (because of the cyclic ref)
+                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
+                except Exception as e:
+                    exc = e
+                    d.clear()
+
+        old_interval = sys.getswitchinterval()
+        old_threshold = gc.get_threshold()
+        try:
+            sys.setswitchinterval(1e-6)
+            gc.set_threshold(5, 5, 5)
+            threads = [threading.Thread(target=run_finalizers),
+                       threading.Thread(target=make_finalizers)]
+            with test.support.start_threads(threads):
+                time.sleep(4.0)  # Wait a bit to trigger race condition
+                finish = True
+            if exc is not None:
+                raise exc
+        finally:
+            sys.setswitchinterval(old_interval)
+            gc.set_threshold(*old_threshold)
+            gc.collect()  # Collect remaining Foo's
+
+
 #
 # Test that from ... import * works for each module
 #
index f744160083702583b3f75ba44c0e53c10c0b3bb1..f3874140a5249c4dca48b4f35645a1f2094f3db7 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -51,6 +51,8 @@ Core and Builtins
 Library
 -------
 
+- bpo-24484: Avoid race condition in multiprocessing cleanup (#2159)
+
 - bpo-28994: The traceback no longer displayed for SystemExit raised in
   a callback registered by atexit.