work_item.run()
# Delete references to object. See issue16284
del work_item
+
+ # attempt to increment idle count
+ executor = executor_reference()
+ if executor is not None:
+ executor._idle_semaphore.release()
+ del executor
continue
+
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
+ self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._broken = False
self._shutdown = False
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
+ # if idle threads are available, don't spin new threads
+ if self._idle_semaphore.acquire(timeout=0):
+ return
+
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
- # TODO(bquinlan): Should avoid creating new threads if there are more
- # idle threads than items in the work queue.
+
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
pass
def test_threads_terminate(self):
- self.executor.submit(mul, 21, 2)
- self.executor.submit(mul, 6, 7)
- self.executor.submit(mul, 3, 14)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(3):
+ self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._threads), 3)
+ for i in range(3):
+ sem.release()
self.executor.shutdown()
for t in self.executor._threads:
t.join()
self.assertEqual(executor._max_workers,
(os.cpu_count() or 1) * 5)
+ def test_saturation(self):
+ executor = self.executor_type(4)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(15 * executor._max_workers):
+ executor.submit(acquire_lock, sem)
+ self.assertEqual(len(executor._threads), executor._max_workers)
+ for i in range(15 * executor._max_workers):
+ sem.release()
+ executor.shutdown(wait=True)
+
+ def test_idle_thread_reuse(self):
+ executor = self.executor_type()
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._threads), 1)
+ executor.shutdown(wait=True)
+
class ProcessPoolExecutorTest(ExecutorTest):