self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv
+ def _check_running(self):
+ if self._state != RUN:
+ raise ValueError("Pool not running")
+
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
- if self._state != RUN:
- raise ValueError("Pool not running")
+ self._check_running()
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put(
'''
Like `imap()` method but ordering of results is arbitrary.
'''
- if self._state != RUN:
- raise ValueError("Pool not running")
+ self._check_running()
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put(
'''
Asynchronous version of `apply()` method.
'''
- if self._state != RUN:
- raise ValueError("Pool not running")
+ self._check_running()
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result
'''
Helper function to implement map, starmap and their async counterparts.
'''
- if self._state != RUN:
- raise ValueError("Pool not running")
+ self._check_running()
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
p.join()
def __enter__(self):
+ self._check_running()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)
+ def test_enter(self):
+ if self.TYPE == 'manager':
+ self.skipTest("test not applicable to manager")
+
+ pool = self.Pool(1)
+ with pool:
+ pass
+ # call pool.terminate()
+ # pool is no longer running
+
+ with self.assertRaises(ValueError):
+ # bpo-35477: pool.__enter__() fails if the pool is not running
+ with pool:
+ pass
+ pool.join()
+
def raising():
raise KeyError("key")