From: Victor Stinner
Date: Thu, 24 Mar 2011 15:39:07 +0000 (+0100)
Subject: Merge 3.1
X-Git-Tag: v3.2.1b1~220
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=900189b41457d9d4bd0ddffc08f089eab73ec63a;p=python

Merge 3.1
---

diff --cc Lib/test/test_multiprocessing.py
index 644145fffb,f4031de61f..55b8cdd4c9
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@@ -1088,85 -1084,7 +1088,85 @@@ class _TestPool(BaseTestCase)
          self.pool.terminate()
          join = TimingWrapper(self.pool.join)
          join()
-         self.assertTrue(join.elapsed < 0.5)
 -        self.assertLess(join.elapsed, 0.2)
++        self.assertLess(join.elapsed, 0.5)
 +
 +def raising():
 +    raise KeyError("key")
 +
 +def unpickleable_result():
 +    return lambda: 42
 +
 +class _TestPoolWorkerErrors(BaseTestCase):
 +    ALLOWED_TYPES = ('processes', )
 +
 +    def test_async_error_callback(self):
 +        p = multiprocessing.Pool(2)
 +
 +        scratchpad = [None]
 +        def errback(exc):
 +            scratchpad[0] = exc
 +
 +        res = p.apply_async(raising, error_callback=errback)
 +        self.assertRaises(KeyError, res.get)
 +        self.assertTrue(scratchpad[0])
 +        self.assertIsInstance(scratchpad[0], KeyError)
 +
 +        p.close()
 +        p.join()
 +
 +    def test_unpickleable_result(self):
 +        from multiprocessing.pool import MaybeEncodingError
 +        p = multiprocessing.Pool(2)
 +
 +        # Make sure we don't lose pool processes because of encoding errors.
 +        for iteration in range(20):
 +
 +            scratchpad = [None]
 +            def errback(exc):
 +                scratchpad[0] = exc
 +
 +            res = p.apply_async(unpickleable_result, error_callback=errback)
 +            self.assertRaises(MaybeEncodingError, res.get)
 +            wrapped = scratchpad[0]
 +            self.assertTrue(wrapped)
 +            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
 +            self.assertIsNotNone(wrapped.exc)
 +            self.assertIsNotNone(wrapped.value)
 +
 +        p.close()
 +        p.join()
 +
 +class _TestPoolWorkerLifetime(BaseTestCase):
 +    ALLOWED_TYPES = ('processes', )
 +
 +    def test_pool_worker_lifetime(self):
 +        p = multiprocessing.Pool(3, maxtasksperchild=10)
 +        self.assertEqual(3, len(p._pool))
 +        origworkerpids = [w.pid for w in p._pool]
 +        # Run many tasks so each worker gets replaced (hopefully)
 +        results = []
 +        for i in range(100):
 +            results.append(p.apply_async(sqr, (i, )))
 +        # Fetch the results and verify we got the right answers,
 +        # also ensuring all the tasks have completed.
 +        for (j, res) in enumerate(results):
 +            self.assertEqual(res.get(), sqr(j))
 +        # Refill the pool
 +        p._repopulate_pool()
 +        # Wait until all workers are alive
 +        countdown = 5
 +        while countdown and not all(w.is_alive() for w in p._pool):
 +            countdown -= 1
 +            time.sleep(DELTA)
 +        finalworkerpids = [w.pid for w in p._pool]
 +        # All pids should be assigned. See issue #7805.
 +        self.assertNotIn(None, origworkerpids)
 +        self.assertNotIn(None, finalworkerpids)
 +        # Finally, check that the worker pids have changed
 +        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
 +        p.close()
 +        p.join()
 +
  #
  # Test that manager has expected number of shared objects left
  #
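
For context, a minimal standalone sketch (not part of the commit) of the two
Pool features these tests exercise: the error_callback parameter of
apply_async(), which is invoked in the parent with the exception raised in
the worker, and the maxtasksperchild constructor argument, which replaces
each worker process after it has completed the given number of tasks. Both
are new in Python 3.2; the function and variable names below are
illustrative only.

    import multiprocessing

    def fail():
        # Raised in the worker; propagated back to the parent process.
        raise KeyError("key")

    if __name__ == '__main__':
        errors = []

        # Each worker process is recycled after 10 tasks (maxtasksperchild).
        pool = multiprocessing.Pool(processes=2, maxtasksperchild=10)

        # error_callback receives the exception instance in the parent.
        res = pool.apply_async(fail, error_callback=errors.append)
        try:
            res.get()           # get() re-raises the worker's exception
        except KeyError:
            pass

        pool.close()
        pool.join()
        print(type(errors[0]))  # <class 'KeyError'>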