From 098e25672f1c3578855d5ded4f5147795c9ed956 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 3 Oct 2019 16:15:16 +0200 Subject: [PATCH] bpo-36670: Enhance regrtest (GH-16556) * Add log() method: add timestamp and load average prefixes to main messages. * WindowsLoadTracker: * LOAD_FACTOR_1 is now computed using SAMPLING_INTERVAL * Initialize the load to the arithmetic mean of the first 5 values of the Processor Queue Length value (so over 5 seconds), rather than 0.0. * Handle BrokenPipeError and when typeperf exit. * format_duration(1.5) now returns '1.5 sec', rather than '1 sec 500 ms' --- Lib/test/libregrtest/main.py | 35 +++++++++------- Lib/test/libregrtest/runtest_mp.py | 27 ++++++------ Lib/test/libregrtest/utils.py | 11 +++-- Lib/test/libregrtest/win_utils.py | 66 +++++++++++++++++++++--------- Lib/test/test_regrtest.py | 14 ++++--- 5 files changed, 97 insertions(+), 56 deletions(-) diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index fd701c452c..76ad3359f2 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -139,16 +139,8 @@ class Regrtest: print(xml_data, file=sys.__stderr__) raise - def display_progress(self, test_index, text): - if self.ns.quiet: - return - - # "[ 51/405/1] test_tcl passed" - line = f"{test_index:{self.test_count_width}}{self.test_count}" - fails = len(self.bad) + len(self.environment_changed) - if fails and not self.ns.pgo: - line = f"{line}/{fails}" - line = f"[{line}] {text}" + def log(self, line=''): + empty = not line # add the system load prefix: "load avg: 1.80 " load_avg = self.getloadavg() @@ -159,8 +151,23 @@ class Regrtest: test_time = time.monotonic() - self.start_time test_time = datetime.timedelta(seconds=int(test_time)) line = f"{test_time} {line}" + + if empty: + line = line[:-1] + print(line, flush=True) + def display_progress(self, test_index, text): + if self.ns.quiet: + return + + # "[ 51/405/1] test_tcl passed" + line = f"{test_index:{self.test_count_width}}{self.test_count}" + fails = len(self.bad) + len(self.environment_changed) + if fails and not self.ns.pgo: + line = f"{line}/{fails}" + self.log(f"[{line}] {text}") + def parse_args(self, kwargs): ns = _parse_args(sys.argv[1:], **kwargs) @@ -302,11 +309,11 @@ class Regrtest: self.first_result = self.get_tests_result() - print() - print("Re-running failed tests in verbose mode") + self.log() + self.log("Re-running failed tests in verbose mode") self.rerun = self.bad[:] for test_name in self.rerun: - print(f"Re-running {test_name} in verbose mode", flush=True) + self.log(f"Re-running {test_name} in verbose mode") self.ns.verbose = True result = runtest(self.ns, test_name) @@ -387,7 +394,7 @@ class Regrtest: save_modules = sys.modules.keys() - print("Run tests sequentially") + self.log("Run tests sequentially") previous_test = None for test_index, test_name in enumerate(self.tests, 1): diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index a46c78248d..79afa29fa0 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -104,13 +104,14 @@ class ExitThread(Exception): class TestWorkerProcess(threading.Thread): - def __init__(self, worker_id, pending, output, ns, timeout): + def __init__(self, worker_id, runner): super().__init__() self.worker_id = worker_id - self.pending = pending - self.output = output - self.ns = ns - self.timeout = timeout + self.pending = runner.pending + self.output = runner.output + self.ns = runner.ns + self.timeout = runner.worker_timeout + self.regrtest = runner.regrtest self.current_test_name = None self.start_time = None self._popen = None @@ -294,7 +295,8 @@ class TestWorkerProcess(threading.Thread): if not self.is_alive(): break dt = time.monotonic() - start_time - print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True) + self.regrtest.log(f"Waiting for {self} thread " + f"for {format_duration(dt)}") if dt > JOIN_TIMEOUT: print_warning(f"Failed to join {self} in {format_duration(dt)}") break @@ -316,6 +318,7 @@ def get_running(workers): class MultiprocessTestRunner: def __init__(self, regrtest): self.regrtest = regrtest + self.log = self.regrtest.log self.ns = regrtest.ns self.output = queue.Queue() self.pending = MultiprocessIterator(self.regrtest.tests) @@ -326,11 +329,10 @@ class MultiprocessTestRunner: self.workers = None def start_workers(self): - self.workers = [TestWorkerProcess(index, self.pending, self.output, - self.ns, self.worker_timeout) + self.workers = [TestWorkerProcess(index, self) for index in range(1, self.ns.use_mp + 1)] - print("Run tests in parallel using %s child processes" - % len(self.workers)) + self.log("Run tests in parallel using %s child processes" + % len(self.workers)) for worker in self.workers: worker.start() @@ -364,7 +366,7 @@ class MultiprocessTestRunner: # display progress running = get_running(self.workers) if running and not self.ns.pgo: - print('running: %s' % ', '.join(running), flush=True) + self.log('running: %s' % ', '.join(running)) def display_result(self, mp_result): result = mp_result.result @@ -384,8 +386,7 @@ class MultiprocessTestRunner: if item[0]: # Thread got an exception format_exc = item[1] - print(f"regrtest worker thread failed: {format_exc}", - file=sys.stderr, flush=True) + print_warning(f"regrtest worker thread failed: {format_exc}") return True self.test_index += 1 diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index 2691a2c30c..40faed832c 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -17,11 +17,14 @@ def format_duration(seconds): if minutes: parts.append('%s min' % minutes) if seconds: - parts.append('%s sec' % seconds) - if ms: - parts.append('%s ms' % ms) + if parts: + # 2 min 1 sec + parts.append('%s sec' % seconds) + else: + # 1.0 sec + parts.append('%.1f sec' % (seconds + ms / 1000)) if not parts: - return '0 ms' + return '%s ms' % ms parts = parts[:2] return ' '.join(parts) diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py index 0b73a94630..028c01106d 100644 --- a/Lib/test/libregrtest/win_utils.py +++ b/Lib/test/libregrtest/win_utils.py @@ -1,4 +1,5 @@ import _winapi +import math import msvcrt import os import subprocess @@ -10,11 +11,14 @@ from test.libregrtest.utils import print_warning # Max size of asynchronous reads BUFSIZE = 8192 -# Exponential damping factor (see below) -LOAD_FACTOR_1 = 0.9200444146293232478931553241 - # Seconds per measurement SAMPLING_INTERVAL = 1 +# Exponential damping factor to compute exponentially weighted moving average +# on 1 minute (60 seconds) +LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) +# Initialize the load using the arithmetic mean of the first NVALUE values +# of the Processor Queue Length +NVALUE = 5 # Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names # of typeperf are registered COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" @@ -30,10 +34,10 @@ class WindowsLoadTracker(): """ def __init__(self): - self.load = 0.0 - self.counter_name = '' + self._values = [] + self._load = None self._buffer = '' - self.popen = None + self._popen = None self.start() def start(self): @@ -65,7 +69,7 @@ class WindowsLoadTracker(): # Spawn off the load monitor counter_name = self._get_counter_name() command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] - self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD) + self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD) # Close our copy of the write end of the pipe os.close(command_stdout) @@ -85,12 +89,16 @@ class WindowsLoadTracker(): process_queue_length = counters_dict['44'] return f'"\\{system}\\{process_queue_length}"' - def close(self): - if self.popen is None: + def close(self, kill=True): + if self._popen is None: return - self.popen.kill() - self.popen.wait() - self.popen = None + + self._load = None + + if kill: + self._popen.kill() + self._popen.wait() + self._popen = None def __del__(self): self.close() @@ -109,7 +117,7 @@ class WindowsLoadTracker(): value = value[1:-1] return float(value) - def read_lines(self): + def _read_lines(self): overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) bytes_read, res = overlapped.GetOverlappedResult(False) if res != 0: @@ -135,7 +143,21 @@ class WindowsLoadTracker(): return lines def getloadavg(self): - for line in self.read_lines(): + if self._popen is None: + return None + + returncode = self._popen.poll() + if returncode is not None: + self.close(kill=False) + return None + + try: + lines = self._read_lines() + except BrokenPipeError: + self.close() + return None + + for line in lines: line = line.rstrip() # Ignore the initial header: @@ -148,7 +170,7 @@ class WindowsLoadTracker(): continue try: - load = self._parse_line(line) + processor_queue_length = self._parse_line(line) except ValueError: print_warning("Failed to parse typeperf output: %a" % line) continue @@ -156,7 +178,13 @@ class WindowsLoadTracker(): # We use an exponentially weighted moving average, imitating the # load calculation on Unix systems. # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation - new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1) - self.load = new_load - - return self.load + # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + if self._load is not None: + self._load = (self._load * LOAD_FACTOR_1 + + processor_queue_length * (1.0 - LOAD_FACTOR_1)) + elif len(self._values) < NVALUE: + self._values.append(processor_queue_length) + else: + self._load = sum(self._values) / len(self._values) + + return self._load diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index fa37ecde37..aaa97e0408 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -25,6 +25,7 @@ from test.libregrtest import utils Py_DEBUG = hasattr(sys, 'gettotalrefcount') ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..') ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR)) +LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?' TEST_INTERRUPTED = textwrap.dedent(""" from signal import SIGINT, raise_signal @@ -388,8 +389,8 @@ class BaseTestCase(unittest.TestCase): self.assertRegex(output, regex) def parse_executed_tests(self, output): - regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)' - % self.TESTNAME_REGEX) + regex = (r'^%s\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)' + % (LOG_PREFIX, self.TESTNAME_REGEX)) parser = re.finditer(regex, output, re.MULTILINE) return list(match.group(1) for match in parser) @@ -449,9 +450,10 @@ class BaseTestCase(unittest.TestCase): if rerun: regex = list_regex('%s re-run test%s', rerun) self.check_line(output, regex) - self.check_line(output, "Re-running failed tests in verbose mode") + regex = LOG_PREFIX + r"Re-running failed tests in verbose mode" + self.check_line(output, regex) for test_name in rerun: - regex = f"Re-running {test_name} in verbose mode" + regex = LOG_PREFIX + f"Re-running {test_name} in verbose mode" self.check_line(output, regex) if no_test_ran: @@ -1232,9 +1234,9 @@ class TestUtils(unittest.TestCase): self.assertEqual(utils.format_duration(10e-3), '10 ms') self.assertEqual(utils.format_duration(1.5), - '1 sec 500 ms') + '1.5 sec') self.assertEqual(utils.format_duration(1), - '1 sec') + '1.0 sec') self.assertEqual(utils.format_duration(2 * 60), '2 min') self.assertEqual(utils.format_duration(2 * 60 + 1), -- 2.40.0