import socket
import threading
import struct
+import signal
from multiprocessing import current_process
from multiprocessing.util import register_after_fork, debug, sub_debug
self._lock = threading.Lock()
self._listener = None
self._address = None
+ self._thread = None
register_after_fork(self, ResourceSharer._afterfork)
def register(self, send, close):
c.send((key, os.getpid()))
return c
+ def stop(self, timeout=None):
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address, authkey=current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ sub_warn('ResourceSharer thread did not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
def _afterfork(self):
for key, (send, close) in self._cache.items():
close()
self._listener.close()
self._listener = None
self._address = None
+ self._thread = None
def _start(self):
from .connection import Listener
t = threading.Thread(target=self._serve)
t.daemon = True
t.start()
+ self._thread = t
def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
while 1:
try:
conn = self._listener.accept()
- key, destination_pid = conn.recv()
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
send, close = self._cache.pop(key)
send(conn, destination_pid)
close()
ALLOWED_TYPES = ('processes',)
+ @classmethod
+ def tearDownClass(cls):
+ from multiprocessing.reduction import resource_sharer
+ resource_sharer.stop(timeout=5)
+
@classmethod
def _listener(cls, conn, families):
for fam in families: