))
-PoolProxy = MakeProxyType('PoolProxy', (
+BasePoolProxy = MakeProxyType('PoolProxy', (
'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
- 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
+ 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
))
-PoolProxy._method_to_typeid_ = {
+BasePoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult',
'map_async': 'AsyncResult',
'starmap_async': 'AsyncResult',
'imap': 'Iterator',
'imap_unordered': 'Iterator'
}
+class PoolProxy(BasePoolProxy):
+ def __enter__(self):
+ return self
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.terminate()
#
# Definition of SyncManager
return "<MaybeEncodingError: %s>" % str(self)
-def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
+ wrap_exception=False):
assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
put = outqueue.put
get = inqueue.get
try:
result = (True, func(*args, **kwds))
except Exception as e:
- e = ExceptionWithTraceback(e, e.__traceback__)
+ if wrap_exception:
+ e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
try:
put((job, i, result))
'''
Class which supports an async version of applying functions to arguments.
'''
+ _wrap_exception = True
+
def Process(self, *args, **kwds):
return self._ctx.Process(*args, **kwds)
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
- self._initargs, self._maxtasksperchild)
+ self._initargs, self._maxtasksperchild,
+ self._wrap_exception)
)
self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
#
class ThreadPool(Pool):
+ _wrap_exception = False
@staticmethod
def Process(*args, **kwds):
self.assertIn('raise RuntimeError(123) # some comment',
f1.getvalue())
+ @classmethod
+ def _test_wrapped_exception(cls):
+ raise RuntimeError('foo')
+
+ def test_wrapped_exception(self):
+ # Issue #20980: Should not wrap exception when using thread pool
+ with self.Pool(1) as p:
+ with self.assertRaises(RuntimeError):
+ p.apply(self._test_wrapped_exception)
+
+
def raising():
raise KeyError("key")
Library
-------
+- Issue #20980: Stop wrapping exception when using ThreadPool.
+
- Issue #20990: Fix issues found by pyflakes for multiprocessing.
- Issue #21015: SSL contexts will now automatically select an elliptic