]> granicus.if.org Git - python/commitdiff
Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*
authorAntoine Pitrou <solipsis@pitrou.net>
Sat, 4 Oct 2014 18:20:10 +0000 (20:20 +0200)
committerAntoine Pitrou <solipsis@pitrou.net>
Sat, 4 Oct 2014 18:20:10 +0000 (20:20 +0200)
argument to allow batching of tasks in child processes and improve
performance of ProcessPoolExecutor.  Patch by Dan O'Reilly.

Doc/library/concurrent.futures.rst
Lib/concurrent/futures/_base.py
Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures.py

index e487817dfb050ad83952b3efe66ca7e4c10b7128..2bebd4bf7cc4d9c8aaf2d5c510b5487d1a097d31 100644 (file)
@@ -38,7 +38,7 @@ Executor Objects
               future = executor.submit(pow, 323, 1235)
               print(future.result())
 
-    .. method:: map(func, *iterables, timeout=None)
+    .. method:: map(func, *iterables, timeout=None, chunksize=1)
 
        Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
        asynchronously and several calls to *func* may be made concurrently.  The
@@ -48,7 +48,16 @@ Executor Objects
        *timeout* can be an int or a float.  If *timeout* is not specified or
        ``None``, there is no limit to the wait time.  If a call raises an
        exception, then that exception will be raised when its value is
-       retrieved from the iterator.
+       retrieved from the iterator. When using :class:`ProcessPoolExecutor`, this
+       method chops *iterables* into a number of chunks which it submits to the
+       pool as separate tasks. The (approximate) size of these chunks can be
+       specified by setting *chunksize* to a positive integer. For very long
+       iterables, using a large value for *chunksize* can significantly improve
+       performance compared to the default size of 1. With :class:`ThreadPoolExecutor`,
+       *chunksize* has no effect.
+
+       .. versionchanged:: 3.5
+          Added the *chunksize* argument.
 
     .. method:: shutdown(wait=True)
 
index c13b3b6b290f181897bdca53fb44abe202e6a37d..9e447137adce8acad4c8f9c89c288054c7b6917d 100644 (file)
@@ -520,7 +520,7 @@ class Executor(object):
         """
         raise NotImplementedError()
 
-    def map(self, fn, *iterables, timeout=None):
+    def map(self, fn, *iterables, timeout=None, chunksize=1):
         """Returns a iterator equivalent to map(fn, iter).
 
         Args:
@@ -528,6 +528,10 @@ class Executor(object):
                 passed iterables.
             timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
+            chunksize: The size of the chunks the iterable will be broken into
+                before being passed to a child process. This argument is only
+                used by ProcessPoolExecutor; it is ignored by
+                ThreadPoolExecutor.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
index 12993901e36c88c824aa7a6c2488c2da442d3d5e..fc64dbe84bfdc13726e458b75f4b7c2ac1021898 100644 (file)
@@ -55,6 +55,8 @@ from multiprocessing import SimpleQueue
 from multiprocessing.connection import wait
 import threading
 import weakref
+from functools import partial
+import itertools
 
 # Workers are created as daemon threads and processes. This is done to allow the
 # interpreter to exit when there are still idle processes in a
@@ -108,6 +110,26 @@ class _CallItem(object):
         self.args = args
         self.kwargs = kwargs
 
+def _get_chunks(*iterables, chunksize):
+    """ Iterates over zip()ed iterables in chunks. """
+    it = zip(*iterables)
+    while True:
+        chunk = tuple(itertools.islice(it, chunksize))
+        if not chunk:
+            return
+        yield chunk
+
+def _process_chunk(fn, chunk):
+    """ Processes a chunk of an iterable passed to map.
+
+    Runs the function passed to map() on a chunk of the
+    iterable passed to map.
+
+    This function is run in a separate process.
+
+    """
+    return [fn(*args) for args in chunk]
+
 def _process_worker(call_queue, result_queue):
     """Evaluates calls from call_queue and places the results in result_queue.
 
@@ -411,6 +433,35 @@ class ProcessPoolExecutor(_base.Executor):
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
+    def map(self, fn, *iterables, timeout=None, chunksize=1):
+        """Returns a iterator equivalent to map(fn, iter).
+
+        Args:
+            fn: A callable that will take as many arguments as there are
+                passed iterables.
+            timeout: The maximum number of seconds to wait. If None, then there
+                is no limit on the wait time.
+            chunksize: If greater than one, the iterables will be chopped into
+                chunks of size chunksize and submitted to the process pool.
+                If set to one, the items in the list will be sent one at a time.
+
+        Returns:
+            An iterator equivalent to: map(func, *iterables) but the calls may
+            be evaluated out-of-order.
+
+        Raises:
+            TimeoutError: If the entire result iterator could not be generated
+                before the given timeout.
+            Exception: If fn(*args) raises for any values.
+        """
+        if chunksize < 1:
+            raise ValueError("chunksize must be >= 1.")
+
+        results = super().map(partial(_process_chunk, fn),
+                              _get_chunks(*iterables, chunksize=chunksize),
+                              timeout=timeout)
+        return itertools.chain.from_iterable(results)
+
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown_thread = True
index 11560e65ac02c99a93eca7f203935ed015fca461..7f92618022f44ce85aecafbfd1d9d8787649128c 100644 (file)
@@ -464,6 +464,22 @@ class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase)
         # Submitting other jobs fails as well.
         self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
 
+    def test_map_chunksize(self):
+        def bad_map():
+            list(self.executor.map(pow, range(40), range(40), chunksize=-1))
+
+        ref = list(map(pow, range(40), range(40)))
+        self.assertEqual(
+            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
+            ref)
+        self.assertEqual(
+            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
+            ref)
+        self.assertEqual(
+            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
+            ref)
+        self.assertRaises(ValueError, bad_map)
+
 
 class FutureTests(unittest.TestCase):
     def test_done_callback_with_result(self):