]> granicus.if.org Git - python/commitdiff
bpo-5001: More-informative multiprocessing error messages (#3079)
authorAllen W. Smith, Ph.D <drallensmith@users.noreply.github.com>
Tue, 29 Aug 2017 22:52:18 +0000 (17:52 -0500)
committerAntoine Pitrou <pitrou@free.fr>
Tue, 29 Aug 2017 22:52:18 +0000 (00:52 +0200)
* Make error message more informative

Replace assertions in error-reporting code with more-informative version that doesn't cause confusion over where and what the error is.

* Additional clarification + get travis to check

* Change from SystemError to TypeError

As suggested in PR comment by @pitrou, changing from SystemError; TypeError appears appropriate.

* NEWS file installation; ACKS addition (will do my best to justify it by additional work)

* Making current AssertionErrors in multiprocessing more informative

* Blurb added re multiprocessing managers.py, queues.py cleanup

* Further multiprocessing cleanup - went through pool.py

* Fix two asserts in multiprocessing/util.py

* Most asserts in multiprocessing more informative

* Didn't save right version

* Further work on multiprocessing error messages

* Correct typo

* Correct typo v2

* Blasted colon... serves me right for trying to work on two things at once

* Simplify NEWS entry

* Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst

* Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst

OK, never mind.

* Corrected (thanks to pitrou) error messages for notify

* Remove extraneous backslash in docstring.

15 files changed:
Lib/multiprocessing/connection.py
Lib/multiprocessing/dummy/__init__.py
Lib/multiprocessing/forkserver.py
Lib/multiprocessing/heap.py
Lib/multiprocessing/managers.py
Lib/multiprocessing/pool.py
Lib/multiprocessing/popen_fork.py
Lib/multiprocessing/queues.py
Lib/multiprocessing/reduction.py
Lib/multiprocessing/resource_sharer.py
Lib/multiprocessing/semaphore_tracker.py
Lib/multiprocessing/spawn.py
Lib/multiprocessing/synchronize.py
Lib/multiprocessing/util.py
Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst [new file with mode: 0644]

index ba9b17cee1b1b5a37c3ebe513f3be571eef87207..7a621a55f468b69134cf7390891ff7b8aa67c4d2 100644 (file)
@@ -720,7 +720,9 @@ FAILURE = b'#FAILURE#'
 
 def deliver_challenge(connection, authkey):
     import hmac
-    assert isinstance(authkey, bytes)
+    if not isinstance(authkey, bytes):
+        raise ValueError(
+            "Authkey must be bytes, not {0!s}".format(type(authkey)))
     message = os.urandom(MESSAGE_LENGTH)
     connection.send_bytes(CHALLENGE + message)
     digest = hmac.new(authkey, message, 'md5').digest()
@@ -733,7 +735,9 @@ def deliver_challenge(connection, authkey):
 
 def answer_challenge(connection, authkey):
     import hmac
-    assert isinstance(authkey, bytes)
+    if not isinstance(authkey, bytes):
+        raise ValueError(
+            "Authkey must be bytes, not {0!s}".format(type(authkey)))
     message = connection.recv_bytes(256)         # reject large message
     assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
     message = message[len(CHALLENGE):]
index cbb7f4909cb55cfecc19b7e8bcdc79a0dbb07b99..403f5e5198e4ed08d3cacc9b2508c5163420c5db 100644 (file)
@@ -41,7 +41,10 @@ class DummyProcess(threading.Thread):
         self._parent = current_process()
 
     def start(self):
-        assert self._parent is current_process()
+        if self._parent is not current_process():
+            raise RuntimeError(
+                "Parent is {0!r} but current_process is {1!r}".format(
+                    self._parent, current_process()))
         self._start_called = True
         if hasattr(self._parent, '_children'):
             self._parent._children[self] = None
index 69b842aa939a3c34c3fe067977af1360730587f0..7a952e2ba697817ed2bf64ef3067a84a930c61a4 100644 (file)
@@ -189,7 +189,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
 
                 if alive_r in rfds:
                     # EOF because no more client processes left
-                    assert os.read(alive_r, 1) == b''
+                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                     raise SystemExit
 
                 if sig_r in rfds:
@@ -208,7 +208,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
                             if os.WIFSIGNALED(sts):
                                 returncode = -os.WTERMSIG(sts)
                             else:
-                                assert os.WIFEXITED(sts)
+                                if not os.WIFEXITED(sts):
+                                    raise AssertionError(
+                                        "Child {0:n} status is {1:n}".format(
+                                            pid,sts))
                                 returncode = os.WEXITSTATUS(sts)
                             # Send exit code to client process
                             try:
@@ -227,7 +230,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
                     with listener.accept()[0] as s:
                         # Receive fds from client
                         fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
-                        assert len(fds) <= MAXFDS_TO_SEND
+                        if len(fds) > MAXFDS_TO_SEND:
+                            raise RuntimeError(
+                                "Too many ({0:n}) fds to send".format(
+                                    len(fds)))
                         child_r, child_w, *fds = fds
                         s.close()
                         pid = os.fork()
index ee3ed551d0c122e6ec0b1bcd6fb075d4d83a25db..566173a1b0ae8253efee5940b6ea6fec7d65ab9f 100644 (file)
@@ -211,7 +211,10 @@ class Heap(object):
         # synchronously sometimes later from malloc() or free(), by calling
         # _free_pending_blocks() (appending and retrieving from a list is not
         # strictly thread-safe but under cPython it's atomic thanks to the GIL).
-        assert os.getpid() == self._lastpid
+        if os.getpid() != self._lastpid:
+            raise ValueError(
+                "My pid ({0:n}) is not last pid {1:n}".format(
+                    os.getpid(),self._lastpid))
         if not self._lock.acquire(False):
             # can't acquire the lock right now, add the block to the list of
             # pending blocks to free
@@ -227,7 +230,10 @@ class Heap(object):
 
     def malloc(self, size):
         # return a block of right size (possibly rounded up)
-        assert 0 <= size < sys.maxsize
+        if size < 0:
+            raise ValueError("Size {0:n} out of range".format(size))
+        if sys.maxsize <= size:
+            raise OverflowError("Size {0:n} too large".format(size))
         if os.getpid() != self._lastpid:
             self.__init__()                     # reinitialize after fork
         with self._lock:
@@ -250,7 +256,10 @@ class BufferWrapper(object):
     _heap = Heap()
 
     def __init__(self, size):
-        assert 0 <= size < sys.maxsize
+        if size < 0:
+            raise ValueError("Size {0:n} out of range".format(size))
+        if sys.maxsize <= size:
+            raise OverflowError("Size {0:n} too large".format(size))
         block = BufferWrapper._heap.malloc(size)
         self._state = (block, size)
         util.Finalize(self, BufferWrapper._heap.free, args=(block,))
index c6722771b05c94f97469ed3db14250f99142fd45..04df26bac661871d7603b8046c842726991376a3 100644 (file)
@@ -23,7 +23,7 @@ from time import time as _time
 from traceback import format_exc
 
 from . import connection
-from .context import reduction, get_spawning_popen
+from .context import reduction, get_spawning_popen, ProcessError
 from . import pool
 from . import process
 from . import util
@@ -133,7 +133,10 @@ class Server(object):
               'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
 
     def __init__(self, registry, address, authkey, serializer):
-        assert isinstance(authkey, bytes)
+        if not isinstance(authkey, bytes):
+            raise TypeError(
+                "Authkey {0!r} is type {1!s}, not bytes".format(
+                    authkey, type(authkey)))
         self.registry = registry
         self.authkey = process.AuthenticationString(authkey)
         Listener, Client = listener_client[serializer]
@@ -163,7 +166,7 @@ class Server(object):
             except (KeyboardInterrupt, SystemExit):
                 pass
         finally:
-            if sys.stdout != sys.__stdout__:
+            if sys.stdout != sys.__stdout__: # what about stderr?
                 util.debug('resetting stdout, stderr')
                 sys.stdout = sys.__stdout__
                 sys.stderr = sys.__stderr__
@@ -316,6 +319,7 @@ class Server(object):
         '''
         Return some info --- useful to spot problems with refcounting
         '''
+        # Perhaps include debug info about 'c'?
         with self.mutex:
             result = []
             keys = list(self.id_to_refcount.keys())
@@ -356,7 +360,9 @@ class Server(object):
                       self.registry[typeid]
 
             if callable is None:
-                assert len(args) == 1 and not kwds
+                if kwds or (len(args) != 1):
+                    raise ValueError(
+                        "Without callable, must have one non-keyword argument")
                 obj = args[0]
             else:
                 obj = callable(*args, **kwds)
@@ -364,7 +370,10 @@ class Server(object):
             if exposed is None:
                 exposed = public_methods(obj)
             if method_to_typeid is not None:
-                assert type(method_to_typeid) is dict
+                if not isinstance(method_to_typeid, dict):
+                    raise TypeError(
+                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
+                            method_to_typeid, type(method_to_typeid)))
                 exposed = list(exposed) + list(method_to_typeid)
 
             ident = '%x' % id(obj)  # convert to string because xmlrpclib
@@ -417,7 +426,11 @@ class Server(object):
             return
 
         with self.mutex:
-            assert self.id_to_refcount[ident] >= 1
+            if self.id_to_refcount[ident] <= 0:
+                raise AssertionError(
+                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
+                        ident, self.id_to_obj[ident],
+                        self.id_to_refcount[ident]))
             self.id_to_refcount[ident] -= 1
             if self.id_to_refcount[ident] == 0:
                 del self.id_to_refcount[ident]
@@ -480,7 +493,14 @@ class BaseManager(object):
         '''
         Return server object with serve_forever() method and address attribute
         '''
-        assert self._state.value == State.INITIAL
+        if self._state.value != State.INITIAL:
+            if self._state.value == State.STARTED:
+                raise ProcessError("Already started server")
+            elif self._state.value == State.SHUTDOWN:
+                raise ProcessError("Manager has shut down")
+            else:
+                raise ProcessError(
+                    "Unknown state {!r}".format(self._state.value))
         return Server(self._registry, self._address,
                       self._authkey, self._serializer)
 
@@ -497,7 +517,14 @@ class BaseManager(object):
         '''
         Spawn a server process for this manager object
         '''
-        assert self._state.value == State.INITIAL
+        if self._state.value != State.INITIAL:
+            if self._state.value == State.STARTED:
+                raise ProcessError("Already started server")
+            elif self._state.value == State.SHUTDOWN:
+                raise ProcessError("Manager has shut down")
+            else:
+                raise ProcessError(
+                    "Unknown state {!r}".format(self._state.value))
 
         if initializer is not None and not callable(initializer):
             raise TypeError('initializer must be a callable')
@@ -593,7 +620,14 @@ class BaseManager(object):
     def __enter__(self):
         if self._state.value == State.INITIAL:
             self.start()
-        assert self._state.value == State.STARTED
+        if self._state.value != State.STARTED:
+            if self._state.value == State.INITIAL:
+                raise ProcessError("Unable to start server")
+            elif self._state.value == State.SHUTDOWN:
+                raise ProcessError("Manager has shut down")
+            else:
+                raise ProcessError(
+                    "Unknown state {!r}".format(self._state.value))
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
@@ -653,7 +687,7 @@ class BaseManager(object):
                            getattr(proxytype, '_method_to_typeid_', None)
 
         if method_to_typeid:
-            for key, value in list(method_to_typeid.items()):
+            for key, value in list(method_to_typeid.items()): # isinstance?
                 assert type(key) is str, '%r is not a string' % key
                 assert type(value) is str, '%r is not a string' % value
 
index c2364ab186c328205d957b7c2dafd40f1dc751e4..e457f0a576f5f9046e0b72cd7b5d819f71322802 100644 (file)
@@ -92,7 +92,9 @@ class MaybeEncodingError(Exception):
 
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
            wrap_exception=False):
-    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
+    if (maxtasks is not None) and not (isinstance(maxtasks, int)
+                                       and maxtasks >= 1):
+        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
     put = outqueue.put
     get = inqueue.get
     if hasattr(inqueue, '_writer'):
@@ -254,8 +256,8 @@ class Pool(object):
     def apply(self, func, args=(), kwds={}):
         '''
         Equivalent of `func(*args, **kwds)`.
+        Pool must be running.
         '''
-        assert self._state == RUN
         return self.apply_async(func, args, kwds).get()
 
     def map(self, func, iterable, chunksize=None):
@@ -307,6 +309,10 @@ class Pool(object):
                 ))
             return result
         else:
+            if chunksize < 1:
+                raise ValueError(
+                    "Chunksize must be 1+, not {0:n}".format(
+                        chunksize))
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapIterator(self._cache)
@@ -334,7 +340,9 @@ class Pool(object):
                 ))
             return result
         else:
-            assert chunksize > 1
+            if chunksize < 1:
+                raise ValueError(
+                    "Chunksize must be 1+, not {0!r}".format(chunksize))
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapUnorderedIterator(self._cache)
             self._taskqueue.put(
@@ -466,7 +474,7 @@ class Pool(object):
                 return
 
             if thread._state:
-                assert thread._state == TERMINATE
+                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                 util.debug('result handler found thread._state=TERMINATE')
                 break
 
@@ -542,7 +550,10 @@ class Pool(object):
 
     def join(self):
         util.debug('joining pool')
-        assert self._state in (CLOSE, TERMINATE)
+        if self._state == RUN:
+            raise ValueError("Pool is still running")
+        elif self._state not in (CLOSE, TERMINATE):
+            raise ValueError("In unknown state")
         self._worker_handler.join()
         self._task_handler.join()
         self._result_handler.join()
@@ -570,7 +581,9 @@ class Pool(object):
         util.debug('helping task handler/workers to finish')
         cls._help_stuff_finish(inqueue, task_handler, len(pool))
 
-        assert result_handler.is_alive() or len(cache) == 0
+        if (not result_handler.is_alive()) and (len(cache) != 0):
+            raise AssertionError(
+                "Cannot have cache with result_hander not alive")
 
         result_handler._state = TERMINATE
         outqueue.put(None)                  # sentinel
@@ -628,7 +641,8 @@ class ApplyResult(object):
         return self._event.is_set()
 
     def successful(self):
-        assert self.ready()
+        if not self.ready():
+            raise ValueError("{0!r} not ready".format(self))
         return self._success
 
     def wait(self, timeout=None):
index 44ce9a9e79b0500599082e38862bccdb426407d3..cbdbfa79cf3c0dbdd359d0859d119c0be76f0ab8 100644 (file)
@@ -35,7 +35,7 @@ class Popen(object):
                 if os.WIFSIGNALED(sts):
                     self.returncode = -os.WTERMSIG(sts)
                 else:
-                    assert os.WIFEXITED(sts)
+                    assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                     self.returncode = os.WEXITSTATUS(sts)
         return self.returncode
 
index 513807cafecb6dd2e19ec347ef210e9a2c109367..328efbd95fe63d515497f4195e5be15ccb44c12d 100644 (file)
@@ -78,7 +78,7 @@ class Queue(object):
         self._poll = self._reader.poll
 
     def put(self, obj, block=True, timeout=None):
-        assert not self._closed
+        assert not self._closed, "Queue {0!r} has been closed".format(self)
         if not self._sem.acquire(block, timeout):
             raise Full
 
@@ -140,7 +140,7 @@ class Queue(object):
 
     def join_thread(self):
         debug('Queue.join_thread()')
-        assert self._closed
+        assert self._closed, "Queue {0!r} not closed".format(self)
         if self._jointhread:
             self._jointhread()
 
@@ -281,7 +281,7 @@ class JoinableQueue(Queue):
         self._cond, self._unfinished_tasks = state[-2:]
 
     def put(self, obj, block=True, timeout=None):
-        assert not self._closed
+        assert not self._closed, "Queue {0!r} is closed".format(self)
         if not self._sem.acquire(block, timeout):
             raise Full
 
index 7f65947379ffa4332aa798b4fbb0995c0a015190..deca19ccad7b7f236304db5237ab90d032ae0ffc 100644 (file)
@@ -165,7 +165,10 @@ else:
                 if len(cmsg_data) % a.itemsize != 0:
                     raise ValueError
                 a.frombytes(cmsg_data)
-                assert len(a) % 256 == msg[0]
+                if len(a) % 256 != msg[0]:
+                    raise AssertionError(
+                        "Len is {0:n} but msg[0] is {1!r}".format(
+                            len(a), msg[0]))
                 return list(a)
         except (ValueError, IndexError):
             pass
index e44a728fa9ade7d402dee9fe873d3e59c3401a2a..6d99da102ffc617ec4347e220b7b65f8ef37e00d 100644 (file)
@@ -125,7 +125,7 @@ class _ResourceSharer(object):
 
     def _start(self):
         from .connection import Listener
-        assert self._listener is None
+        assert self._listener is None, "Already have Listener"
         util.debug('starting listener and thread for sending handles')
         self._listener = Listener(authkey=process.current_process().authkey)
         self._address = self._listener.address
index de7738eeee8ab264ab059c99d749ee62372c3c39..d5f259c246b2a0cd5784c130685ae3f326053fab 100644 (file)
@@ -80,7 +80,8 @@ class SemaphoreTracker(object):
             # bytes are atomic, and that PIPE_BUF >= 512
             raise ValueError('name too long')
         nbytes = os.write(self._fd, msg)
-        assert nbytes == len(msg)
+        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
+            nbytes, len(msg))
 
 
 _semaphore_tracker = SemaphoreTracker()
index 4aba372e48eff730c28f4015afd92e9f75626440..1f4f3f496f51a26bee2382b4d3333a6e364f8b7a 100644 (file)
@@ -93,7 +93,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
     '''
     Run code specified by data received over pipe
     '''
-    assert is_forking(sys.argv)
+    assert is_forking(sys.argv), "Not forking"
     if sys.platform == 'win32':
         import msvcrt
         new_handle = reduction.steal_handle(parent_pid, pipe_handle)
index 0590ed68f53ab20be13ad3328b510ba207884aa3..038f73f6b76551784e9e38486aaa42f99aff69fd 100644 (file)
@@ -270,13 +270,16 @@ class Condition(object):
 
     def notify(self, n=1):
         assert self._lock._semlock._is_mine(), 'lock is not owned'
-        assert not self._wait_semaphore.acquire(False)
+        assert not self._wait_semaphore.acquire(
+            False), ('notify: Should not have been able to acquire'
+                     + '_wait_semaphore')
 
         # to take account of timeouts since last notify*() we subtract
         # woken_count from sleeping_count and rezero woken_count
         while self._woken_count.acquire(False):
             res = self._sleeping_count.acquire(False)
-            assert res
+            assert res, ('notify: Bug in sleeping_count.acquire'
+                         + '- res should not be False')
 
         sleepers = 0
         while sleepers < n and self._sleeping_count.acquire(False):
index b490caa7e64333955ba1bb26058f5d2bb6697a0b..f0827f0a9693ae7c238765dbaad67c6cfaab4e2d 100644 (file)
@@ -149,12 +149,15 @@ class Finalize(object):
     Class which supports object finalization using weakrefs
     '''
     def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
-        assert exitpriority is None or type(exitpriority) is int
+        if (exitpriority is not None) and not isinstance(exitpriority,int):
+            raise TypeError(
+                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
+                    exitpriority, type(exitpriority)))
 
         if obj is not None:
             self._weakref = weakref.ref(obj, self)
-        else:
-            assert exitpriority is not None
+        elif exitpriority is None:
+            raise ValueError("Without object, exitpriority cannot be None")
 
         self._callback = callback
         self._args = args
diff --git a/Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst b/Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst
new file mode 100644 (file)
index 0000000..640b801
--- /dev/null
@@ -0,0 +1 @@
+Many asserts in `multiprocessing` are now more informative, and some error types have been changed to more specific ones.