Merged revisions 65864 via svnmerge from
authorBenjamin Peterson <benjamin@python.org>
Tue, 19 Aug 2008 19:17:39 +0000 (19:17 +0000)
committerBenjamin Peterson <benjamin@python.org>
Tue, 19 Aug 2008 19:17:39 +0000 (19:17 +0000)
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r65864 | jesse.noller | 2008-08-19 14:06:19 -0500 (Tue, 19 Aug 2008) | 2 lines

  issue3352: clean up the multiprocessing API to remove many get_/set_ methods and convert them to properties. Update the docs and the examples included.
........

15 files changed:
Doc/includes/mp_distributing.py
Doc/includes/mp_pool.py
Doc/includes/mp_synchronize.py
Doc/includes/mp_webserver.py
Doc/includes/mp_workers.py
Doc/library/multiprocessing.rst
Lib/multiprocessing/dummy/__init__.py
Lib/multiprocessing/forking.py
Lib/multiprocessing/managers.py
Lib/multiprocessing/pool.py
Lib/multiprocessing/process.py
Lib/multiprocessing/reduction.py
Lib/multiprocessing/synchronize.py
Lib/multiprocessing/util.py
Lib/test/test_multiprocessing.py

index 5cd12bb52b050df75ae6c296140ac6e5ddc83784..7acefb8d6adaae8e28798143672a2c68133fc473 100644 (file)
@@ -17,10 +17,10 @@ import shutil
 import subprocess
 import logging
 import itertools
-import Queue
+import queue
 
 try:
-    import cPickle as pickle
+    import pickle as pickle
 except ImportError:
     import pickle
 
@@ -152,7 +152,7 @@ class DistributedPool(pool.Pool):
 
 def LocalProcess(**kwds):
     p = Process(**kwds)
-    p.set_name('localhost/' + p.get_name())
+    p.set_name('localhost/' + p.name)
     return p
 
 class Cluster(managers.SyncManager):
@@ -210,7 +210,7 @@ class Cluster(managers.SyncManager):
         self._base_shutdown()
 
     def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
-        slot = self._slot_iterator.next()
+        slot = next(self._slot_iterator)
         return slot.Process(
             group=group, target=target, name=name, args=args, kwargs=kwargs
             )
@@ -231,7 +231,7 @@ class Cluster(managers.SyncManager):
 # Queue subclass used by distributed pool
 #
 
-class SettableQueue(Queue.Queue):
+class SettableQueue(queue.Queue):
     def empty(self):
         return not self.queue
     def full(self):
@@ -243,7 +243,7 @@ class SettableQueue(Queue.Queue):
         try:
             self.queue.clear()
             self.queue.extend(contents)
-            self.not_empty.notify_all()
+            self.not_empty.notifyAll()
         finally:
             self.not_empty.release()
 
index b937b86f53c0d03991471f04816b22c6802ee5bd..e7aaaacaf30667286a51596eea693da0af4768a1 100644 (file)
@@ -14,7 +14,7 @@ import sys
 def calculate(func, args):
     result = func(*args)
     return '%s says that %s%s = %s' % (
-        multiprocessing.current_process().get_name(),
+        multiprocessing.current_process().name,
         func.__name__, args, result
         )
 
index 8cf11bd4d01df0dfe8c31eb4e6acb27f9e1bb939..ddcd338273c7961a61cc0173377f0ea141758851 100644 (file)
@@ -224,7 +224,7 @@ def test_sharedvalues():
     p.start()
     p.join()
 
-    assert p.get_exitcode() == 0
+    assert p.exitcode == 0
 
 
 ####
index 15d2b6b6e2cc284fadca0921de915440d90c6ce9..4943f5d4775259e93d0b58993d3528c1febbcbd1 100644 (file)
@@ -21,7 +21,7 @@ if sys.platform == 'win32':
 
 
 def note(format, *args):
-    sys.stderr.write('[%s]\t%s\n' % (current_process().get_name(),format%args))
+    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
 
 
 class RequestHandler(SimpleHTTPRequestHandler):
index 795e6cb9bf8595ca300e51f2416831d736eaa9ce..07e4cdd2e0cd83048e2009f7e20b0d00115c7142 100644 (file)
@@ -29,7 +29,7 @@ def worker(input, output):
 def calculate(func, args):
     result = func(*args)
     return '%s says that %s%s = %s' % \
-        (current_process().get_name(), func.__name__, args, result)
+        (current_process().name, func.__name__, args, result)
 
 #
 # Functions referenced by tasks
index 10ccb17f8d0aae9db4c5a575fb8978d4aadd7dfe..4bbd94c385a2948256e9b9d4bef8031c08c19437 100644 (file)
@@ -290,11 +290,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
       A process cannot join itself because this would cause a deadlock.  It is
       an error to attempt to join a process before it has been started.
 
-   .. method:: get_name()
+   .. attribute:: Process.name
 
       Return the process's name.
 
-   .. method:: set_name(name)
+   .. attribute:: Process.name = name
 
       Set the process's name.
 
@@ -309,11 +309,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
       Roughly, a process object is alive from the moment the :meth:`start`
       method returns until the child process terminates.
 
-   .. method:: is_daemon()
+   .. attribute:: Process.daemon
 
-      Return the process's daemon flag.
+      Return the process's daemon flag., this is a boolean.
 
-   .. method:: set_daemon(daemonic)
+   .. attribute:: Process.daemon = daemonic
 
       Set the process's daemon flag to the Boolean value *daemonic*.  This must
       be called before :meth:`start` is called.
@@ -329,18 +329,18 @@ The :mod:`multiprocessing` package mostly replicates the API of the
 
    In addition process objects also support the following methods:
 
-   .. method:: get_pid()
+   .. attribute:: Process.pid
 
       Return the process ID.  Before the process is spawned, this will be
       ``None``.
 
-   .. method:: get_exit_code()
+   .. attribute:: Process.exitcode
 
       Return the child's exit code.  This will be ``None`` if the process has
       not yet terminated.  A negative value *-N* indicates that the child was
       terminated by signal *N*.
 
-   .. method:: get_auth_key()
+   .. attribute:: Process.authkey
 
       Return the process's authentication key (a byte string).
 
@@ -349,11 +349,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
 
       When a :class:`Process` object is created, it will inherit the
       authentication key of its parent process, although this may be changed
-      using :meth:`set_auth_key` below.
+      using :attr:`Process.authkey` below.
 
       See :ref:`multiprocessing-auth-keys`.
 
-   .. method:: set_auth_key(authkey)
+   .. attribute:: Process.authkey = authkey
 
       Set the process's authentication key which must be a byte string.
 
index 48ca75b6cbf11fac13659388882791ec8fa46c46..da18877845572d9a367a9dbe9ffcd5b1ad6d293c 100644 (file)
@@ -47,7 +47,8 @@ class DummyProcess(threading.Thread):
         self._parent._children[self] = None
         threading.Thread.start(self)
 
-    def get_exitcode(self):
+    @property
+    def exitcode(self):
         if self._start_called and not self.is_alive():
             return 0
         else:
index b14143ba1a799eb8fa9666e84c33fd96c8510972..47d54f276a95a9583571c659c2abf11b53f5228b 100644 (file)
@@ -315,7 +315,7 @@ else:
             sys_argv=sys.argv,
             log_to_stderr=_log_to_stderr,
             orig_dir=process.ORIGINAL_DIR,
-            authkey=process.current_process().get_authkey(),
+            authkey=process.current_process().authkey,
             )
 
         if _logger is not None:
@@ -363,7 +363,7 @@ def prepare(data):
     old_main_modules.append(sys.modules['__main__'])
 
     if 'name' in data:
-        process.current_process().set_name(data['name'])
+        process.current_process().name = data['name']
 
     if 'authkey' in data:
         process.current_process()._authkey = data['authkey']
index d7558c7de057eb5aaf0b17aa882447b833014730..d6b16e525e78d7981536ebe2d3af9d2bae03195a 100644 (file)
@@ -450,7 +450,7 @@ class BaseManager(object):
 
     def __init__(self, address=None, authkey=None, serializer='pickle'):
         if authkey is None:
-            authkey = current_process().get_authkey()
+            authkey = current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
         self._authkey = AuthenticationString(authkey)
         self._state = State()
@@ -495,7 +495,7 @@ class BaseManager(object):
                   self._serializer, writer),
             )
         ident = ':'.join(str(i) for i in self._process._identity)
-        self._process.set_name(type(self).__name__  + '-' + ident)
+        self._process.name = type(self).__name__  + '-' + ident
         self._process.start()
 
         # get address of server
@@ -696,7 +696,7 @@ class BaseProxy(object):
         elif self._manager is not None:
             self._authkey = self._manager._authkey
         else:
-            self._authkey = current_process().get_authkey()
+            self._authkey = current_process().authkey
 
         if incref:
             self._incref()
@@ -705,7 +705,7 @@ class BaseProxy(object):
 
     def _connect(self):
         util.debug('making connection to manager')
-        name = current_process().get_name()
+        name = current_process().name
         if threading.current_thread().name != 'MainThread':
             name += '|' + threading.current_thread().name
         conn = self._Client(self._token.address, authkey=self._authkey)
@@ -886,7 +886,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
     if authkey is None and manager is not None:
         authkey = manager._authkey
     if authkey is None:
-        authkey = current_process().get_authkey()
+        authkey = current_process().authkey
 
     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
index cb0e49fc49285184818e6fc25c89930f80b38809..90fd178e63b1a4d9465ecc5978b2f3f314c630f6 100644 (file)
@@ -99,7 +99,7 @@ class Pool(object):
                 args=(self._inqueue, self._outqueue, initializer, initargs)
                 )
             self._pool.append(w)
-            w.name = w.get_name().replace('Process', 'PoolWorker')
+            w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
 
index 1f89dbab512c464f83f84a5daa0c85ab371fc901..e21d2f077b988e5f0500bd7674e8707a6aa1efeb 100644 (file)
@@ -132,45 +132,43 @@ class Process(object):
         self._popen.poll()
         return self._popen.returncode is None
 
-    def get_name(self):
-        '''
-        Return name of process
-        '''
+    @property
+    def name(self):
         return self._name
 
-    def set_name(self, name):
-        '''
-        Set name of process
-        '''
+    @name.setter
+    def name(self, name):
         assert isinstance(name, str), 'name must be a string'
         self._name = name
 
-    def is_daemon(self):
+    @property
+    def daemon(self):
         '''
         Return whether process is a daemon
         '''
         return self._daemonic
 
-    def set_daemon(self, daemonic):
+    @daemon.setter
+    def daemon(self, daemonic):
         '''
         Set whether process is a daemon
         '''
         assert self._popen is None, 'process has already started'
         self._daemonic = daemonic
 
-    def get_authkey(self):
-        '''
-        Return authorization key of process
-        '''
+    @property
+    def authkey(self):
         return self._authkey
 
-    def set_authkey(self, authkey):
+    @authkey.setter
+    def authkey(self, authkey):
         '''
         Set authorization key of process
         '''
         self._authkey = AuthenticationString(authkey)
 
-    def get_exitcode(self):
+    @property
+    def exitcode(self):
         '''
         Return exit code of process or `None` if it has yet to stop
         '''
@@ -178,7 +176,8 @@ class Process(object):
             return self._popen
         return self._popen.poll()
 
-    def get_ident(self):
+    @property
+    def ident(self):
         '''
         Return indentifier (PID) of process or `None` if it has yet to start
         '''
@@ -187,7 +186,7 @@ class Process(object):
         else:
             return self._popen and self._popen.pid
 
-    pid = property(get_ident)
+    pid = ident
 
     def __repr__(self):
         if self is _current_process:
@@ -198,7 +197,7 @@ class Process(object):
             status = 'initial'
         else:
             if self._popen.poll() is not None:
-                status = self.get_exitcode()
+                status = self.exitcode
             else:
                 status = 'started'
 
@@ -245,7 +244,7 @@ class Process(object):
         except:
             exitcode = 1
             import traceback
-            sys.stderr.write('Process %s:\n' % self.get_name())
+            sys.stderr.write('Process %s:\n' % self.name)
             sys.stderr.flush()
             traceback.print_exc()
 
index 194bb1700b7ef6cec3b15ce1a5b67788a1bcd945..010d871638541f03ca06147a2080dc9968fe72fb 100644 (file)
@@ -82,9 +82,9 @@ def _get_listener():
         try:
             if _listener is None:
                 debug('starting listener and thread for sending handles')
-                _listener = Listener(authkey=current_process().get_authkey())
+                _listener = Listener(authkey=current_process().authkey)
                 t = threading.Thread(target=_serve)
-                t.set_daemon(True)
+                t.daemon = True
                 t.start()
         finally:
             _lock.release()
@@ -127,7 +127,7 @@ def rebuild_handle(pickled_data):
     if inherited:
         return handle
     sub_debug('rebuilding handle %d', handle)
-    conn = Client(address, authkey=current_process().get_authkey())
+    conn = Client(address, authkey=current_process().authkey)
     conn.send((handle, os.getpid()))
     new_handle = recv_handle(conn)
     conn.close()
index 628792eb659c80101dac03e59228e8be8c3c12d0..be56a5b67143c48def9df3e63189396c7aadb53b 100644 (file)
@@ -108,9 +108,9 @@ class Lock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().get_name()
-                if threading.current_thread().get_name() != 'MainThread':
-                    name += '|' + threading.current_thread().get_name()
+                name = current_process().name
+                if threading.current_thread().name != 'MainThread':
+                    name += '|' + threading.current_thread().name
             elif self._semlock._get_value() == 1:
                 name = 'None'
             elif self._semlock._count() > 0:
@@ -133,9 +133,9 @@ class RLock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().get_name()
-                if threading.current_thread().get_name() != 'MainThread':
-                    name += '|' + threading.current_thread().get_name()
+                name = current_process().name
+                if threading.current_thread().name != 'MainThread':
+                    name += '|' + threading.current_thread().name
                 count = self._semlock._count()
             elif self._semlock._get_value() == 1:
                 name, count = 'None', 0
index 8aff4f45f9fce528efd593d41e585fa6aea625cc..aae38c7d52b3287d848b4766a8b4039653cc38c4 100644 (file)
@@ -274,11 +274,11 @@ def _exit_function():
 
     for p in active_children():
         if p._daemonic:
-            info('calling terminate() for daemon %s', p.get_name())
+            info('calling terminate() for daemon %s', p.name)
             p._popen.terminate()
 
     for p in active_children():
-        info('calling join() for process %s', p.get_name())
+        info('calling join() for process %s', p.name)
         p.join()
 
     debug('running the remaining "atexit" finalizers')
index 436cad807ed5eceeec7b7c62be62c5fb02cdd765..102042d9b60d85cf3d1153fee5429306c6abadf0 100644 (file)
@@ -120,22 +120,22 @@ class _TestProcess(BaseTestCase):
             return
 
         current = self.current_process()
-        authkey = current.get_authkey()
+        authkey = current.authkey
 
         self.assertTrue(current.is_alive())
-        self.assertTrue(not current.is_daemon())
+        self.assertTrue(not current.daemon)
         self.assertTrue(isinstance(authkey, bytes))
         self.assertTrue(len(authkey) > 0)
-        self.assertEqual(current.get_ident(), os.getpid())
-        self.assertEqual(current.get_exitcode(), None)
+        self.assertEqual(current.ident, os.getpid())
+        self.assertEqual(current.exitcode, None)
 
     def _test(self, q, *args, **kwds):
         current = self.current_process()
         q.put(args)
         q.put(kwds)
-        q.put(current.get_name())
+        q.put(current.name)
         if self.TYPE != 'threads':
-            q.put(bytes(current.get_authkey()))
+            q.put(bytes(current.authkey))
             q.put(current.pid)
 
     def test_process(self):
@@ -147,33 +147,33 @@ class _TestProcess(BaseTestCase):
         p = self.Process(
             target=self._test, args=args, kwargs=kwargs, name=name
             )
-        p.set_daemon(True)
+        p.daemon = True
         current = self.current_process()
 
         if self.TYPE != 'threads':
-            self.assertEquals(p.get_authkey(), current.get_authkey())
+            self.assertEquals(p.authkey, current.authkey)
         self.assertEquals(p.is_alive(), False)
-        self.assertEquals(p.is_daemon(), True)
+        self.assertEquals(p.daemon, True)
         self.assertTrue(p not in self.active_children())
         self.assertTrue(type(self.active_children()) is list)
-        self.assertEqual(p.get_exitcode(), None)
+        self.assertEqual(p.exitcode, None)
 
         p.start()
 
-        self.assertEquals(p.get_exitcode(), None)
+        self.assertEquals(p.exitcode, None)
         self.assertEquals(p.is_alive(), True)
         self.assertTrue(p in self.active_children())
 
         self.assertEquals(q.get(), args[1:])
         self.assertEquals(q.get(), kwargs)
-        self.assertEquals(q.get(), p.get_name())
+        self.assertEquals(q.get(), p.name)
         if self.TYPE != 'threads':
-            self.assertEquals(q.get(), current.get_authkey())
+            self.assertEquals(q.get(), current.authkey)
             self.assertEquals(q.get(), p.pid)
 
         p.join()
 
-        self.assertEquals(p.get_exitcode(), 0)
+        self.assertEquals(p.exitcode, 0)
         self.assertEquals(p.is_alive(), False)
         self.assertTrue(p not in self.active_children())
 
@@ -185,12 +185,12 @@ class _TestProcess(BaseTestCase):
             return
 
         p = self.Process(target=self._test_terminate)
-        p.set_daemon(True)
+        p.daemon = True
         p.start()
 
         self.assertEqual(p.is_alive(), True)
         self.assertTrue(p in self.active_children())
-        self.assertEqual(p.get_exitcode(), None)
+        self.assertEqual(p.exitcode, None)
 
         p.terminate()
 
@@ -203,8 +203,8 @@ class _TestProcess(BaseTestCase):
 
         p.join()
 
-        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
-        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
+        # XXX sometimes get p.exitcode == 0 on Windows ...
+        #self.assertEqual(p.exitcode, -signal.SIGTERM)
 
     def test_cpu_count(self):
         try:
@@ -331,7 +331,7 @@ class _TestQueue(BaseTestCase):
             target=self._test_put,
             args=(queue, child_can_start, parent_can_continue)
             )
-        proc.set_daemon(True)
+        proc.daemon = True
         proc.start()
 
         self.assertEqual(queue_empty(queue), True)
@@ -397,7 +397,7 @@ class _TestQueue(BaseTestCase):
             target=self._test_get,
             args=(queue, child_can_start, parent_can_continue)
             )
-        proc.set_daemon(True)
+        proc.daemon = True
         proc.start()
 
         self.assertEqual(queue_empty(queue), True)
@@ -620,17 +620,11 @@ class _TestCondition(BaseTestCase):
         woken = self.Semaphore(0)
 
         p = self.Process(target=self.f, args=(cond, sleeping, woken))
-        try:
-            p.set_daemon(True)
-        except AttributeError:
-            p.daemon = True
+        p.daemon = True
         p.start()
 
         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
-        try:
-            p.set_daemon(True)
-        except AttributeError:
-            p.daemon = True
+        p.daemon = True
         p.start()
 
         # wait for both children to start sleeping
@@ -672,7 +666,7 @@ class _TestCondition(BaseTestCase):
         for i in range(3):
             p = self.Process(target=self.f,
                              args=(cond, sleeping, woken, TIMEOUT1))
-            p.set_daemon(True)
+            p.daemon = True
             p.start()
 
             t = threading.Thread(target=self.f,
@@ -695,7 +689,7 @@ class _TestCondition(BaseTestCase):
         # start some more threads/processes
         for i in range(3):
             p = self.Process(target=self.f, args=(cond, sleeping, woken))
-            p.set_daemon(True)
+            p.daemon = True
             p.start()
 
             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
@@ -1192,7 +1186,7 @@ class _TestConnection(BaseTestCase):
         conn, child_conn = self.Pipe()
 
         p = self.Process(target=self._echo, args=(child_conn,))
-        p.set_daemon(True)
+        p.daemon = True
         p.start()
 
         seq = [1, 2.25, None]
@@ -1341,7 +1335,7 @@ class _TestListenerClient(BaseTestCase):
         for family in self.connection.families:
             l = self.connection.Listener(family=family)
             p = self.Process(target=self._test, args=(l.address,))
-            p.set_daemon(True)
+            p.daemon = True
             p.start()
             conn = l.accept()
             self.assertEqual(conn.recv(), 'hello')