def deliver_challenge(connection, authkey):
import hmac
- assert isinstance(authkey, bytes)
+ if not isinstance(authkey, bytes):
+ raise ValueError(
+ "Authkey must be bytes, not {0!s}".format(type(authkey)))
message = os.urandom(MESSAGE_LENGTH)
connection.send_bytes(CHALLENGE + message)
digest = hmac.new(authkey, message, 'md5').digest()
def answer_challenge(connection, authkey):
import hmac
- assert isinstance(authkey, bytes)
+ if not isinstance(authkey, bytes):
+ raise ValueError(
+ "Authkey must be bytes, not {0!s}".format(type(authkey)))
message = connection.recv_bytes(256) # reject large message
assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
message = message[len(CHALLENGE):]
self._parent = current_process()
def start(self):
- assert self._parent is current_process()
+ if self._parent is not current_process():
+ raise RuntimeError(
+ "Parent is {0!r} but current_process is {1!r}".format(
+ self._parent, current_process()))
self._start_called = True
if hasattr(self._parent, '_children'):
self._parent._children[self] = None
if alive_r in rfds:
# EOF because no more client processes left
- assert os.read(alive_r, 1) == b''
+ assert os.read(alive_r, 1) == b'', "Not at EOF?"
raise SystemExit
if sig_r in rfds:
if os.WIFSIGNALED(sts):
returncode = -os.WTERMSIG(sts)
else:
- assert os.WIFEXITED(sts)
+ if not os.WIFEXITED(sts):
+ raise AssertionError(
+ "Child {0:n} status is {1:n}".format(
+ pid, sts))
returncode = os.WEXITSTATUS(sts)
# Send exit code to client process
try:
with listener.accept()[0] as s:
# Receive fds from client
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
- assert len(fds) <= MAXFDS_TO_SEND
+ if len(fds) > MAXFDS_TO_SEND:
+ raise RuntimeError(
+ "Too many ({0:n}) fds to send".format(
+ len(fds)))
child_r, child_w, *fds = fds
s.close()
pid = os.fork()
# synchronously sometimes later from malloc() or free(), by calling
# _free_pending_blocks() (appending and retrieving from a list is not
# strictly thread-safe but under cPython it's atomic thanks to the GIL).
- assert os.getpid() == self._lastpid
+ if os.getpid() != self._lastpid:
+ raise ValueError(
+ "My pid ({0:n}) is not last pid {1:n}".format(
+ os.getpid(), self._lastpid))
if not self._lock.acquire(False):
# can't acquire the lock right now, add the block to the list of
# pending blocks to free
def malloc(self, size):
# return a block of right size (possibly rounded up)
- assert 0 <= size < sys.maxsize
+ if size < 0:
+ raise ValueError("Size {0:n} out of range".format(size))
+ if sys.maxsize <= size:
+ raise OverflowError("Size {0:n} too large".format(size))
if os.getpid() != self._lastpid:
self.__init__() # reinitialize after fork
with self._lock:
_heap = Heap()
def __init__(self, size):
- assert 0 <= size < sys.maxsize
+ if size < 0:
+ raise ValueError("Size {0:n} out of range".format(size))
+ if sys.maxsize <= size:
+ raise OverflowError("Size {0:n} too large".format(size))
block = BufferWrapper._heap.malloc(size)
self._state = (block, size)
util.Finalize(self, BufferWrapper._heap.free, args=(block,))
from traceback import format_exc
from . import connection
-from .context import reduction, get_spawning_popen
+from .context import reduction, get_spawning_popen, ProcessError
from . import pool
from . import process
from . import util
'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
def __init__(self, registry, address, authkey, serializer):
- assert isinstance(authkey, bytes)
+ if not isinstance(authkey, bytes):
+ raise TypeError(
+ "Authkey {0!r} is type {1!s}, not bytes".format(
+ authkey, type(authkey)))
self.registry = registry
self.authkey = process.AuthenticationString(authkey)
Listener, Client = listener_client[serializer]
except (KeyboardInterrupt, SystemExit):
pass
finally:
- if sys.stdout != sys.__stdout__:
+ if sys.stdout != sys.__stdout__: # what about stderr?
util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
'''
Return some info --- useful to spot problems with refcounting
'''
+ # Perhaps include debug info about 'c'?
with self.mutex:
result = []
keys = list(self.id_to_refcount.keys())
self.registry[typeid]
if callable is None:
- assert len(args) == 1 and not kwds
+ if kwds or (len(args) != 1):
+ raise ValueError(
+ "Without callable, must have one non-keyword argument")
obj = args[0]
else:
obj = callable(*args, **kwds)
if exposed is None:
exposed = public_methods(obj)
if method_to_typeid is not None:
- assert type(method_to_typeid) is dict
+ if not isinstance(method_to_typeid, dict):
+ raise TypeError(
+ "Method_to_typeid {0!r}: type {1!s}, not dict".format(
+ method_to_typeid, type(method_to_typeid)))
exposed = list(exposed) + list(method_to_typeid)
ident = '%x' % id(obj) # convert to string because xmlrpclib
return
with self.mutex:
- assert self.id_to_refcount[ident] >= 1
+ if self.id_to_refcount[ident] <= 0:
+ raise AssertionError(
+ "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
+ ident, self.id_to_obj[ident],
+ self.id_to_refcount[ident]))
self.id_to_refcount[ident] -= 1
if self.id_to_refcount[ident] == 0:
del self.id_to_refcount[ident]
'''
Return server object with serve_forever() method and address attribute
'''
- assert self._state.value == State.INITIAL
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started server")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("Manager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
return Server(self._registry, self._address,
self._authkey, self._serializer)
'''
Spawn a server process for this manager object
'''
- assert self._state.value == State.INITIAL
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started server")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("Manager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
if initializer is not None and not callable(initializer):
raise TypeError('initializer must be a callable')
def __enter__(self):
if self._state.value == State.INITIAL:
self.start()
- assert self._state.value == State.STARTED
+ if self._state.value != State.STARTED:
+ if self._state.value == State.INITIAL:
+ raise ProcessError("Unable to start server")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("Manager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
return self
def __exit__(self, exc_type, exc_val, exc_tb):
getattr(proxytype, '_method_to_typeid_', None)
if method_to_typeid:
- for key, value in list(method_to_typeid.items()):
+ for key, value in list(method_to_typeid.items()): # isinstance?
assert type(key) is str, '%r is not a string' % key
assert type(value) is str, '%r is not a string' % value
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
wrap_exception=False):
- assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
+ if (maxtasks is not None) and not (isinstance(maxtasks, int)
+ and maxtasks >= 1):
+ raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
+ Pool must be running.
'''
- assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, iterable, chunksize=None):
))
return result
else:
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0:n}".format(
+ chunksize))
- assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
))
return result
else:
- assert chunksize > 1
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0!r}".format(chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put(
return
if thread._state:
- assert thread._state == TERMINATE
+ assert thread._state == TERMINATE, "Thread not in TERMINATE"
util.debug('result handler found thread._state=TERMINATE')
break
def join(self):
util.debug('joining pool')
- assert self._state in (CLOSE, TERMINATE)
+ if self._state == RUN:
+ raise ValueError("Pool is still running")
+ elif self._state not in (CLOSE, TERMINATE):
+ raise ValueError("In unknown state")
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
- assert result_handler.is_alive() or len(cache) == 0
+ if (not result_handler.is_alive()) and (len(cache) != 0):
+ raise AssertionError(
+ "Cannot have cache with result_hander not alive")
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
return self._event.is_set()
def successful(self):
- assert self.ready()
+ if not self.ready():
+ raise ValueError("{0!r} not ready".format(self))
return self._success
def wait(self, timeout=None):
if os.WIFSIGNALED(sts):
self.returncode = -os.WTERMSIG(sts)
else:
- assert os.WIFEXITED(sts)
+ assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
self.returncode = os.WEXITSTATUS(sts)
return self.returncode
self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None):
- assert not self._closed
+ assert not self._closed, "Queue {0!r} has been closed".format(self)
if not self._sem.acquire(block, timeout):
raise Full
def join_thread(self):
debug('Queue.join_thread()')
- assert self._closed
+ assert self._closed, "Queue {0!r} not closed".format(self)
if self._jointhread:
self._jointhread()
self._cond, self._unfinished_tasks = state[-2:]
def put(self, obj, block=True, timeout=None):
- assert not self._closed
+ assert not self._closed, "Queue {0!r} is closed".format(self)
if not self._sem.acquire(block, timeout):
raise Full
if len(cmsg_data) % a.itemsize != 0:
raise ValueError
a.frombytes(cmsg_data)
- assert len(a) % 256 == msg[0]
+ if len(a) % 256 != msg[0]:
+ raise AssertionError(
+ "Len is {0:n} but msg[0] is {1!r}".format(
+ len(a), msg[0]))
return list(a)
except (ValueError, IndexError):
pass
def _start(self):
from .connection import Listener
- assert self._listener is None
+ assert self._listener is None, "Already have Listener"
util.debug('starting listener and thread for sending handles')
self._listener = Listener(authkey=process.current_process().authkey)
self._address = self._listener.address
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('name too long')
nbytes = os.write(self._fd, msg)
- assert nbytes == len(msg)
+ assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
+ nbytes, len(msg))
_semaphore_tracker = SemaphoreTracker()
'''
Run code specified by data received over pipe
'''
- assert is_forking(sys.argv)
+ assert is_forking(sys.argv), "Not forking"
if sys.platform == 'win32':
import msvcrt
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
def notify(self, n=1):
assert self._lock._semlock._is_mine(), 'lock is not owned'
- assert not self._wait_semaphore.acquire(False)
+ assert not self._wait_semaphore.acquire(
+ False), ('notify: Should not have been able to acquire '
+ + '_wait_semaphore')
# to take account of timeouts since last notify*() we subtract
# woken_count from sleeping_count and rezero woken_count
while self._woken_count.acquire(False):
res = self._sleeping_count.acquire(False)
- assert res
+ assert res, ('notify: Bug in sleeping_count.acquire'
+ + '- res should not be False')
sleepers = 0
while sleepers < n and self._sleeping_count.acquire(False):
Class which supports object finalization using weakrefs
'''
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
- assert exitpriority is None or type(exitpriority) is int
+ if (exitpriority is not None) and not isinstance(exitpriority, int):
+ raise TypeError(
+ "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
+ exitpriority, type(exitpriority)))
if obj is not None:
self._weakref = weakref.ref(obj, self)
- else:
- assert exitpriority is not None
+ elif exitpriority is None:
+ raise ValueError("Without object, exitpriority cannot be None")
self._callback = callback
self._args = args
--- /dev/null
+Many asserts in `multiprocessing` are now more informative, and some error types have been changed to more specific ones.
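
For illustration only (not part of the patch), a minimal sketch of how the new exceptions surface once a patched build is on the path; the messages in the comments are the ones produced by the format strings above:

    # Illustration only: exercise two of the new checks on a patched build.
    from multiprocessing.connection import deliver_challenge
    from multiprocessing import util

    try:
        # authkey must be bytes; the check fires before the connection is used
        deliver_challenge(None, "not-bytes")
    except ValueError as exc:
        print(exc)  # Authkey must be bytes, not <class 'str'>

    try:
        # exitpriority must be None or an int
        util.Finalize(None, print, exitpriority="high")
    except TypeError as exc:
        print(exc)  # Exitpriority ('high') must be None or int, not <class 'str'>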