granicus.if.org Git - python/commitdiff
Merge 3.1
authorVictor Stinner <victor.stinner@haypocalc.com>
Thu, 24 Mar 2011 15:39:07 +0000 (16:39 +0100)
committerVictor Stinner <victor.stinner@haypocalc.com>
Thu, 24 Mar 2011 15:39:07 +0000 (16:39 +0100)
1  2 
Lib/test/test_multiprocessing.py

index 644145fffbe95e72820bf55dec4721a8aa211383,f4031de61fdba08e7bf73e9ffde898145a0f5e93..55b8cdd4c94df0c8d1a3d06f45935a8fc9c513ef
@@@ -1088,85 -1084,7 +1088,85 @@@ class _TestPool(BaseTestCase)
          self.pool.terminate()
          join = TimingWrapper(self.pool.join)
          join()
-         self.assertTrue(join.elapsed < 0.5)
 -        self.assertLess(join.elapsed, 0.2)
++        self.assertLess(join.elapsed, 0.5)
 +
 +def raising():
 +    raise KeyError("key")
 +
 +def unpickleable_result():
 +    return lambda: 42
 +
 +class _TestPoolWorkerErrors(BaseTestCase):
 +    ALLOWED_TYPES = ('processes', )
 +
 +    def test_async_error_callback(self):
 +        p = multiprocessing.Pool(2)
 +
 +        scratchpad = [None]
 +        def errback(exc):
 +            scratchpad[0] = exc
 +
 +        res = p.apply_async(raising, error_callback=errback)
 +        self.assertRaises(KeyError, res.get)
 +        self.assertTrue(scratchpad[0])
 +        self.assertIsInstance(scratchpad[0], KeyError)
 +
 +        p.close()
 +        p.join()
 +
 +    def test_unpickleable_result(self):
 +        from multiprocessing.pool import MaybeEncodingError
 +        p = multiprocessing.Pool(2)
 +
 +        # Make sure we don't lose pool processes because of encoding errors.
 +        for iteration in range(20):
 +
 +            scratchpad = [None]
 +            def errback(exc):
 +                scratchpad[0] = exc
 +
 +            res = p.apply_async(unpickleable_result, error_callback=errback)
 +            self.assertRaises(MaybeEncodingError, res.get)
 +            wrapped = scratchpad[0]
 +            self.assertTrue(wrapped)
 +            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
 +            self.assertIsNotNone(wrapped.exc)
 +            self.assertIsNotNone(wrapped.value)
 +
 +        p.close()
 +        p.join()
 +
 +class _TestPoolWorkerLifetime(BaseTestCase):
 +    ALLOWED_TYPES = ('processes', )
 +
 +    def test_pool_worker_lifetime(self):
 +        p = multiprocessing.Pool(3, maxtasksperchild=10)
 +        self.assertEqual(3, len(p._pool))
 +        origworkerpids = [w.pid for w in p._pool]
 +        # Run many tasks so each worker gets replaced (hopefully)
 +        results = []
 +        for i in range(100):
 +            results.append(p.apply_async(sqr, (i, )))
 +        # Fetch the results and verify we got the right answers,
 +        # also ensuring all the tasks have completed.
 +        for (j, res) in enumerate(results):
 +            self.assertEqual(res.get(), sqr(j))
 +        # Refill the pool
 +        p._repopulate_pool()
 +        # Wait until all workers are alive
 +        countdown = 5
 +        while countdown and not all(w.is_alive() for w in p._pool):
 +            countdown -= 1
 +            time.sleep(DELTA)
 +        finalworkerpids = [w.pid for w in p._pool]
 +        # All pids should be assigned.  See issue #7805.
 +        self.assertNotIn(None, origworkerpids)
 +        self.assertNotIn(None, finalworkerpids)
 +        # Finally, check that the worker pids have changed
 +        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
 +        p.close()
 +        p.join()
 +
  #
  # Test that manager has expected number of shared objects left
  #