From 5b3d09d5c883c7245cafd8e7170466052fd10cf7 Mon Sep 17 00:00:00 2001 From: He Yin Ling Date: Tue, 20 Nov 2018 22:18:04 +0800 Subject: [PATCH] test: fix unit test script code style warnings --- tools/unit-test-app/unit_test.py | 63 +++++++++++++++++--------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/tools/unit-test-app/unit_test.py b/tools/unit-test-app/unit_test.py index 128b895049..1aae0315b3 100755 --- a/tools/unit-test-app/unit_test.py +++ b/tools/unit-test-app/unit_test.py @@ -52,7 +52,7 @@ SIMPLE_TEST_ID = 0 MULTI_STAGE_ID = 1 MULTI_DEVICE_ID = 2 -DEFAULT_TIMEOUT=20 +DEFAULT_TIMEOUT = 20 DUT_STARTUP_CHECK_RETRY_COUNT = 5 TEST_HISTROY_CHECK_TIMEOUT = 1 @@ -149,7 +149,8 @@ def reset_dut(dut): # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened. # this could cause checking bootup print failed. # now use input cmd `-` and check test history to check if DUT is bootup. - # we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command). + # we'll retry this step for a few times, + # in case `dut.reset` returns during DUT bootup (when DUT can't process any command). for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT): dut.write("-") try: @@ -158,7 +159,7 @@ def reset_dut(dut): except ExpectTimeout: pass else: - raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port)) + raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port)) def run_one_normal_case(dut, one_case, junit_test_case, failed_cases): @@ -250,6 +251,7 @@ def run_unit_test_cases(env, extra_data): 3. as list of string or dict: [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...] + :param env: test env instance :param extra_data: the case name or case list or case dictionary :return: None """ @@ -287,7 +289,6 @@ def run_unit_test_cases(env, extra_data): raise AssertionError("Unit Test Failed") - class Handler(threading.Thread): WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!') @@ -335,7 +336,7 @@ class Handler(threading.Thread): expected_signal = data[0] while 1: if time.time() > start_time + self.timeout: - Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange") + Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange") break with self.lock: if expected_signal in self.sent_signal_list: @@ -356,7 +357,6 @@ class Handler(threading.Thread): Utility.console_log("Ignored: " + self.child_case_name, color="orange") one_device_case_finish(not int(data[0])) - try: time.sleep(1) self.dut.write("\"{}\"".format(self.parent_case_name)) @@ -365,7 +365,8 @@ class Handler(threading.Thread): Utility.console_log("No case detected!", color="orange") while not self.finish and not self.force_stop.isSet(): try: - self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name), + self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), + get_child_case_name), (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern @@ -392,11 +393,11 @@ def get_dut(duts, env, name, ut_config, app_bin=None): dut = env.get_dut(name, app_path=ut_config) duts[name] = dut replace_app_bin(dut, "unit-test-app", app_bin) - dut.start_app() # download bin to board + dut.start_app() # download bin to board return dut -def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case): +def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case): lock = threading.RLock() threads = [] send_signal_list = [] @@ -456,7 +457,8 @@ def run_multiple_devices_cases(env, extra_data): for one_case in case_config[ut_config]: junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) try: - run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case) + run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, + one_case.get('app_bin'), junit_test_case) TinyFW.JunitReport.test_case_finish(junit_test_case) except Exception as e: junit_test_case.add_error_info("Unexpected exception: " + str(e)) @@ -579,6 +581,7 @@ def run_multiple_stage_cases(env, extra_data): 3. as list of string or dict: [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...] + :param env: test env instance :param extra_data: the case name or case list or case dictionary :return: None """ @@ -629,14 +632,14 @@ def detect_update_unit_test_info(env, extra_data, app_bin): dut.write("") dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT) - def find_update_dic(name, t, timeout, child_case_num=None): - for dic in extra_data: - if dic['name'] == name: - dic['type'] = t - if 'timeout' not in dic: - dic['timeout'] = timeout + def find_update_dic(name, _t, _timeout, child_case_num=None): + for _case_data in extra_data: + if _case_data['name'] == name: + _case_data['type'] = _t + if 'timeout' not in _case_data: + _case_data['timeout'] = _timeout if child_case_num: - dic['child case num'] = child_case_num + _case_data['child case num'] = child_case_num try: while True: @@ -666,9 +669,9 @@ def detect_update_unit_test_info(env, extra_data, app_bin): if data[1] and re.search(END_LIST_STR, data[1]): break # check if the unit test case names are correct, i.e. they could be found in the device - for dic in extra_data: - if 'type' not in dic: - raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(dic.get('name'))) + for _dic in extra_data: + if 'type' not in _dic: + raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name'))) except ExpectTimeout: Utility.console_log("Timeout during getting the test list", color="red") finally: @@ -687,13 +690,13 @@ if __name__ == '__main__': default=1 ) parser.add_argument("--env_config_file", "-e", - help="test env config file", - default=None - ) + help="test env config file", + default=None + ) parser.add_argument("--app_bin", "-b", - help="application binary file for flashing the chip", - default=None - ) + help="application binary file for flashing the chip", + default=None + ) parser.add_argument( 'test', help='Comma separated list of