debug_group.add_argument("--show-tests", dest="showTests",
help="Show all discovered tests",
action="store_true", default=False)
- debug_group.add_argument("--use-processes", dest="useProcesses",
+ debug_group.add_argument("--use-process-pool", dest="executionStrategy",
+ help="Run tests in parallel with a process pool",
+ action="store_const", const="PROCESS_POOL")
+ debug_group.add_argument("--use-processes", dest="executionStrategy",
help="Run tests in parallel with processes (not threads)",
- action="store_true", default=True)
- debug_group.add_argument("--use-threads", dest="useProcesses",
+ action="store_const", const="PROCESSES")
+ debug_group.add_argument("--use-threads", dest="executionStrategy",
help="Run tests in parallel with threads (not processes)",
- action="store_false", default=True)
+ action="store_const", const="THREADS")
opts = parser.parse_args()
args = opts.test_paths
if opts.numThreads is None:
opts.numThreads = lit.util.detectCPUs()
+ if opts.executionStrategy is None:
+ opts.executionStrategy = 'PROCESS_POOL'
+
if opts.maxFailures == 0:
parser.error("Setting --max-failures to 0 does not have any effect.")
display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try:
run.execute_tests(display, opts.numThreads, opts.maxTime,
- opts.useProcesses)
+ opts.executionStrategy)
except KeyboardInterrupt:
sys.exit(2)
display.finish()
import os
+import sys
import threading
import time
import traceback
def run_test(self, test_index):
test = self.run_instance.tests[test_index]
try:
- self.run_instance.execute_test(test)
+ execute_test(test, self.run_instance.lit_config,
+ self.run_instance.parallelism_semaphores)
except KeyboardInterrupt:
# This is a sad hack. Unfortunately subprocess goes
# bonkers with ctrl-c and we start forking merrily.
print('\nCtrl-C detected, goodbye.')
+ sys.stdout.flush()
os.kill(0,9)
self.consumer.update(test_index, test)
def handleFailures(provider, consumer, maxFailures):
consumer.display = _Display(consumer.display, provider, maxFailures)
+def execute_test(test, lit_config, parallelism_semaphores):
+ """Execute one test"""
+ pg = test.config.parallelism_group
+ if callable(pg):
+ pg = pg(test)
+
+ result = None
+ semaphore = None
+ try:
+ if pg:
+ semaphore = parallelism_semaphores[pg]
+ if semaphore:
+ semaphore.acquire()
+ start_time = time.time()
+ result = test.config.test_format.execute(test, lit_config)
+ # Support deprecated result from execute() which returned the result
+ # code and additional output as a tuple.
+ if isinstance(result, tuple):
+ code, output = result
+ result = lit.Test.Result(code, output)
+ elif not isinstance(result, lit.Test.Result):
+ raise ValueError("unexpected result from test execution")
+ result.elapsed = time.time() - start_time
+ except KeyboardInterrupt:
+ raise
+ except:
+ if lit_config.debug:
+ raise
+ output = 'Exception during script execution:\n'
+ output += traceback.format_exc()
+ output += '\n'
+ result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+ finally:
+ if semaphore:
+ semaphore.release()
+
+ test.setResult(result)
+
class Run(object):
"""
This class represents a concrete, configured testing run.
self.tests = tests
def execute_test(self, test):
- pg = test.config.parallelism_group
- if callable(pg): pg = pg(test)
-
- result = None
- semaphore = None
- try:
- if pg: semaphore = self.parallelism_semaphores[pg]
- if semaphore: semaphore.acquire()
- start_time = time.time()
- result = test.config.test_format.execute(test, self.lit_config)
-
- # Support deprecated result from execute() which returned the result
- # code and additional output as a tuple.
- if isinstance(result, tuple):
- code, output = result
- result = lit.Test.Result(code, output)
- elif not isinstance(result, lit.Test.Result):
- raise ValueError("unexpected result from test execution")
-
- result.elapsed = time.time() - start_time
- except KeyboardInterrupt:
- raise
- except:
- if self.lit_config.debug:
- raise
- output = 'Exception during script execution:\n'
- output += traceback.format_exc()
- output += '\n'
- result = lit.Test.Result(lit.Test.UNRESOLVED, output)
- finally:
- if semaphore: semaphore.release()
-
- test.setResult(result)
+ return execute_test(test, self.lit_config, self.parallelism_semaphores)
def execute_tests(self, display, jobs, max_time=None,
- use_processes=False):
+ execution_strategy=None):
"""
execute_tests(display, jobs, [max_time])
be given an UNRESOLVED result.
"""
+ if execution_strategy == 'PROCESS_POOL':
+ self.execute_tests_with_mp_pool(display, jobs, max_time)
+ return
+ # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+ # the other two strategies.
+
+ use_processes = execution_strategy == 'PROCESSES'
+
# Choose the appropriate parallel execution implementation.
consumer = None
if jobs != 1 and use_processes and multiprocessing:
provider = TestProvider(queue_impl, canceled_flag)
handleFailures(provider, consumer, self.lit_config.maxFailures)
- # Queue the tests outside the main thread because we can't guarantee
- # that we can put() all the tests without blocking:
+ # Putting tasks into the threading or multiprocessing Queue may block,
+ # so do it in a separate thread.
# https://docs.python.org/2/library/multiprocessing.html
# e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
# without taking any out.
# Wait for all the tasks to complete.
for t in tasks:
t.join()
+
+ def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+ # Don't do anything if we aren't going to run any tests.
+ if not self.tests or jobs == 0:
+ return
+
+ # Set up semaphores to limit parallelism of certain classes of tests.
+ # For example, some ASan tests require lots of virtual memory and run
+ # faster with less parallelism on OS X.
+ self.parallelism_semaphores = \
+ {k: multiprocessing.Semaphore(v) for k, v in
+ self.lit_config.parallelism_groups.items()}
+
+ # Save the display object on the runner so that we can update it from
+ # our task completion callback.
+ self.display = display
+
+ # Start a process pool. Copy over the data shared between all test runs.
+ pool = multiprocessing.Pool(jobs, worker_initializer,
+ (self.lit_config,
+ self.parallelism_semaphores))
+
+ # Install a console-control signal handler on Windows.
+ if win32api is not None:
+ def console_ctrl_handler(type):
+ print "Ctr-C received, terminating"
+ pool.terminate()
+ pool.join()
+ os.kill(0,9)
+ return True
+ win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+ # FIXME: Implement max_time using .wait() timeout argument and a
+ # deadline.
+
+ try:
+ async_results = [pool.apply_async(worker_run_one_test,
+ args=(test_index, test),
+ callback=self.consume_test_result)
+ for test_index, test in enumerate(self.tests)]
+
+ # Wait for all results to come in. The callback that runs in the
+ # parent process will update the display.
+ for a in async_results:
+ a.wait()
+ if not a.successful():
+ a.get() # Exceptions raised here come from the worker.
+ finally:
+ pool.terminate()
+ pool.join()
+
+ # Mark any tests that weren't run as UNRESOLVED.
+ for test in self.tests:
+ if test.result is None:
+ test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+ def consume_test_result(self, pool_result):
+ """Test completion callback for worker_run_one_test
+
+ Updates the test result status in the parent process. Each task in the
+ pool returns the test index and the result, and we use the index to look
+ up the original test object. Also updates the progress bar as tasks
+ complete.
+ """
+ (test_index, test_with_result) = pool_result
+ # Update the parent process copy of the test. This includes the result,
+ # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+ assert self.tests[test_index].file_path == test_with_result.file_path, \
+ "parent and child disagree on test path"
+ self.tests[test_index] = test_with_result
+ self.display.update(test_with_result)
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+ """Copy expensive repeated data into worker processes"""
+ global child_lit_config
+ child_lit_config = lit_config
+ global child_parallelism_semaphores
+ child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+ """Run one test in a multiprocessing.Pool
+
+ Side effects in this function and functions it calls are not visible in the
+ main lit process.
+
+ Arguments and results of this function are pickled, so they should be cheap
+ to copy. For efficiency, we copy all data needed to execute all tests into
+ each worker and store it in the child_* global variables. This reduces the
+ cost of each task.
+
+ Returns an index and a Result, which the parent process uses to update
+ the display.
+ """
+ try:
+ execute_test(test, child_lit_config, child_parallelism_semaphores)
+ return (test_index, test)
+ except KeyboardInterrupt as e:
+ # This is a sad hack. Unfortunately subprocess goes
+ # bonkers with ctrl-c and we start forking merrily.
+ print('\nCtrl-C detected, goodbye.')
+ traceback.print_exc()
+ sys.stdout.flush()
+ os.kill(0,9)
+ except:
+ traceback.print_exc()