thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
+ task = None
i = -1
- for i, task in enumerate(taskseq):
- if thread._state:
- util.debug('task handler found thread._state != RUN')
- break
- try:
- put(task)
- except Exception as e:
- job, ind = task[:2]
+ try:
+ for i, task in enumerate(taskseq):
+ if thread._state:
+ util.debug('task handler found thread._state != RUN')
+ break
try:
- cache[job]._set(ind, (False, e))
- except KeyError:
- pass
- else:
+ put(task)
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
+ else:
+ if set_length:
+ util.debug('doing set_length()')
+ set_length(i+1)
+ continue
+ break
+ except Exception as ex:
+ job, ind = task[:2] if task else (0, 0)
+ if job in cache:
+ cache[job]._set(ind + 1, (False, ex))
if set_length:
util.debug('doing set_length()')
set_length(i+1)
- continue
- break
else:
util.debug('task handler got sentinel')
def mul(x, y):
return x*y
+class SayWhenError(ValueError): pass
+
+def exception_throwing_generator(total, when):
+ for i in range(total):
+ if i == when:
+ raise SayWhenError("Somebody said when")
+ yield i
+
class _TestPool(BaseTestCase):
@classmethod
self.assertEqual(next(it), i*i)
self.assertRaises(StopIteration, it.__next__)
+ def test_imap_handle_iterable_exception(self):
+ if self.TYPE == 'manager':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
+ for i in range(3):
+ self.assertEqual(next(it), i*i)
+ self.assertRaises(SayWhenError, it.__next__)
+
+ # SayWhenError seen at start of problematic chunk's results
+ it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
+ for i in range(6):
+ self.assertEqual(next(it), i*i)
+ self.assertRaises(SayWhenError, it.__next__)
+ it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
+ for i in range(4):
+ self.assertEqual(next(it), i*i)
+ self.assertRaises(SayWhenError, it.__next__)
+
def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, list(range(1000)))
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
+ def test_imap_unordered_handle_iterable_exception(self):
+ if self.TYPE == 'manager':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ it = self.pool.imap_unordered(sqr,
+ exception_throwing_generator(10, 3),
+ 1)
+ with self.assertRaises(SayWhenError):
+ # imap_unordered makes it difficult to anticipate the SayWhenError
+ for i in range(10):
+ self.assertEqual(next(it), i*i)
+
+ it = self.pool.imap_unordered(sqr,
+ exception_throwing_generator(20, 7),
+ 2)
+ with self.assertRaises(SayWhenError):
+ for i in range(20):
+ self.assertEqual(next(it), i*i)
+
def test_make_pool(self):
self.assertRaises(ValueError, multiprocessing.Pool, -1)
self.assertRaises(ValueError, multiprocessing.Pool, 0)