from subprocess import Popen, PIPE
debug_output_pat = re.compile(r"\[\d+ refs\]$")
output = Queue()
- def tests_and_args():
- for test in tests:
- args_tuple = (
- (test, verbose, quiet),
- dict(huntrleaks=huntrleaks, use_resources=use_resources,
- debug=debug, output_on_failure=verbose3,
- failfast=failfast, match_tests=match_tests)
- )
- yield (test, args_tuple)
- pending = tests_and_args()
+ pending = MultiprocessTests(tests)
opt_args = support.args_from_interpreter_flags()
base_cmd = [sys.executable] + opt_args + ['-m', 'test.regrtest']
def work():
try:
while True:
try:
- test, args_tuple = next(pending)
+ test = next(pending)
except StopIteration:
output.put((None, None, None, None))
return
+ args_tuple = (
+ (test, verbose, quiet),
+ dict(huntrleaks=huntrleaks, use_resources=use_resources,
+ debug=debug, output_on_failure=verbose3,
+ failfast=failfast, match_tests=match_tests)
+ )
# -E is needed by some tests, e.g. test_import
# Running the child from the same working directory ensures
# that TEMPDIR for the child is the same when
test_index += 1
except KeyboardInterrupt:
interrupted = True
- pending.close()
+ pending.interrupted = True
for worker in workers:
worker.join()
else:
tests.append(modname)
return stdtests + sorted(tests)
+# We do not use a generator so multiple threads can call next().
+class MultiprocessTests(object):
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests):
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.tests = tests
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.interrupted:
+ raise StopIteration('tests interrupted')
+ return next(self.tests)
+
def replace_stdout():
"""Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback"""