break
return ret
- def get_data(self, timeout=0):
+ def get_data(self, timeout=0.0):
"""
get a copy of data from cache.
"""
DEFAULT_EXPECT_TIMEOUT = 5
+ MAX_EXPECT_FAILURES_TO_SAVED = 10
def __init__(self, name, port, log_file, app, **kwargs):
self.app = app
self.data_cache = _DataCache()
self.receive_thread = None
+ self.expect_failures = []
# open and start during init
self.open()
def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port))
+ def _save_expect_failure(self, pattern, data, start_time):
+ self.expect_failures.insert(0, {"pattern": pattern, "data": data,
+ "start": start_time, "end": time.time()})
+ self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
+
# define for methods need to be overwritten by Port
@classmethod
def list_available_ports(cls):
data = self.data_cache.get_data(time.time() + timeout - start_time)
if ret is None:
- raise ExpectTimeout(self.name + ": " + _pattern_to_string(pattern))
+ pattern = _pattern_to_string(pattern)
+ self._save_expect_failure(pattern, data, start_time)
+ raise ExpectTimeout(self.name + ": " + pattern)
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
# flush already matched data
self.data_cache.flush(slice_index)
else:
- raise ExpectTimeout(self.name + ": " + str([_pattern_to_string(x) for x in expect_items]))
+ pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
+ self._save_expect_failure(pattern, data, start_time)
+ raise ExpectTimeout(self.name + ": " + pattern)
@_expect_lock
def expect_any(self, *expect_items, **timeout):
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(True, expect_items, **timeout)
+ @staticmethod
+ def _format_ts(ts):
+ return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
+
+ def print_debug_info(self):
+ """
+ Print debug info of current DUT. Currently we will print debug info for expect failures.
+ """
+ Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
+
+ for failure in self.expect_failures:
+ Utility.console_log("\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
+ .format(failure["pattern"], failure["data"],
+ self._format_ts(failure["start"]), self._format_ts(failure["end"])),
+ color="orange")
+
class SerialDUT(BaseDUT):
""" serial with logging received data feature """
self.serial_configs.update(kwargs)
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
- @staticmethod
- def _format_data(data):
+ def _format_data(self, data):
"""
format data for logging. do decode and add timestamp.
:param data: raw data from read
:return: formatted data (str)
"""
- timestamp = time.time()
- timestamp = "[{}:{}]".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
- str(timestamp % 1)[2:5])
+ timestamp = "[{}]".format(self._format_ts(time.time()))
formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
return formatted_data
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
XUNIT_FILE_NAME)
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
+ result = False
try:
Utility.console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
except Exception as e:
# handle all the exceptions here
traceback.print_exc()
- result = False
# log failure
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
finally:
- # do close all DUTs
- env_inst.close()
+ # do close all DUTs, if result is False then print DUT debug info
+ env_inst.close(dut_debug=(not result))
# end case and output result
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
with open(xunit_file, "ab+") as f: