CHECK_FUNCTIONS = []
""" DUT subclass can define a few check functions to process received data. """
- def __init__(self, read, data_cache, recorded_data, record_data_lock):
+ def __init__(self, read, dut):
super(RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
- self.data_cache = data_cache
- self.recorded_data = recorded_data
- self.record_data_lock = record_data_lock
+ self.dut = dut
+ self.data_cache = dut.data_cache
+ self.recorded_data = dut.recorded_data
+ self.record_data_lock = dut.record_data_lock
self._line_cache = str()
def _line_completion(self, data):
:return: None
"""
- self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self.data_cache,
- self.recorded_data, self.record_data_lock)
+ self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self)
self.receive_thread.start()
def stop_receive(self):
super(IDFApp, self).__init__(app_path)
self.idf_path = self.get_sdk_path()
self.binary_path = self.get_binary_path(app_path)
+ self.elf_file = self._get_elf_file_path(self.binary_path)
assert os.path.exists(self.binary_path)
if self.IDF_DOWNLOAD_CONFIG_FILE not in os.listdir(self.binary_path):
if self.IDF_FLASH_ARGS_FILE not in os.listdir(self.binary_path):
"""
pass
+ @staticmethod
+ def _get_elf_file_path(binary_path):
+ ret = ""
+ file_names = os.listdir(binary_path)
+ for fn in file_names:
+ if os.path.splitext(fn)[1] == ".elf":
+ ret = os.path.join(binary_path, fn)
+ return ret
+
def _parse_flash_download_config(self):
"""
Parse flash download config from build metadata files
import re
import functools
import tempfile
+import subprocess
# python2 and python3 queue package name is different
try:
re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
]
BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
+ BACKTRACE_ADDRESS_PATTERN = re.compile(r"(0x[0-9a-f]{8}):0x[0-9a-f]{8}")
- def __init__(self, read, data_cache, recorded_data, record_data_lock):
- super(IDFRecvThread, self).__init__(read, data_cache, recorded_data, record_data_lock)
+ def __init__(self, read, dut):
+ super(IDFRecvThread, self).__init__(read, dut)
self.exceptions = _queue.Queue()
def collect_performance(self, comp_data):
break
def detect_backtrace(self, comp_data):
- # TODO: to support auto parse backtrace
start = 0
while True:
match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
if match:
start = match.end()
Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
+ # translate backtrace
+ addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
+ translated_backtrace = ""
+ for addr in addresses:
+ ret = self.dut.lookup_pc_address(addr)
+ if ret:
+ translated_backtrace += ret + "\n"
+ if translated_backtrace:
+ Utility.console_log("Translated backtrace\n:" + translated_backtrace, color="yellow")
+ else:
+ Utility.console_log("Failed to translate backtrace", color="yellow")
else:
break
# if need to erase NVS partition in start app
ERASE_NVS = True
RECV_THREAD_CLS = IDFRecvThread
+ TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
- self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
self.allow_dut_exception = allow_dut_exception
self.exceptions = _queue.Queue()
return ports
+ def lookup_pc_address(self, pc_addr):
+ cmd = ["%saddr2line" % self.TOOLCHAIN_PREFIX,
+ "-pfiaC", "-e", self.app.elf_file, pc_addr]
+ ret = ""
+ try:
+ translation = subprocess.check_output(cmd)
+ ret = translation.decode()
+ except OSError:
+ pass
+ return ret
+
def stop_receive(self):
if self.receive_thread:
while True:
def close(self):
super(IDFDUT, self).close()
if not self.allow_dut_exception and self.get_exceptions():
+ Utility.console_log("DUT exception detected on {}".format(self), color="red")
raise IDFDUTException()