]> granicus.if.org Git - python/commitdiff
Issue #28053: Applying refactorings, docs and other cleanup to follow.
authorDavin Potts <python@discontinuity.net>
Fri, 9 Sep 2016 23:03:10 +0000 (18:03 -0500)
committerDavin Potts <python@discontinuity.net>
Fri, 9 Sep 2016 23:03:10 +0000 (18:03 -0500)
13 files changed:
Lib/multiprocessing/connection.py
Lib/multiprocessing/context.py
Lib/multiprocessing/forkserver.py
Lib/multiprocessing/heap.py
Lib/multiprocessing/managers.py
Lib/multiprocessing/popen_forkserver.py
Lib/multiprocessing/popen_spawn_posix.py
Lib/multiprocessing/popen_spawn_win32.py
Lib/multiprocessing/queues.py
Lib/multiprocessing/reduction.py
Lib/multiprocessing/resource_sharer.py
Lib/multiprocessing/sharedctypes.py
Lib/multiprocessing/spawn.py

index d0a1b86b13ef1279fc428efe5e0f4026e56871a7..d49e8f0d32b62a4a4e6149e54b844672d047cde9 100644 (file)
@@ -20,11 +20,11 @@ import itertools
 
 import _multiprocessing
 
-from . import reduction
 from . import util
 
 from . import AuthenticationError, BufferTooShort
-from .reduction import ForkingPickler
+from .context import reduction
+_ForkingPickler = reduction.ForkingPickler
 
 try:
     import _winapi
@@ -203,7 +203,7 @@ class _ConnectionBase:
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        self._send_bytes(ForkingPickler.dumps(obj))
+        self._send_bytes(_ForkingPickler.dumps(obj))
 
     def recv_bytes(self, maxlength=None):
         """
@@ -248,7 +248,7 @@ class _ConnectionBase:
         self._check_closed()
         self._check_readable()
         buf = self._recv_bytes()
-        return ForkingPickler.loads(buf.getbuffer())
+        return _ForkingPickler.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
index 63849f9d16e362fdda37caa8892aa204ee68b4e4..09455e2ec777b333569d749282fccf94332a1f71 100644 (file)
@@ -3,6 +3,7 @@ import sys
 import threading
 
 from . import process
+from . import reduction
 
 __all__ = []            # things are copied from here to __init__.py
 
@@ -198,6 +199,16 @@ class BaseContext(object):
     def set_start_method(self, method=None):
         raise ValueError('cannot set start method of concrete context')
 
+    @property
+    def reducer(self):
+        '''Controls how objects will be reduced to a form that can be
+        shared with other processes.'''
+        return globals().get('reduction')
+
+    @reducer.setter
+    def reducer(self, reduction):
+        globals()['reduction'] = reduction
+
     def _check_available(self):
         pass
 
@@ -245,7 +256,6 @@ class DefaultContext(BaseContext):
         if sys.platform == 'win32':
             return ['spawn']
         else:
-            from . import reduction
             if reduction.HAVE_SEND_HANDLE:
                 return ['fork', 'spawn', 'forkserver']
             else:
@@ -292,7 +302,6 @@ if sys.platform != 'win32':
         _name = 'forkserver'
         Process = ForkServerProcess
         def _check_available(self):
-            from . import reduction
             if not reduction.HAVE_SEND_HANDLE:
                 raise ValueError('forkserver start method not available')
 
index ad01ede0e06f8cea7541a2d378204b796c172aee..f2c179e4e0afaa68ffacc5e4e621f0fbc1b8a1ae 100644 (file)
@@ -9,7 +9,7 @@ import threading
 
 from . import connection
 from . import process
-from . import reduction
+from .context import reduction
 from . import semaphore_tracker
 from . import spawn
 from . import util
index 44d9638ff6ec4d9d4595d085ab14b7a4065d1c41..443321535ecc7ce9e11c79df1bc5c8516ab8e874 100644 (file)
@@ -14,8 +14,7 @@ import sys
 import tempfile
 import threading
 
-from . import context
-from . import reduction
+from .context import reduction, assert_spawning
 from . import util
 
 __all__ = ['BufferWrapper']
@@ -48,7 +47,7 @@ if sys.platform == 'win32':
             self._state = (self.size, self.name)
 
         def __getstate__(self):
-            context.assert_spawning(self)
+            assert_spawning(self)
             return self._state
 
         def __setstate__(self, state):
index c4dc972b80ebc448ea1428cbce2e8eec4598b8cf..b9ce84b2d85ddc4d6a561b3ba36d91445a0a27f7 100644 (file)
@@ -23,10 +23,9 @@ from time import time as _time
 from traceback import format_exc
 
 from . import connection
-from . import context
+from .context import reduction, get_spawning_popen
 from . import pool
 from . import process
-from . import reduction
 from . import util
 from . import get_context
 
@@ -833,7 +832,7 @@ class BaseProxy(object):
 
     def __reduce__(self):
         kwds = {}
-        if context.get_spawning_popen() is not None:
+        if get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey
 
         if getattr(self, '_isauto', False):
index e792194f44cea6df687227c269aaa9ab714f88c5..222db2d90a31564bb94c1bb9ad1de918d4f3038f 100644 (file)
@@ -1,10 +1,9 @@
 import io
 import os
 
-from . import reduction
+from .context import reduction, set_spawning_popen
 if not reduction.HAVE_SEND_HANDLE:
     raise ImportError('No support for sending fds between processes')
-from . import context
 from . import forkserver
 from . import popen_fork
 from . import spawn
@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
     def _launch(self, process_obj):
         prep_data = spawn.get_preparation_data(process_obj._name)
         buf = io.BytesIO()
-        context.set_spawning_popen(self)
+        set_spawning_popen(self)
         try:
             reduction.dump(prep_data, buf)
             reduction.dump(process_obj, buf)
         finally:
-            context.set_spawning_popen(None)
+            set_spawning_popen(None)
 
         self.sentinel, w = forkserver.connect_to_new_process(self._fds)
         util.Finalize(self, os.close, (self.sentinel,))
index 6b0a8d635feadb5f90f6de12b3157469e54be25b..98f8f0ab334d2ff3f04753019a208bb56f03b061 100644 (file)
@@ -1,9 +1,8 @@
 import io
 import os
 
-from . import context
+from .context import reduction, set_spawning_popen
 from . import popen_fork
-from . import reduction
 from . import spawn
 from . import util
 
@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
         self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
-        context.set_spawning_popen(self)
+        set_spawning_popen(self)
         try:
             reduction.dump(prep_data, fp)
             reduction.dump(process_obj, fp)
         finally:
-            context.set_spawning_popen(None)
+            set_spawning_popen(None)
 
         parent_r = child_w = child_r = parent_w = None
         try:
index 3b53068be4113183a064363e7829c8d62986573d..6fd588f542673ef89e3a2eeee694930e36115b80 100644 (file)
@@ -4,9 +4,8 @@ import signal
 import sys
 import _winapi
 
-from . import context
+from .context import reduction, get_spawning_popen, set_spawning_popen
 from . import spawn
-from . import reduction
 from . import util
 
 __all__ = ['Popen']
@@ -60,15 +59,15 @@ class Popen(object):
             util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
             # send information to child
-            context.set_spawning_popen(self)
+            set_spawning_popen(self)
             try:
                 reduction.dump(prep_data, to_child)
                 reduction.dump(process_obj, to_child)
             finally:
-                context.set_spawning_popen(None)
+                set_spawning_popen(None)
 
     def duplicate_for_child(self, handle):
-        assert self is context.get_spawning_popen()
+        assert self is get_spawning_popen()
         return reduction.duplicate(handle, self.sentinel)
 
     def wait(self, timeout=None):
index 786a303b33705a8be5c4fbb46060645319c00f60..dda03ddf5425ceebf93f9ff75ed362dc32c08a04 100644 (file)
@@ -23,9 +23,9 @@ import _multiprocessing
 
 from . import connection
 from . import context
+_ForkingPickler = context.reduction.ForkingPickler
 
 from .util import debug, info, Finalize, register_after_fork, is_exiting
-from .reduction import ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -110,7 +110,7 @@ class Queue(object):
             finally:
                 self._rlock.release()
         # unserialize the data after having released the lock
-        return ForkingPickler.loads(res)
+        return _ForkingPickler.loads(res)
 
     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -238,7 +238,7 @@ class Queue(object):
                             return
 
                         # serialize the data before acquiring the lock
-                        obj = ForkingPickler.dumps(obj)
+                        obj = _ForkingPickler.dumps(obj)
                         if wacquire is None:
                             send_bytes(obj)
                         else:
@@ -342,11 +342,11 @@ class SimpleQueue(object):
         with self._rlock:
             res = self._reader.recv_bytes()
         # unserialize the data after having released the lock
-        return ForkingPickler.loads(res)
+        return _ForkingPickler.loads(res)
 
     def put(self, obj):
         # serialize the data before acquiring the lock
-        obj = ForkingPickler.dumps(obj)
+        obj = _ForkingPickler.dumps(obj)
         if self._wlock is None:
             # writes to a message oriented win32 pipe are atomic
             self._writer.send_bytes(obj)
index 8f209b47dab7985aa9b8433b0feab047dfd54341..c043c9a0dc164438c058874b97bb94e585c1875a 100644 (file)
@@ -7,6 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
+from abc import ABCMeta, abstractmethod
 import copyreg
 import functools
 import io
@@ -238,3 +239,36 @@ else:
         fd = df.detach()
         return socket.socket(family, type, proto, fileno=fd)
     register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+    '''Abstract base class for use in implementing a Reduction class
+    suitable for use in replacing the standard reduction mechanism
+    used in multiprocessing.'''
+    ForkingPickler = ForkingPickler
+    register = register
+    dump = dump
+    send_handle = send_handle
+    recv_handle = recv_handle
+
+    if sys.platform == 'win32':
+        steal_handle = steal_handle
+        duplicate = duplicate
+        DupHandle = DupHandle
+    else:
+        sendfds = sendfds
+        recvfds = recvfds
+        DupFd = DupFd
+
+    _reduce_method = _reduce_method
+    _reduce_method_descriptor = _reduce_method_descriptor
+    _rebuild_partial = _rebuild_partial
+    _reduce_socket = _reduce_socket
+    _rebuild_socket = _rebuild_socket
+
+    def __init__(self, *args):
+        register(type(_C().f), _reduce_method)
+        register(type(list.append), _reduce_method_descriptor)
+        register(type(int.__add__), _reduce_method_descriptor)
+        register(functools.partial, _reduce_partial)
+        register(socket.socket, _reduce_socket)
index 5e46fc65b4ee80a01f92e564c593b81f8dae720d..e44a728fa9ade7d402dee9fe873d3e59c3401a2a 100644 (file)
@@ -15,7 +15,7 @@ import sys
 import threading
 
 from . import process
-from . import reduction
+from .context import reduction
 from . import util
 
 __all__ = ['stop']
index 4258f591c4ca54147b33dc328d3c23031c1faac7..25cbcf2ae4cd6f89fb46f3187a51994dd48a8bff 100644 (file)
@@ -13,8 +13,8 @@ import weakref
 from . import heap
 from . import get_context
 
-from .context import assert_spawning
-from .reduction import ForkingPickler
+from .context import reduction, assert_spawning
+_ForkingPickler = reduction.ForkingPickler
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
@@ -134,7 +134,7 @@ def reduce_ctype(obj):
 def rebuild_ctype(type_, wrapper, length):
     if length is not None:
         type_ = type_ * length
-    ForkingPickler.register(type_, reduce_ctype)
+    _ForkingPickler.register(type_, reduce_ctype)
     buf = wrapper.create_memoryview()
     obj = type_.from_buffer(buf)
     obj._wrapper = wrapper
index 4d769516cd936f09bc95c0aa69a4adaa103c4e25..dfb9f652701042792ead2215232b663fcf89da31 100644 (file)
@@ -9,13 +9,13 @@
 #
 
 import os
-import pickle
 import sys
 import runpy
 import types
 
 from . import get_start_method, set_start_method
 from . import process
+from .context import reduction
 from . import util
 
 __all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
@@ -96,8 +96,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
     assert is_forking(sys.argv)
     if sys.platform == 'win32':
         import msvcrt
-        from .reduction import steal_handle
-        new_handle = steal_handle(parent_pid, pipe_handle)
+        new_handle = reduction.steal_handle(parent_pid, pipe_handle)
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
         from . import semaphore_tracker
@@ -111,9 +110,9 @@ def _main(fd):
     with os.fdopen(fd, 'rb', closefd=True) as from_parent:
         process.current_process()._inheriting = True
         try:
-            preparation_data = pickle.load(from_parent)
+            preparation_data = reduction.pickle.load(from_parent)
             prepare(preparation_data)
-            self = pickle.load(from_parent)
+            self = reduction.pickle.load(from_parent)
         finally:
             del process.current_process()._inheriting
     return self._bootstrap()