time.sleep(0.1)
# send sentinel to stop workers
pool._taskqueue.put(None)
- debug('worker handler exiting')
+ util.debug('worker handler exiting')
@staticmethod
- def _handle_tasks(taskqueue, put, outqueue, pool):
+ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
break
try:
put(task)
- except OSError:
- util.debug('could not put task on queue')
- break
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
else:
if set_length:
- debug('doing set_length()')
+ util.debug('doing set_length()')
set_length(i+1)
continue
break