]> granicus.if.org Git - esp-idf/commitdiff
test: fix unit test script code style warnings
authorHe Yin Ling <heyinling@espressif.com>
Tue, 20 Nov 2018 14:18:04 +0000 (22:18 +0800)
committerHe Yin Ling <heyinling@espressif.com>
Mon, 26 Nov 2018 02:07:41 +0000 (10:07 +0800)
tools/unit-test-app/unit_test.py

index 128b8950491a3bd8a0073e7d042ebf0262b9a3bf..1aae0315b390bb8b6bb76d3cae677f73489121e6 100755 (executable)
@@ -52,7 +52,7 @@ SIMPLE_TEST_ID = 0
 MULTI_STAGE_ID = 1
 MULTI_DEVICE_ID = 2
 
-DEFAULT_TIMEOUT=20
+DEFAULT_TIMEOUT = 20
 
 DUT_STARTUP_CHECK_RETRY_COUNT = 5
 TEST_HISTROY_CHECK_TIMEOUT = 1
@@ -149,7 +149,8 @@ def reset_dut(dut):
     # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
     # this could cause checking bootup print failed.
     # now use input cmd `-` and check test history to check if DUT is bootup.
-    # we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
+    # we'll retry this step for a few times,
+    # in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
     for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
         dut.write("-")
         try:
@@ -158,7 +159,7 @@ def reset_dut(dut):
         except ExpectTimeout:
             pass
     else:
-        raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port))
+        raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
 
 
 def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
@@ -250,6 +251,7 @@ def run_unit_test_cases(env, extra_data):
     3. as list of string or dict:
                [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
 
+    :param env: test env instance
     :param extra_data: the case name or case list or case dictionary
     :return: None
     """
@@ -287,7 +289,6 @@ def run_unit_test_cases(env, extra_data):
         raise AssertionError("Unit Test Failed")
 
 
-
 class Handler(threading.Thread):
 
     WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
@@ -335,7 +336,7 @@ class Handler(threading.Thread):
             expected_signal = data[0]
             while 1:
                 if time.time() > start_time + self.timeout:
-                    Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
+                    Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
                     break
                 with self.lock:
                     if expected_signal in self.sent_signal_list:
@@ -356,7 +357,6 @@ class Handler(threading.Thread):
                 Utility.console_log("Ignored: " + self.child_case_name, color="orange")
             one_device_case_finish(not int(data[0]))
 
-
         try:
             time.sleep(1)
             self.dut.write("\"{}\"".format(self.parent_case_name))
@@ -365,7 +365,8 @@ class Handler(threading.Thread):
             Utility.console_log("No case detected!", color="orange")
         while not self.finish and not self.force_stop.isSet():
             try:
-                self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
+                self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'),
+                                     get_child_case_name),
                                     (self.WAIT_SIGNAL_PATTERN, device_wait_action),  # wait signal pattern
                                     (self.SEND_SIGNAL_PATTERN, device_send_action),  # send signal pattern
                                     (self.FINISH_PATTERN, handle_device_test_finish),  # test finish pattern
@@ -392,11 +393,11 @@ def get_dut(duts, env, name, ut_config, app_bin=None):
         dut = env.get_dut(name, app_path=ut_config)
         duts[name] = dut
         replace_app_bin(dut, "unit-test-app", app_bin)
-        dut.start_app() # download bin to board
+        dut.start_app()  # download bin to board
     return dut
 
 
-def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case):
+def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case):
     lock = threading.RLock()
     threads = []
     send_signal_list = []
@@ -456,7 +457,8 @@ def run_multiple_devices_cases(env, extra_data):
         for one_case in case_config[ut_config]:
             junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
             try:
-                run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case)
+                run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases,
+                                              one_case.get('app_bin'), junit_test_case)
                 TinyFW.JunitReport.test_case_finish(junit_test_case)
             except Exception as e:
                 junit_test_case.add_error_info("Unexpected exception: " + str(e))
@@ -579,6 +581,7 @@ def run_multiple_stage_cases(env, extra_data):
     3. as list of string or dict:
                [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
 
+    :param env: test env instance
     :param extra_data: the case name or case list or case dictionary
     :return: None
     """
@@ -629,14 +632,14 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
         dut.write("")
         dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
 
-        def find_update_dic(name, t, timeout, child_case_num=None):
-            for dic in extra_data:
-                if dic['name'] == name:
-                    dic['type'] = t
-                    if 'timeout' not in dic:
-                        dic['timeout'] = timeout
+        def find_update_dic(name, _t, _timeout, child_case_num=None):
+            for _case_data in extra_data:
+                if _case_data['name'] == name:
+                    _case_data['type'] = _t
+                    if 'timeout' not in _case_data:
+                        _case_data['timeout'] = _timeout
                     if child_case_num:
-                        dic['child case num'] = child_case_num
+                        _case_data['child case num'] = child_case_num
 
         try:
             while True:
@@ -666,9 +669,9 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
                 if data[1] and re.search(END_LIST_STR, data[1]):
                     break
             # check if the unit test case names are correct, i.e. they could be found in the device
-            for dic in extra_data:
-                if 'type' not in dic:
-                    raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(dic.get('name')))
+            for _dic in extra_data:
+                if 'type' not in _dic:
+                    raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
         except ExpectTimeout:
             Utility.console_log("Timeout during getting the test list", color="red")
         finally:
@@ -687,13 +690,13 @@ if __name__ == '__main__':
         default=1
     )
     parser.add_argument("--env_config_file", "-e",
-        help="test env config file",
-        default=None
-    )
+                        help="test env config file",
+                        default=None
+                        )
     parser.add_argument("--app_bin", "-b",
-        help="application binary file for flashing the chip",
-        default=None
-    )
+                        help="application binary file for flashing the chip",
+                        default=None
+                        )
     parser.add_argument(
         'test',
         help='Comma separated list of <option>:<argument> where option can be "name" (default), "child case num", \
@@ -727,12 +730,12 @@ if __name__ == '__main__':
     env_config['app'] = UT
     env_config['dut'] = IDF.IDFDUT
     env_config['test_suite_name'] = 'unit_test_parsing'
-    env = Env.Env(**env_config)
-    detect_update_unit_test_info(env, extra_data=list_of_dicts, app_bin=args.app_bin)
+    test_env = Env.Env(**env_config)
+    detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
 
-    for i in range(1, args.repeat+1):
+    for index in range(1, args.repeat+1):
         if args.repeat > 1:
-            Utility.console_log("Repetition {}".format(i), color="green")
+            Utility.console_log("Repetition {}".format(index), color="green")
         for dic in list_of_dicts:
             t = dic.get('type', SIMPLE_TEST_ID)
             if t == SIMPLE_TEST_ID: