]> granicus.if.org Git - python/commitdiff
Issue #9400: Partial backport of fix for #9244
authorRichard Oudkerk <shibturn@gmail.com>
Wed, 2 May 2012 15:36:26 +0000 (16:36 +0100)
committerRichard Oudkerk <shibturn@gmail.com>
Wed, 2 May 2012 15:36:26 +0000 (16:36 +0100)
In multiprocessing, a pool worker process would die
if the result/error could not be pickled.  This could
cause pool methods to hang.

In 3.x this was fixed by 0aa8af79359d (which also added
an error_callback argument to some methods), but the fix
was not back ported.

Lib/multiprocessing/pool.py
Lib/test/test_multiprocessing.py

index bcbf7e350310c9eeea9025a91e6e750ff2655287..99b4df472d5a75f587f63913b1f9710440dc1859 100644 (file)
@@ -68,6 +68,23 @@ def mapstar(args):
 # Code run by worker processes
 #
 
+class MaybeEncodingError(Exception):
+    """Wraps possible unpickleable errors, so they can be
+    safely sent through the socket."""
+
+    def __init__(self, exc, value):
+        self.exc = repr(exc)
+        self.value = repr(value)
+        super(MaybeEncodingError, self).__init__(self.exc, self.value)
+
+    def __str__(self):
+        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
+                                                             self.exc)
+
+    def __repr__(self):
+        return "<MaybeEncodingError: %s>" % str(self)
+
+
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put
@@ -96,7 +113,13 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
             result = (True, func(*args, **kwds))
         except Exception, e:
             result = (False, e)
-        put((job, i, result))
+        try:
+            put((job, i, result))
+        except Exception as e:
+            wrapped = MaybeEncodingError(e, result[1])
+            debug("Possible encoding error while sending result: %s" % (
+                wrapped))
+            put((job, i, (False, wrapped)))
         completed += 1
     debug('worker exiting after %d tasks' % completed)
 
index e5258bb9d8ba7ba174a3fe3e78242c549deaef96..eeb768f7d7a509b48936796b66c1d980dfe695cc 100644 (file)
@@ -1152,6 +1152,24 @@ class _TestPool(BaseTestCase):
         join()
         self.assertTrue(join.elapsed < 0.2)
 
+def unpickleable_result():
+    return lambda: 42
+
+class _TestPoolWorkerErrors(BaseTestCase):
+    ALLOWED_TYPES = ('processes', )
+
+    def test_unpickleable_result(self):
+        from multiprocessing.pool import MaybeEncodingError
+        p = multiprocessing.Pool(2)
+
+        # Make sure we don't lose pool processes because of encoding errors.
+        for iteration in range(20):
+            res = p.apply_async(unpickleable_result)
+            self.assertRaises(MaybeEncodingError, res.get)
+
+        p.close()
+        p.join()
+
 class _TestPoolWorkerLifetime(BaseTestCase):
 
     ALLOWED_TYPES = ('processes', )