]> granicus.if.org Git - python/commitdiff
Issue #19425 -- a pickling error should not cause pool to hang.
authorRichard Oudkerk <shibturn@gmail.com>
Mon, 28 Oct 2013 23:11:58 +0000 (23:11 +0000)
committerRichard Oudkerk <shibturn@gmail.com>
Mon, 28 Oct 2013 23:11:58 +0000 (23:11 +0000)
Lib/multiprocessing/pool.py
Lib/test/test_multiprocessing.py

index fc9d90402b31965d5b30f93efd1724503fe39d4c..0f2dab48ebddd6846f3c10ad55be8128e5932080 100644 (file)
@@ -147,7 +147,8 @@ class Pool(object):
 
         self._task_handler = threading.Thread(
             target=Pool._handle_tasks,
-            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+            args=(self._taskqueue, self._quick_put, self._outqueue,
+                  self._pool, self._cache)
             )
         self._task_handler.daemon = True
         self._task_handler._state = RUN
@@ -338,7 +339,7 @@ class Pool(object):
         debug('worker handler exiting')
 
     @staticmethod
-    def _handle_tasks(taskqueue, put, outqueue, pool):
+    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
         thread = threading.current_thread()
 
         for taskseq, set_length in iter(taskqueue.get, None):
@@ -349,9 +350,12 @@ class Pool(object):
                     break
                 try:
                     put(task)
-                except IOError:
-                    debug('could not put task on queue')
-                    break
+                except Exception as e:
+                    job, ind = task[:2]
+                    try:
+                        cache[job]._set(ind, (False, e))
+                    except KeyError:
+                        pass
             else:
                 if set_length:
                     debug('doing set_length()')
index d5582aab82047412f2f34951c0f6b40c08b16a0c..d6e530d7648309434ae6c28f3e1dc3dfaea1fb90 100644 (file)
@@ -1691,6 +1691,16 @@ class _TestPool(BaseTestCase):
         self.assertEqual(2, len(call_args))
         self.assertIsInstance(call_args[1], ValueError)
 
+    def test_map_unplicklable(self):
+        # Issue #19425 -- failure to pickle should not cause a hang
+        if self.TYPE == 'threads':
+            return
+        class A(object):
+            def __reduce__(self):
+                raise RuntimeError('cannot pickle')
+        with self.assertRaises(RuntimeError):
+            self.pool.map(sqr, [A()]*10)
+
     def test_map_chunksize(self):
         try:
             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)