# without thread support.
import threading
-import multiprocessing.dummy
import multiprocessing.connection
-import multiprocessing.managers
+import multiprocessing.dummy
import multiprocessing.heap
+import multiprocessing.managers
import multiprocessing.pool
+import multiprocessing.queues
from multiprocessing import util
def latin(s):
return s.encode('latin')
+
+def close_queue(queue):
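+ # multiprocessing.Queue runs a background feeder thread; close() stops it
+ # and join_thread() waits for it to flush, so tests do not leave a
+ # dangling thread behind. SimpleQueue and manager proxies have no feeder
+ # thread, hence the isinstance() check.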
+ if isinstance(queue, multiprocessing.queues.Queue):
+ queue.close()
+ queue.join_thread()
+
+
#
# Constants
#
self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children())
+ close_queue(q)
@classmethod
def _test_terminate(cls):
p.join()
self.assertIs(wr(), None)
self.assertEqual(q.get(), 5)
+ close_queue(q)
#
self.assertEqual(queue_full(queue, MAXSIZE), False)
proc.join()
+ close_queue(queue)
@classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue):
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
proc.join()
+ close_queue(queue)
@classmethod
def _test_fork(cls, queue):
self.assertRaises(pyqueue.Empty, queue.get, False)
p.join()
+ close_queue(queue)
def test_qsize(self):
q = self.Queue()
self.assertEqual(q.qsize(), 1)
q.get()
self.assertEqual(q.qsize(), 0)
+ close_queue(q)
@classmethod
def _test_task_done(cls, q):
for p in workers:
p.join()
+ close_queue(queue)
def test_no_import_lock_contention(self):
with test.support.temp_cwd():
# Tolerate a delta of 30 ms because of the bad clock resolution on
# Windows (usually 15.6 ms)
self.assertGreaterEqual(delta, 0.170)
+ close_queue(q)
def test_queue_feeder_donot_stop_onexc(self):
# bpo-30414: verify feeder handles exceptions correctly
q = self.Queue()
q.put(NotSerializable())
q.put(True)
- self.assertTrue(q.get(timeout=0.1))
+ # bpo-30595: use a timeout of 1 second for slow buildbots
+ self.assertTrue(q.get(timeout=1.0))
+ close_queue(q)
#
#
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
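+ # addCleanup() runs even if an assertion fails, so the child is always
+ # joined and never reported as a dangling process.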
+ self.addCleanup(p.join)
p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
+ self.addCleanup(p.join)
# wait for both children to start sleeping
sleeping.acquire()
args=(cond, sleeping, woken, TIMEOUT1))
p.daemon = True
p.start()
+ self.addCleanup(p.join)
t = threading.Thread(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1))
t.daemon = True
t.start()
+ self.addCleanup(t.join)
# wait for them all to sleep
for i in range(6):
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
+ self.addCleanup(p.join)
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True
t.start()
+ self.addCleanup(t.join)
# wait for them to all sleep
for i in range(6):
p.daemon = True
p.start()
self.assertEqual(wait(), True)
+ p.join()
#
# Tests for Barrier - adapted from tests in test/lock_tests.py
self.run_threads(self._test_wait_return_f, (self.barrier, queue))
results = [queue.get() for i in range(self.N)]
self.assertEqual(results.count(0), 1)
+ close_queue(queue)
@classmethod
def _test_action_f(cls, barrier, results):
p = self.Process(target=self._test_thousand_f,
args=(self.barrier, passes, child_conn, lock))
p.start()
+ self.addCleanup(p.join)
for i in range(passes):
for j in range(self.N):
w.close()
self.assertEqual(conn.recv(), 'foobar'*2)
+ p.join()
+
#
#
#
logger.setLevel(LEVEL1)
p = self.Process(target=self._test_level, args=(writer,))
- p.daemon = True
p.start()
self.assertEqual(LEVEL1, reader.recv())
+ p.join()
logger.setLevel(logging.NOTSET)
root_logger.setLevel(LEVEL2)
p = self.Process(target=self._test_level, args=(writer,))
- p.daemon = True
p.start()
self.assertEqual(LEVEL2, reader.recv())
+ p.join()
root_logger.setLevel(root_level)
logger.setLevel(level=LOG_LEVEL)
except pyqueue.Empty:
pass
-def _test_process(q):
+def _test_process():
queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
subProc.daemon = True
class TestStdinBadfiledescriptor(unittest.TestCase):
def test_queue_in_process(self):
- queue = multiprocessing.Queue()
- proc = multiprocessing.Process(target=_test_process, args=(queue,))
+ proc = multiprocessing.Process(target=_test_process)
proc.start()
proc.join()
# Mixins
#
-class ProcessesMixin(object):
+class BaseMixin(object):
+ @classmethod
+ def setUpClass(cls):
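+ # Snapshot the processes/threads that are already dangling before this
+ # class runs, so tearDownClass() only reports leaks from its own tests.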
+ cls.dangling = (multiprocessing.process._dangling.copy(),
+ threading._dangling.copy())
+
+ @classmethod
+ def tearDownClass(cls):
+ # bpo-26762: Some multiprocessing objects like Pool create reference
+ # cycles. Trigger a garbage collection to break these cycles.
+ test.support.gc_collect()
+
+ processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
+ if processes:
+ print('Warning -- Dangling processes: %s' % processes,
+ file=sys.stderr)
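+ # drop the reference so the leaked process objects can be collected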
+ processes = None
+
+ threads = set(threading._dangling) - set(cls.dangling[1])
+ if threads:
+ print('Warning -- Dangling threads: %s' % threads,
+ file=sys.stderr)
+ threads = None
+
+
+class ProcessesMixin(BaseMixin):
TYPE = 'processes'
Process = multiprocessing.Process
connection = multiprocessing.connection
RawArray = staticmethod(multiprocessing.RawArray)
-class ManagerMixin(object):
+class ManagerMixin(BaseMixin):
TYPE = 'manager'
Process = multiprocessing.Process
Queue = property(operator.attrgetter('manager.Queue'))
@classmethod
def setUpClass(cls):
+ super().setUpClass()
cls.manager = multiprocessing.Manager()
@classmethod
# only the manager process should be returned by active_children()
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
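+ # wait up to 5 seconds, then warn and give up instead of hanging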
+ start_time = time.monotonic()
t = 0.01
- while len(multiprocessing.active_children()) > 1 and t < 5:
+ while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
+ dt = time.monotonic() - start_time
+ if dt >= 5.0:
+ print("Warning -- multiprocessing.Manager still has %s active "
+ "children after %s seconds"
+ % (multiprocessing.active_children(), dt),
+ file=sys.stderr)
+ break
+
gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
# This is not really an error since some tests do not
# ensure that all processes which hold a reference to a
# managed object have been joined.
- print('Shared objects which still exist at manager shutdown:')
+ print('Warning -- Shared objects which still exist at manager '
+ 'shutdown:')
print(cls.manager._debug_info())
cls.manager.shutdown()
cls.manager.join()
cls.manager = None
+ super().tearDownClass()
-class ThreadsMixin(object):
+
+class ThreadsMixin(BaseMixin):
TYPE = 'threads'
Process = multiprocessing.dummy.Process
connection = multiprocessing.dummy.connection
multiprocessing.get_logger().setLevel(LOG_LEVEL)
def tearDownModule():
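+ # only sleep at the end if dangling processes/threads are detected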
+ need_sleep = False
+
+ # bpo-26762: Some multiprocessing objects like Pool create reference
+ # cycles. Trigger a garbage collection to break these cycles.
+ test.support.gc_collect()
+
multiprocessing.set_start_method(old_start_method[0], force=True)
# pause a bit so we don't get warning about dangling threads/processes
- time.sleep(0.5)
+ processes = set(multiprocessing.process._dangling) - set(dangling[0])
+ if processes:
+ need_sleep = True
+ print('Warning -- Dangling processes: %s' % processes,
+ file=sys.stderr)
+ processes = None
+
+ threads = set(threading._dangling) - set(dangling[1])
+ if threads:
+ need_sleep = True
+ print('Warning -- Dangling threads: %s' % threads,
+ file=sys.stderr)
+ threads = None
+
+ # Sleep 500 ms to give time to child processes to complete.
+ if need_sleep:
+ time.sleep(0.5)
multiprocessing.process._cleanup()
- gc.collect()
- tmp = set(multiprocessing.process._dangling) - set(dangling[0])
- if tmp:
- print('Dangling processes:', tmp, file=sys.stderr)
- del tmp
- tmp = set(threading._dangling) - set(dangling[1])
- if tmp:
- print('Dangling threads:', tmp, file=sys.stderr)
+ test.support.gc_collect()
remote_globs['setUpModule'] = setUpModule
remote_globs['tearDownModule'] = tearDownModule