]> granicus.if.org Git - python/commitdiff
Issue #14666: stop multiprocessing's resource-sharing thread after the tests are...
authorAntoine Pitrou <solipsis@pitrou.net>
Fri, 27 Apr 2012 21:51:03 +0000 (23:51 +0200)
committerAntoine Pitrou <solipsis@pitrou.net>
Fri, 27 Apr 2012 21:51:03 +0000 (23:51 +0200)
Also, block delivery of signals to that thread. Patch by Richard Oudkerk.

This will hopefully fix sporadic freezes on the FreeBSD 9.0 buildbot.

Lib/multiprocessing/reduction.py
Lib/test/test_multiprocessing.py

index ce38fe367e0a83f5328d7ffda1daf88631fca981..cef445b4d8468aa07c2d5e75bb70690ec62b4e4b 100644 (file)
@@ -40,6 +40,7 @@ import sys
 import socket
 import threading
 import struct
+import signal
 
 from multiprocessing import current_process
 from multiprocessing.util import register_after_fork, debug, sub_debug
@@ -209,6 +210,7 @@ class ResourceSharer(object):
         self._lock = threading.Lock()
         self._listener = None
         self._address = None
+        self._thread = None
         register_after_fork(self, ResourceSharer._afterfork)
 
     def register(self, send, close):
@@ -227,6 +229,24 @@ class ResourceSharer(object):
         c.send((key, os.getpid()))
         return c
 
+    def stop(self, timeout=None):
+        from .connection import Client
+        with self._lock:
+            if self._address is not None:
+                c = Client(self._address, authkey=current_process().authkey)
+                c.send(None)
+                c.close()
+                self._thread.join(timeout)
+                if self._thread.is_alive():
+                    sub_warn('ResourceSharer thread did not stop when asked')
+                self._listener.close()
+                self._thread = None
+                self._address = None
+                self._listener = None
+                for key, (send, close) in self._cache.items():
+                    close()
+                self._cache.clear()
+
     def _afterfork(self):
         for key, (send, close) in self._cache.items():
             close()
@@ -239,6 +259,7 @@ class ResourceSharer(object):
             self._listener.close()
         self._listener = None
         self._address = None
+        self._thread = None
 
     def _start(self):
         from .connection import Listener
@@ -249,12 +270,18 @@ class ResourceSharer(object):
         t = threading.Thread(target=self._serve)
         t.daemon = True
         t.start()
+        self._thread = t
 
     def _serve(self):
+        if hasattr(signal, 'pthread_sigmask'):
+            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
         while 1:
             try:
                 conn = self._listener.accept()
-                key, destination_pid = conn.recv()
+                msg = conn.recv()
+                if msg is None:
+                    break
+                key, destination_pid = msg
                 send, close = self._cache.pop(key)
                 send(conn, destination_pid)
                 close()
index 4c22aefbda74ffa4192a7b7bba47cfd8dc8c5c61..799be702fcda33076b6ba8afbc21b1a8138bc7c6 100644 (file)
@@ -1965,6 +1965,11 @@ class _TestPicklingConnections(BaseTestCase):
 
     ALLOWED_TYPES = ('processes',)
 
+    @classmethod
+    def tearDownClass(cls):
+        from multiprocessing.reduction import resource_sharer
+        resource_sharer.stop(timeout=5)
+
     @classmethod
     def _listener(cls, conn, families):
         for fam in families: