import re
import threading
import copy
-import sys
import functools
+# python2 and python3 queue package name is different
+try:
+ import Queue as _queue
+except ImportError:
+ import queue as _queue
+
import serial
from serial.tools import list_ports
import Utility
-if sys.version_info[0] == 2:
- import Queue as _queue
-else:
- import queue as _queue
-
class ExpectTimeout(ValueError):
""" timeout for expect method """
self.flush_data()
-class _RecvThread(threading.Thread):
+class RecvThread(threading.Thread):
- PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
+ CHECK_FUNCTIONS = []
+ """ DUT subclass can define a few check functions to process received data. """
def __init__(self, read, data_cache, recorded_data, record_data_lock):
- super(_RecvThread, self).__init__()
+ super(RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
self.recorded_data = recorded_data
self.record_data_lock = record_data_lock
- # cache the last line of recv data for collecting performance
self._line_cache = str()
- def collect_performance(self, data):
- """ collect performance """
- if data:
- decoded_data = _decode_data(data)
-
- matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data)
- for match in matches:
- Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
- color="orange")
-
- # cache incomplete line to later process
- lines = decoded_data.splitlines(True)
- last_line = lines[-1]
-
- if last_line[-1] != "\n":
- if len(lines) == 1:
- # only one line and the line is not finished, then append this to cache
- self._line_cache += lines[-1]
- else:
- # more than one line and not finished, replace line cache
- self._line_cache = lines[-1]
+ def _line_completion(self, data):
+ """
+ Usually check functions requires to check for one complete line.
+ This method will do line completion for the first line, and strip incomplete last line.
+ """
+ ret = self._line_cache
+ decoded_data = _decode_data(data)
+
+ # cache incomplete line to later process
+ lines = decoded_data.splitlines(True)
+ last_line = lines[-1]
+
+ if last_line[-1] != "\n":
+ if len(lines) == 1:
+ # only one line and the line is not finished, then append this to cache
+ self._line_cache += lines[-1]
+ ret = str()
else:
- # line finishes, flush cache
- self._line_cache = str()
+ # more than one line and not finished, replace line cache
+ self._line_cache = lines[-1]
+ ret += "".join(lines[:-1])
+ else:
+ # line finishes, flush cache
+ self._line_cache = str()
+ ret += decoded_data
+ return ret
def run(self):
while not self.exit_event.isSet():
- data = self.read(1000)
- if data:
+ raw_data = self.read(1000)
+ if raw_data:
with self.record_data_lock:
- self.data_cache.put(data)
+ self.data_cache.put(raw_data)
for capture_id in self.recorded_data:
- self.recorded_data[capture_id].put(data)
- self.collect_performance(data)
+ self.recorded_data[capture_id].put(raw_data)
+
+ # we need to do line completion before call check functions
+ comp_data = self._line_completion(raw_data)
+ for check_function in self.CHECK_FUNCTIONS:
+ check_function(self, comp_data)
def exit(self):
self.exit_event.set()
DEFAULT_EXPECT_TIMEOUT = 10
MAX_EXPECT_FAILURES_TO_SAVED = 10
-
+ RECV_THREAD_CLS = RecvThread
+ """ DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data.
+ For example, DUT can implement exception detect & analysis logic in receive thread subclass. """
LOG_THREAD = _LogThread()
LOG_THREAD.start()
:return: None
"""
- self.receive_thread = _RecvThread(self._port_read, self.data_cache,
- self.recorded_data, self.record_data_lock)
+ self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self.data_cache,
+ self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def stop_receive(self):
if isinstance(data, type(u'')):
try:
data = data.encode('utf-8')
- except Exception:
+ except Exception as e:
print(u'Cannot encode {} of type {}'.format(data, type(data)))
- raise
+ raise e
return data
def write(self, data, eol="\r\n", flush=True):
self.lock = threading.RLock()
@_synced
- def get_dut(self, dut_name, app_path, dut_class=None, app_class=None):
+ def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, **dut_init_args):
"""
get_dut(dut_name, app_path, dut_class=None, app_class=None)
:param app_path: application path, app instance will use this path to process application info
:param dut_class: dut class, if not specified will use default dut class of env
:param app_class: app class, if not specified will use default app of env
+ :keyword dut_init_args: extra kwargs used when creating DUT instance
:return: dut instance
"""
if dut_name in self.allocated_duts:
dut_config = self.get_variable(dut_name + "_port_config")
except ValueError:
dut_config = dict()
+ dut_config.update(dut_init_args)
dut = self.default_dut_cls(dut_name, port,
os.path.join(self.log_path, dut_name + ".log"),
app_inst,
close all DUTs of the Env.
:param dut_debug: if dut_debug is True, then print all dut expect failures before close it
- :return: None
+ :return: exceptions during close DUT
"""
+ dut_close_errors = []
for dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
if dut_debug:
dut.print_debug_info()
- dut.close()
+ try:
+ dut.close()
+ except Exception as e:
+ dut_close_errors.append(e)
self.allocated_duts = dict()
+ return dut_close_errors
import functools
import tempfile
+# python2 and python3 queue package name is different
+try:
+ import Queue as _queue
+except ImportError:
+ import queue as _queue
+
+
from serial.tools import list_ports
import DUT
+import Utility
try:
import esptool
pass
+class IDFDUTException(RuntimeError):
+ pass
+
+
+class IDFRecvThread(DUT.RecvThread):
+
+ PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
+ EXCEPTION_PATTERNS = [
+ re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
+ re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
+ re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
+ ]
+ BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
+
+ def __init__(self, read, data_cache, recorded_data, record_data_lock):
+ super(IDFRecvThread, self).__init__(read, data_cache, recorded_data, record_data_lock)
+ self.exceptions = _queue.Queue()
+
+ def collect_performance(self, comp_data):
+ matches = self.PERFORMANCE_PATTERN.findall(comp_data)
+ for match in matches:
+ Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
+ color="orange")
+
+ def detect_exception(self, comp_data):
+ for pattern in self.EXCEPTION_PATTERNS:
+ start = 0
+ while True:
+ match = pattern.search(comp_data, pos=start)
+ if match:
+ start = match.end()
+ self.exceptions.put(match.group(0))
+ Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
+ else:
+ break
+
+ def detect_backtrace(self, comp_data):
+ # TODO: to support auto parse backtrace
+ start = 0
+ while True:
+ match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
+ if match:
+ start = match.end()
+ Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
+ else:
+ break
+
+ CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
+
+
def _uses_esptool(func):
""" Suspend listener thread, connect with esptool,
call target function with esptool instance,
INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
# if need to erase NVS partition in start app
ERASE_NVS = True
+ RECV_THREAD_CLS = IDFRecvThread
- def __init__(self, name, port, log_file, app, **kwargs):
+ def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
+ self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
+ self.allow_dut_exception = allow_dut_exception
+ self.exceptions = _queue.Queue()
@classmethod
def get_mac(cls, app, port):
return [port_hint] + ports
return ports
+
+ def stop_receive(self):
+ if self.receive_thread:
+ while True:
+ try:
+ self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
+ except _queue.Empty:
+ break
+ super(IDFDUT, self).stop_receive()
+
+ def get_exceptions(self):
+ """ Get exceptions detected by DUT receive thread. """
+ if self.receive_thread:
+ while True:
+ try:
+ self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
+ except _queue.Empty:
+ break
+ exceptions = []
+ while True:
+ try:
+ exceptions.append(self.exceptions.get(timeout=0))
+ except _queue.Empty:
+ break
+ return exceptions
+
+ def close(self):
+ super(IDFDUT, self).close()
+ if not self.allow_dut_exception and self.get_exceptions():
+ raise IDFDUTException()
# log failure
junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
finally:
+ # do close all DUTs, if result is False then print DUT debug info
+ close_errors = env_inst.close(dut_debug=(not result))
+ # We have a hook in DUT close, allow DUT to raise error to fail test case.
+ # For example, we don't allow DUT exception (reset) during test execution.
+ # We don't want to implement in exception detection in test function logic,
+ # as we need to add it to every test case.
+ # We can implement it in DUT receive thread,
+ # and raise exception in DUT close to fail test case if reset detected.
+ if close_errors:
+ for error in close_errors:
+ junit_test_case.add_failure_info(str(error))
+ result = False
if not case_info["junit_report_by_case"]:
JunitReport.test_case_finish(junit_test_case)
- # do close all DUTs, if result is False then print DUT debug info
- env_inst.close(dut_debug=(not result))
# end case and output result
JunitReport.output_report(junit_file_path)