MULTI_STAGE_ID = 1
MULTI_DEVICE_ID = 2
-DEFAULT_TIMEOUT=20
+DEFAULT_TIMEOUT = 20
DUT_STARTUP_CHECK_RETRY_COUNT = 5
TEST_HISTROY_CHECK_TIMEOUT = 1
# before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
# this could cause checking bootup print failed.
# now use input cmd `-` and check test history to check if DUT is bootup.
- # we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
+ # we'll retry this step for a few times,
+ # in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
dut.write("-")
try:
except ExpectTimeout:
pass
else:
- raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port))
+ raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
+ :param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
raise AssertionError("Unit Test Failed")
-
class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
expected_signal = data[0]
while 1:
if time.time() > start_time + self.timeout:
- Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
+ Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
break
with self.lock:
if expected_signal in self.sent_signal_list:
Utility.console_log("Ignored: " + self.child_case_name, color="orange")
one_device_case_finish(not int(data[0]))
-
try:
time.sleep(1)
self.dut.write("\"{}\"".format(self.parent_case_name))
Utility.console_log("No case detected!", color="orange")
while not self.finish and not self.force_stop.isSet():
try:
- self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
+ self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'),
+ get_child_case_name),
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
(self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
(self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
dut = env.get_dut(name, app_path=ut_config)
duts[name] = dut
replace_app_bin(dut, "unit-test-app", app_bin)
- dut.start_app() # download bin to board
+ dut.start_app() # download bin to board
return dut
-def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case):
+def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case):
lock = threading.RLock()
threads = []
send_signal_list = []
for one_case in case_config[ut_config]:
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
- run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case)
+ run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases,
+ one_case.get('app_bin'), junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
+ :param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
dut.write("")
dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
- def find_update_dic(name, t, timeout, child_case_num=None):
- for dic in extra_data:
- if dic['name'] == name:
- dic['type'] = t
- if 'timeout' not in dic:
- dic['timeout'] = timeout
+ def find_update_dic(name, _t, _timeout, child_case_num=None):
+ for _case_data in extra_data:
+ if _case_data['name'] == name:
+ _case_data['type'] = _t
+ if 'timeout' not in _case_data:
+ _case_data['timeout'] = _timeout
if child_case_num:
- dic['child case num'] = child_case_num
+ _case_data['child case num'] = child_case_num
try:
while True:
if data[1] and re.search(END_LIST_STR, data[1]):
break
# check if the unit test case names are correct, i.e. they could be found in the device
- for dic in extra_data:
- if 'type' not in dic:
- raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(dic.get('name')))
+ for _dic in extra_data:
+ if 'type' not in _dic:
+ raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
except ExpectTimeout:
Utility.console_log("Timeout during getting the test list", color="red")
finally:
default=1
)
parser.add_argument("--env_config_file", "-e",
- help="test env config file",
- default=None
- )
+ help="test env config file",
+ default=None
+ )
parser.add_argument("--app_bin", "-b",
- help="application binary file for flashing the chip",
- default=None
- )
+ help="application binary file for flashing the chip",
+ default=None
+ )
parser.add_argument(
'test',
help='Comma separated list of <option>:<argument> where option can be "name" (default), "child case num", \
env_config['app'] = UT
env_config['dut'] = IDF.IDFDUT
env_config['test_suite_name'] = 'unit_test_parsing'
- env = Env.Env(**env_config)
- detect_update_unit_test_info(env, extra_data=list_of_dicts, app_bin=args.app_bin)
+ test_env = Env.Env(**env_config)
+ detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
- for i in range(1, args.repeat+1):
+ for index in range(1, args.repeat+1):
if args.repeat > 1:
- Utility.console_log("Repetition {}".format(i), color="green")
+ Utility.console_log("Repetition {}".format(index), color="green")
for dic in list_of_dicts:
t = dic.get('type', SIMPLE_TEST_ID)
if t == SIMPLE_TEST_ID: