]> granicus.if.org Git - python/commitdiff
Issue #12708: Add starmap() and starmap_async() methods (similar to itertools.starmap...
authorAntoine Pitrou <solipsis@pitrou.net>
Wed, 21 Dec 2011 10:03:24 +0000 (11:03 +0100)
committerAntoine Pitrou <solipsis@pitrou.net>
Wed, 21 Dec 2011 10:03:24 +0000 (11:03 +0100)
Patch by Hynek Schlawack.

Doc/library/multiprocessing.rst
Lib/multiprocessing/managers.py
Lib/multiprocessing/pool.py
Lib/test/test_multiprocessing.py
Misc/ACKS
Misc/NEWS

index 851b3cf7f571ed8865a03e1284d5dc5117005b13..39ff0a60c22a1ff400781d8937a36471b69db98e 100644 (file)
@@ -1669,6 +1669,24 @@ with the :class:`Pool` class.
       returned iterator should be considered arbitrary.  (Only when there is
       only one worker process is the order guaranteed to be "correct".)
 
+   .. method:: starmap(func, iterable[, chunksize])
+
+      Like :meth:`map` except that the elements of the `iterable` are expected
+      to be iterables that are unpacked as arguments.
+
+      Hence an `iterable` of `[(1,2), (3, 4)]` results in `[func(1,2),
+      func(3,4)]`.
+
+      .. versionadded:: 3.3
+
+   .. method:: starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
+
+      A combination of :meth:`starmap` and :meth:`map_async` that iterates over
+      `iterable` of iterables and calls `func` with the iterables unpacked.
+      Returns a result object.
+
+      .. versionadded:: 3.3
+
    .. method:: close()
 
       Prevents any more tasks from being submitted to the pool.  Once all the
index f42d353f0d1d568a362398b05a05391e2f429cd8..5cae4c1548b130b5161f422522abc44a42ea2bd4 100644 (file)
@@ -1066,11 +1066,12 @@ ArrayProxy = MakeProxyType('ArrayProxy', (
 
 PoolProxy = MakeProxyType('PoolProxy', (
     'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
-    'map', 'map_async', 'terminate'
+    'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
     ))
 PoolProxy._method_to_typeid_ = {
     'apply_async': 'AsyncResult',
     'map_async': 'AsyncResult',
+    'starmap_async': 'AsyncResult',
     'imap': 'Iterator',
     'imap_unordered': 'Iterator'
     }
index 0c29e644ffd42832677f1e0e6ed933e20610a95a..7039d1679edb4b688b041cad78876cc4a0e7cb02 100644 (file)
@@ -64,6 +64,9 @@ job_counter = itertools.count()
 def mapstar(args):
     return list(map(*args))
 
+def starmapstar(args):
+    return list(itertools.starmap(args[0], args[1]))
+
 #
 # Code run by worker processes
 #
@@ -248,7 +251,25 @@ class Pool(object):
         in a list that is returned.
         '''
         assert self._state == RUN
-        return self.map_async(func, iterable, chunksize).get()
+        return self._map_async(func, iterable, mapstar, chunksize).get()
+
+    def starmap(self, func, iterable, chunksize=None):
+        '''
+        Like `map()` method but the elements of the `iterable` are expected to
+        be iterables as well and will be unpacked as arguments. Hence
+        `func` and (a, b) becomes func(a, b).
+        '''
+        assert self._state == RUN
+        return self._map_async(func, iterable, starmapstar, chunksize).get()
+
+    def starmap_async(self, func, iterable, chunksize=None, callback=None,
+            error_callback=None):
+        '''
+        Asynchronous version of `starmap()` method.
+        '''
+        assert self._state == RUN
+        return self._map_async(func, iterable, starmapstar, chunksize,
+                               callback, error_callback)
 
     def imap(self, func, iterable, chunksize=1):
         '''
@@ -302,6 +323,13 @@ class Pool(object):
         Asynchronous version of `map()` method.
         '''
         assert self._state == RUN
+        return self._map_async(func, iterable, mapstar, chunksize)
+
+    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
+            error_callback=None):
+        '''
+        Helper function to implement map, starmap and their async counterparts.
+        '''
         if not hasattr(iterable, '__len__'):
             iterable = list(iterable)
 
@@ -315,7 +343,7 @@ class Pool(object):
         task_batches = Pool._get_tasks(func, iterable, chunksize)
         result = MapResult(self._cache, chunksize, len(iterable), callback,
                            error_callback=error_callback)
-        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+        self._taskqueue.put((((result._job, i, mapper, (x,), {})
                               for i, x in enumerate(task_batches)), None))
         return result
 
index b99201b2809de815aac8d46252aad70250c43cad..93cc11d96d416cdc50e104fdfe680105cd918583 100644 (file)
@@ -8,6 +8,7 @@ import unittest
 import queue as pyqueue
 import time
 import io
+import itertools
 import sys
 import os
 import gc
@@ -1125,6 +1126,9 @@ def sqr(x, wait=0.0):
     time.sleep(wait)
     return x*x
 
+def mul(x, y):
+    return x*y
+
 class _TestPool(BaseTestCase):
 
     def test_apply(self):
@@ -1138,6 +1142,20 @@ class _TestPool(BaseTestCase):
         self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                          list(map(sqr, list(range(100)))))
 
+    def test_starmap(self):
+        psmap = self.pool.starmap
+        tuples = list(zip(range(10), range(9,-1, -1)))
+        self.assertEqual(psmap(mul, tuples),
+                         list(itertools.starmap(mul, tuples)))
+        tuples = list(zip(range(100), range(99,-1, -1)))
+        self.assertEqual(psmap(mul, tuples, chunksize=20),
+                         list(itertools.starmap(mul, tuples)))
+
+    def test_starmap_async(self):
+        tuples = list(zip(range(100), range(99,-1, -1)))
+        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
+                         list(itertools.starmap(mul, tuples)))
+
     def test_map_chunksize(self):
         try:
             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
index 269518cb02587c3dbe97f33d4df9c14d14b8ab2d..88b67e014e2f761e5749952163d24a4f15209e8c 100644 (file)
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -878,6 +878,7 @@ Michael Scharf
 Andreas Schawo
 Neil Schemenauer
 David Scherer
+Hynek Schlawack
 Bob Schmertz
 Gregor Schmid
 Ralf Schmitt
index 98889a5eaa9b59a3e572e5c7dc01e008e0e3d530..82f9d86be9ad4ee5976390787a3eab7880b9d736 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -419,6 +419,9 @@ Core and Builtins
 Library
 -------
 
+- Issue #12708: Add starmap() and starmap_async() methods (similar to
+  itertools.starmap()) to multiprocessing.Pool.  Patch by Hynek Schlawack.
+
 - Issue #1785: Fix inspect and pydoc with misbehaving descriptors.
 
 - Issue #13637: "a2b" functions in the binascii module now accept ASCII-only