self.pool.terminate()
join = TimingWrapper(self.pool.join)
join()
- self.assertTrue(join.elapsed < 0.5)
- self.assertLess(join.elapsed, 0.2)
++ self.assertLess(join.elapsed, 0.5)
+
+def raising():
+ raise KeyError("key")
+
+def unpickleable_result():
+ return lambda: 42
+
+class _TestPoolWorkerErrors(BaseTestCase):
+ ALLOWED_TYPES = ('processes', )
+
+ def test_async_error_callback(self):
+ p = multiprocessing.Pool(2)
+
+ scratchpad = [None]
+ def errback(exc):
+ scratchpad[0] = exc
+
+ res = p.apply_async(raising, error_callback=errback)
+ self.assertRaises(KeyError, res.get)
+ self.assertTrue(scratchpad[0])
+ self.assertIsInstance(scratchpad[0], KeyError)
+
+ p.close()
+ p.join()
+
+ def test_unpickleable_result(self):
+ from multiprocessing.pool import MaybeEncodingError
+ p = multiprocessing.Pool(2)
+
+ # Make sure we don't lose pool processes because of encoding errors.
+ for iteration in range(20):
+
+ scratchpad = [None]
+ def errback(exc):
+ scratchpad[0] = exc
+
+ res = p.apply_async(unpickleable_result, error_callback=errback)
+ self.assertRaises(MaybeEncodingError, res.get)
+ wrapped = scratchpad[0]
+ self.assertTrue(wrapped)
+ self.assertIsInstance(scratchpad[0], MaybeEncodingError)
+ self.assertIsNotNone(wrapped.exc)
+ self.assertIsNotNone(wrapped.value)
+
+ p.close()
+ p.join()
+
+class _TestPoolWorkerLifetime(BaseTestCase):
+ ALLOWED_TYPES = ('processes', )
+
+ def test_pool_worker_lifetime(self):
+ p = multiprocessing.Pool(3, maxtasksperchild=10)
+ self.assertEqual(3, len(p._pool))
+ origworkerpids = [w.pid for w in p._pool]
+ # Run many tasks so each worker gets replaced (hopefully)
+ results = []
+ for i in range(100):
+ results.append(p.apply_async(sqr, (i, )))
+ # Fetch the results and verify we got the right answers,
+ # also ensuring all the tasks have completed.
+ for (j, res) in enumerate(results):
+ self.assertEqual(res.get(), sqr(j))
+ # Refill the pool
+ p._repopulate_pool()
+ # Wait until all workers are alive
+ countdown = 5
+ while countdown and not all(w.is_alive() for w in p._pool):
+ countdown -= 1
+ time.sleep(DELTA)
+ finalworkerpids = [w.pid for w in p._pool]
+ # All pids should be assigned. See issue #7805.
+ self.assertNotIn(None, origworkerpids)
+ self.assertNotIn(None, finalworkerpids)
+ # Finally, check that the worker pids have changed
+ self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
+ p.close()
+ p.join()
+
#
# Test that manager has expected number of shared objects left
#