noExecute, debug, isWindows,
params, config_prefix = None,
maxIndividualTestTime = 0,
- maxFailures = None):
+ maxFailures = None,
+ parallelism_groups = []):
# The name of the test runner.
self.progname = progname
# The items to add to the PATH environment variable.
self.maxIndividualTestTime = maxIndividualTestTime
self.maxFailures = maxFailures
+ self.parallelism_groups = parallelism_groups
@property
def maxIndividualTestTime(self):
environment, substitutions, unsupported,
test_exec_root, test_source_root, excludes,
available_features, pipefail, limit_to_features = [],
- is_early = False):
+ is_early = False, parallelism_group = ""):
self.parent = parent
self.name = str(name)
self.suffixes = set(suffixes)
self.limit_to_features = set(limit_to_features)
# Whether the suite should be tested early in a given run.
self.is_early = bool(is_early)
+ self.parallelism_group = parallelism_group
def finish(self, litConfig):
"""finish() - Finish this config object, after loading is complete."""
params = userParams,
config_prefix = opts.configPrefix,
maxIndividualTestTime = maxIndividualTestTime,
- maxFailures = opts.maxFailures)
+ maxFailures = opts.maxFailures,
+ parallelism_groups = {})
# Perform test discovery.
run = lit.run.Run(litConfig,
self.tests = tests
def execute_test(self, test):
+ pg = test.config.parallelism_group
+ if callable(pg): pg = pg(test)
+
result = None
- start_time = time.time()
+ semaphore = None
try:
+ if pg: semaphore = self.parallelism_semaphores[pg]
+ if semaphore: semaphore.acquire()
+ start_time = time.time()
result = test.config.test_format.execute(test, self.lit_config)
# Support deprecated result from execute() which returned the result
result = lit.Test.Result(code, output)
elif not isinstance(result, lit.Test.Result):
raise ValueError("unexpected result from test execution")
+
+ result.elapsed = time.time() - start_time
except KeyboardInterrupt:
raise
except:
output += traceback.format_exc()
output += '\n'
result = lit.Test.Result(lit.Test.UNRESOLVED, output)
- result.elapsed = time.time() - start_time
+ finally:
+ if semaphore: semaphore.release()
test.setResult(result)
try:
task_impl = multiprocessing.Process
queue_impl = multiprocessing.Queue
+ sem_impl = multiprocessing.Semaphore
canceled_flag = multiprocessing.Value('i', 0)
consumer = MultiprocessResultsConsumer(self, display, jobs)
except:
if not consumer:
task_impl = threading.Thread
queue_impl = queue.Queue
+ sem_impl = threading.Semaphore
canceled_flag = LockedValue(0)
consumer = ThreadResultsConsumer(display)
+ self.parallelism_semaphores = {k: sem_impl(v)
+ for k, v in self.lit_config.parallelism_groups.items()}
+
# Create the test provider.
provider = TestProvider(queue_impl, canceled_flag)
handleFailures(provider, consumer, self.lit_config.maxFailures)