raise AssertionError("Unit Test Failed")
+
class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
self.result = False
self.fail_name = None
self.timeout = timeout
+ self.force_stop = threading.Event() # it show the running status
threading.Thread.__init__(self, name="{} Handler".format(dut))
def run(self):
def device_wait_action(data):
start_time = time.time()
expected_signal = data[0]
- while not THREAD_TERMINATE_FLAG:
+ while 1:
if time.time() > start_time + self.timeout:
Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
break
self.dut.expect("Running " + self.parent_case_name + "...")
except ExpectTimeout:
Utility.console_log("No case detected!", color="orange")
- THREAD_TERMINATE_FLAG = True
-
- while not self.finish and not THREAD_TERMINATE_FLAG:
+ while not self.finish and not self.force_stop.isSet():
try:
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
one_device_case_finish(False)
break
+ def stop(self):
+ self.force_stop.set()
+
def get_case_info(one_case):
parent_case = one_case["name"]
result = True
parent_case, case_num = get_case_info(one_case)
- global THREAD_TERMINATE_FLAG
- THREAD_TERMINATE_FLAG = False
-
for i in range(case_num):
dut = get_dut(duts, env, "dut%d" % i, ut_config, app_bin)
threads.append(Handler(dut, send_signal_list, lock,
thread.join()
result = result and thread.result
if not thread.result:
- THREAD_TERMINATE_FLAG = True
+ [thd.stop() for thd in threads]
if result:
Utility.console_log("Success: " + one_case["name"], color="green")