From 64e538bc703e423a04ab435c4eab6b950b8aef7e Mon Sep 17 00:00:00 2001
From: Victor Stinner
Date: Fri, 1 Jun 2018 19:39:56 +0200
Subject: [PATCH] bpo-33532: Fix test_multiprocessing_forkserver.test_ignore()
 (GH-7322)

Also use support.SOCK_MAX_SIZE, not only support.PIPE_MAX_SIZE, to get
the size needed for a blocking send into a multiprocessing pipe.

Also replace test.support with support.
---
 Lib/test/_test_multiprocessing.py | 64 ++++++++++++++++---------------
 1 file changed, 33 insertions(+), 31 deletions(-)

diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d7eb69bb8b..0a82026f4d 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -20,14 +20,14 @@ import logging
 import struct
 import operator
 import weakref
-import test.support
+from test import support
 import test.support.script_helper
 
 
 # Skip tests if _multiprocessing wasn't built.
-_multiprocessing = test.support.import_module('_multiprocessing')
+_multiprocessing = support.import_module('_multiprocessing')
 # Skip tests if sem_open implementation is broken.
-test.support.import_module('multiprocessing.synchronize')
+support.import_module('multiprocessing.synchronize')
 # import threading after _multiprocessing to raise a more relevant error
 # message: "No module named _multiprocessing". _multiprocessing is not compiled
 # without thread support.
@@ -567,8 +567,8 @@ class _TestSubclassingProcess(BaseTestCase):
         if self.TYPE == "threads":
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
 
-        testfn = test.support.TESTFN
-        self.addCleanup(test.support.unlink, testfn)
+        testfn = support.TESTFN
+        self.addCleanup(support.unlink, testfn)
         proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
         proc.start()
         proc.join()
@@ -597,8 +597,8 @@ class _TestSubclassingProcess(BaseTestCase):
         if self.TYPE == 'threads':
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
 
-        testfn = test.support.TESTFN
-        self.addCleanup(test.support.unlink, testfn)
+        testfn = support.TESTFN
+        self.addCleanup(support.unlink, testfn)
 
         for reason in (
             [1, 2, 3],
@@ -853,7 +853,7 @@ class _TestQueue(BaseTestCase):
         close_queue(queue)
 
     def test_no_import_lock_contention(self):
-        with test.support.temp_cwd():
+        with support.temp_cwd():
             module_name = 'imported_by_an_imported_module'
             with open(module_name + '.py', 'w') as f:
                 f.write("""if 1:
@@ -866,7 +866,7 @@ class _TestQueue(BaseTestCase):
                     del q
                 """)
 
-            with test.support.DirsOnSysPath(os.getcwd()):
+            with support.DirsOnSysPath(os.getcwd()):
                 try:
                     __import__(module_name)
                 except pyqueue.Empty:
@@ -891,7 +891,7 @@ class _TestQueue(BaseTestCase):
         class NotSerializable(object):
             def __reduce__(self):
                 raise AttributeError
-        with test.support.captured_stderr():
+        with support.captured_stderr():
             q = self.Queue()
             q.put(NotSerializable())
             q.put(True)
@@ -2194,7 +2194,7 @@ class _TestPool(BaseTestCase):
         self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
         self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
 
-        with test.support.captured_stderr() as f1:
+        with support.captured_stderr() as f1:
             try:
                 raise exc
             except RuntimeError:
@@ -2476,7 +2476,7 @@ class _TestRemoteManager(BaseTestCase):
         authkey = os.urandom(32)
 
         manager = QueueManager(
-            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
+            address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER
             )
         manager.start()
 
@@ -2513,7 +2513,7 @@ class _TestManagerRestart(BaseTestCase):
     def test_rapid_restart(self):
         authkey = os.urandom(32)
         manager = QueueManager(
-            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
+            address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
         srvr = manager.get_server()
         addr = srvr.address
         # Close the connection.Listener socket which gets opened as a part
@@ -2736,14 +2736,14 @@ class _TestConnection(BaseTestCase):
         p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
         p.daemon = True
         p.start()
-        self.addCleanup(test.support.unlink, test.support.TESTFN)
-        with open(test.support.TESTFN, "wb") as f:
+        self.addCleanup(support.unlink, support.TESTFN)
+        with open(support.TESTFN, "wb") as f:
             fd = f.fileno()
             if msvcrt:
                 fd = msvcrt.get_osfhandle(fd)
             reduction.send_handle(conn, fd, p.pid)
         p.join()
-        with open(test.support.TESTFN, "rb") as f:
+        with open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"foo")
 
     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@@ -2762,8 +2762,8 @@ class _TestConnection(BaseTestCase):
         p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
         p.daemon = True
         p.start()
-        self.addCleanup(test.support.unlink, test.support.TESTFN)
-        with open(test.support.TESTFN, "wb") as f:
+        self.addCleanup(support.unlink, support.TESTFN)
+        with open(support.TESTFN, "wb") as f:
             fd = f.fileno()
             for newfd in range(256, MAXFD):
                 if not self._is_fd_assigned(newfd):
@@ -2776,7 +2776,7 @@ class _TestConnection(BaseTestCase):
             finally:
                 os.close(newfd)
         p.join()
-        with open(test.support.TESTFN, "rb") as f:
+        with open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"bar")
 
     @classmethod
@@ -2986,7 +2986,7 @@ class _TestPicklingConnections(BaseTestCase):
             l.close()
 
         l = socket.socket()
-        l.bind((test.support.HOST, 0))
+        l.bind((support.HOST, 0))
         l.listen()
         conn.send(l.getsockname())
         new_conn, addr = l.accept()
@@ -3336,7 +3336,7 @@ class _TestFinalize(BaseTestCase):
             gc.set_threshold(5, 5, 5)
             threads = [threading.Thread(target=run_finalizers),
                        threading.Thread(target=make_finalizers)]
-            with test.support.start_threads(threads):
+            with support.start_threads(threads):
                 time.sleep(4.0)  # Wait a bit to trigger race condition
             finish = True
             if exc is not None:
@@ -3697,7 +3697,7 @@ class TestWait(unittest.TestCase):
     def test_wait_socket(self, slow=False):
         from multiprocessing.connection import wait
         l = socket.socket()
-        l.bind((test.support.HOST, 0))
+        l.bind((support.HOST, 0))
         l.listen()
         addr = l.getsockname()
         readers = []
@@ -3910,11 +3910,11 @@ class TestNoForkBomb(unittest.TestCase):
         sm = multiprocessing.get_start_method()
         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
         if sm != 'fork':
-            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
+            rc, out, err = support.script_helper.assert_python_failure(name, sm)
             self.assertEqual(out, b'')
             self.assertIn(b'RuntimeError', err)
         else:
-            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
+            rc, out, err = support.script_helper.assert_python_ok(name, sm)
             self.assertEqual(out.rstrip(), b'123')
             self.assertEqual(err, b'')
 
@@ -4021,6 +4021,9 @@ class TestCloseFds(unittest.TestCase):
 
 class TestIgnoreEINTR(unittest.TestCase):
 
+    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
+    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
+
     @classmethod
     def _test_ignore(cls, conn):
         def handler(signum, frame):
@@ -4029,7 +4032,7 @@ class TestIgnoreEINTR(unittest.TestCase):
         conn.send('ready')
         x = conn.recv()
         conn.send(x)
-        conn.send_bytes(b'x' * test.support.PIPE_MAX_SIZE)
+        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
 
     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     def test_ignore(self):
@@ -4048,8 +4051,7 @@ class TestIgnoreEINTR(unittest.TestCase):
             self.assertEqual(conn.recv(), 1234)
             time.sleep(0.1)
             os.kill(p.pid, signal.SIGUSR1)
-            self.assertEqual(conn.recv_bytes(),
-                             b'x' * test.support.PIPE_MAX_SIZE)
+            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
             time.sleep(0.1)
             p.join()
         finally:
@@ -4145,7 +4147,7 @@ class TestStartMethod(unittest.TestCase):
         if multiprocessing.get_start_method() != 'forkserver':
             self.skipTest("test only relevant for 'forkserver' method")
         name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
-        rc, out, err = test.support.script_helper.assert_python_ok(name)
+        rc, out, err = support.script_helper.assert_python_ok(name)
         out = out.decode()
         err = err.decode()
         if out.rstrip() != 'ok' or err != '':
@@ -4279,7 +4281,7 @@ class BaseMixin(object):
     def tearDownClass(cls):
         # bpo-26762: Some multiprocessing objects like Pool create reference
         # cycles. Trigger a garbage collection to break these cycles.
-        test.support.gc_collect()
+        support.gc_collect()
 
         processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
         if processes:
@@ -4458,7 +4460,7 @@ def install_tests_in_module_dict(remote_globs, start_method):
 
         # bpo-26762: Some multiprocessing objects like Pool create reference
         # cycles. Trigger a garbage collection to break these cycles.
-        test.support.gc_collect()
+        support.gc_collect()
 
         multiprocessing.set_start_method(old_start_method[0], force=True)
         # pause a bit so we don't get warning about dangling threads/processes
@@ -4480,7 +4482,7 @@ def install_tests_in_module_dict(remote_globs, start_method):
         if need_sleep:
             time.sleep(0.5)
         multiprocessing.process._cleanup()
-        test.support.gc_collect()
+        support.gc_collect()
 
     remote_globs['setUpModule'] = setUpModule
     remote_globs['tearDownModule'] = tearDownModule
-- 
2.40.0
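A minimal standalone sketch of the sizing idea used above (not part of the patch; it assumes nothing beyond the documented multiprocessing API and the test.support constants the diff already relies on). A multiprocessing Connection may be backed by an OS pipe or by a socket depending on the platform and start method, and a send only reliably blocks once the payload exceeds the corresponding kernel buffer, which is why the test uses max(PIPE_MAX_SIZE, SOCK_MAX_SIZE) rather than PIPE_MAX_SIZE alone.

# Standalone sketch (not part of the patch): pick a payload large enough that
# send_bytes() blocks whether the Connection is backed by a pipe or a socket.
import multiprocessing

from test import support

# Same constant the patched test defines as TestIgnoreEINTR.CONN_MAX_SIZE;
# both values are intended to exceed the underlying OS buffer sizes.
CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)


def child(conn):
    # Blocks until the other end starts reading, because the payload is
    # meant to be larger than any pipe or socket buffer the Connection uses.
    conn.send_bytes(b'x' * CONN_MAX_SIZE)
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    child_conn.close()
    data = parent_conn.recv_bytes()   # draining the message unblocks the child
    assert data == b'x' * CONN_MAX_SIZE
    p.join()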