]> granicus.if.org Git - esp-idf/commitdiff
tiny-test-fw: support detect exception in IDFDUT
authorHe Yin Ling <heyinling@espressif.com>
Sat, 16 Mar 2019 12:07:52 +0000 (20:07 +0800)
committerHe Yin Ling <heyinling@espressif.com>
Tue, 19 Mar 2019 03:24:08 +0000 (11:24 +0800)
tools/tiny-test-fw/DUT.py
tools/tiny-test-fw/Env.py
tools/tiny-test-fw/IDF/IDFDUT.py
tools/tiny-test-fw/TinyFW.py

index c3d493753e059524013605fed99b5a58482052fc..860e1e4a2a7350a359ea54b56bf3b1e23f175c3a 100644 (file)
@@ -42,19 +42,19 @@ import time
 import re
 import threading
 import copy
-import sys
 import functools
 
+# python2 and python3 queue package name is different
+try:
+    import Queue as _queue
+except ImportError:
+    import queue as _queue
+
 import serial
 from serial.tools import list_ports
 
 import Utility
 
-if sys.version_info[0] == 2:
-    import Queue as _queue
-else:
-    import queue as _queue
-
 
 class ExpectTimeout(ValueError):
     """ timeout for expect method """
@@ -201,55 +201,61 @@ class _LogThread(threading.Thread, _queue.Queue):
             self.flush_data()
 
 
-class _RecvThread(threading.Thread):
+class RecvThread(threading.Thread):
 
-    PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
+    CHECK_FUNCTIONS = []
+    """ DUT subclass can define a few check functions to process received data. """
 
     def __init__(self, read, data_cache, recorded_data, record_data_lock):
-        super(_RecvThread, self).__init__()
+        super(RecvThread, self).__init__()
         self.exit_event = threading.Event()
         self.setDaemon(True)
         self.read = read
         self.data_cache = data_cache
         self.recorded_data = recorded_data
         self.record_data_lock = record_data_lock
-        # cache the last line of recv data for collecting performance
         self._line_cache = str()
 
-    def collect_performance(self, data):
-        """ collect performance """
-        if data:
-            decoded_data = _decode_data(data)
-
-            matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data)
-            for match in matches:
-                Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
-                                    color="orange")
-
-            # cache incomplete line to later process
-            lines = decoded_data.splitlines(True)
-            last_line = lines[-1]
-
-            if last_line[-1] != "\n":
-                if len(lines) == 1:
-                    # only one line and the line is not finished, then append this to cache
-                    self._line_cache += lines[-1]
-                else:
-                    # more than one line and not finished, replace line cache
-                    self._line_cache = lines[-1]
+    def _line_completion(self, data):
+        """
+        Usually check functions requires to check for one complete line.
+        This method will do line completion for the first line, and strip incomplete last line.
+        """
+        ret = self._line_cache
+        decoded_data = _decode_data(data)
+
+        # cache incomplete line to later process
+        lines = decoded_data.splitlines(True)
+        last_line = lines[-1]
+
+        if last_line[-1] != "\n":
+            if len(lines) == 1:
+                # only one line and the line is not finished, then append this to cache
+                self._line_cache += lines[-1]
+                ret = str()
             else:
-                # line finishes, flush cache
-                self._line_cache = str()
+                # more than one line and not finished, replace line cache
+                self._line_cache = lines[-1]
+                ret += "".join(lines[:-1])
+        else:
+            # line finishes, flush cache
+            self._line_cache = str()
+            ret += decoded_data
+        return ret
 
     def run(self):
         while not self.exit_event.isSet():
-            data = self.read(1000)
-            if data:
+            raw_data = self.read(1000)
+            if raw_data:
                 with self.record_data_lock:
-                    self.data_cache.put(data)
+                    self.data_cache.put(raw_data)
                     for capture_id in self.recorded_data:
-                        self.recorded_data[capture_id].put(data)
-                self.collect_performance(data)
+                        self.recorded_data[capture_id].put(raw_data)
+
+                # we need to do line completion before call check functions
+                comp_data = self._line_completion(raw_data)
+                for check_function in self.CHECK_FUNCTIONS:
+                    check_function(self, comp_data)
 
     def exit(self):
         self.exit_event.set()
@@ -267,7 +273,9 @@ class BaseDUT(object):
 
     DEFAULT_EXPECT_TIMEOUT = 10
     MAX_EXPECT_FAILURES_TO_SAVED = 10
-
+    RECV_THREAD_CLS = RecvThread
+    """ DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data.
+    For example, DUT can implement exception detect & analysis logic in receive thread subclass. """
     LOG_THREAD = _LogThread()
     LOG_THREAD.start()
 
@@ -398,8 +406,8 @@ class BaseDUT(object):
 
         :return: None
         """
-        self.receive_thread = _RecvThread(self._port_read, self.data_cache,
-                                          self.recorded_data, self.record_data_lock)
+        self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self.data_cache,
+                                                   self.recorded_data, self.record_data_lock)
         self.receive_thread.start()
 
     def stop_receive(self):
@@ -429,9 +437,9 @@ class BaseDUT(object):
         if isinstance(data, type(u'')):
             try:
                 data = data.encode('utf-8')
-            except Exception:
+            except Exception as e:
                 print(u'Cannot encode {} of type {}'.format(data, type(data)))
-                raise
+                raise e
         return data
 
     def write(self, data, eol="\r\n", flush=True):
index b18df22737cfa5c46a2c0278c251dc9ce76db232..3622ba3824d9f61e1c95632a9fac78ebc35efcdb 100644 (file)
@@ -62,7 +62,7 @@ class Env(object):
         self.lock = threading.RLock()
 
     @_synced
-    def get_dut(self, dut_name, app_path, dut_class=None, app_class=None):
+    def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, **dut_init_args):
         """
         get_dut(dut_name, app_path, dut_class=None, app_class=None)
 
@@ -70,6 +70,7 @@ class Env(object):
         :param app_path: application path, app instance will use this path to process application info
         :param dut_class: dut class, if not specified will use default dut class of env
         :param app_class: app class, if not specified will use default app of env
+        :keyword dut_init_args: extra kwargs used when creating DUT instance
         :return: dut instance
         """
         if dut_name in self.allocated_duts:
@@ -97,6 +98,7 @@ class Env(object):
                     dut_config = self.get_variable(dut_name + "_port_config")
                 except ValueError:
                     dut_config = dict()
+                dut_config.update(dut_init_args)
                 dut = self.default_dut_cls(dut_name, port,
                                            os.path.join(self.log_path, dut_name + ".log"),
                                            app_inst,
@@ -168,11 +170,16 @@ class Env(object):
         close all DUTs of the Env.
 
         :param dut_debug: if dut_debug is True, then print all dut expect failures before close it
-        :return: None
+        :return: exceptions during close DUT
         """
+        dut_close_errors = []
         for dut_name in self.allocated_duts:
             dut = self.allocated_duts[dut_name]["dut"]
             if dut_debug:
                 dut.print_debug_info()
-            dut.close()
+            try:
+                dut.close()
+            except Exception as e:
+                dut_close_errors.append(e)
         self.allocated_duts = dict()
+        return dut_close_errors
index 551f29030e8e5b65465097a3940806b4d39153bf..73314979bc883e66921cac765db0385ddf221cf8 100644 (file)
@@ -20,9 +20,17 @@ import re
 import functools
 import tempfile
 
+# python2 and python3 queue package name is different
+try:
+    import Queue as _queue
+except ImportError:
+    import queue as _queue
+
+
 from serial.tools import list_ports
 
 import DUT
+import Utility
 
 try:
     import esptool
@@ -38,6 +46,56 @@ class IDFToolError(OSError):
     pass
 
 
+class IDFDUTException(RuntimeError):
+    pass
+
+
+class IDFRecvThread(DUT.RecvThread):
+
+    PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
+    EXCEPTION_PATTERNS = [
+        re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
+        re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
+        re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
+    ]
+    BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
+
+    def __init__(self, read, data_cache, recorded_data, record_data_lock):
+        super(IDFRecvThread, self).__init__(read, data_cache, recorded_data, record_data_lock)
+        self.exceptions = _queue.Queue()
+
+    def collect_performance(self, comp_data):
+        matches = self.PERFORMANCE_PATTERN.findall(comp_data)
+        for match in matches:
+            Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
+                                color="orange")
+
+    def detect_exception(self, comp_data):
+        for pattern in self.EXCEPTION_PATTERNS:
+            start = 0
+            while True:
+                match = pattern.search(comp_data, pos=start)
+                if match:
+                    start = match.end()
+                    self.exceptions.put(match.group(0))
+                    Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
+                else:
+                    break
+
+    def detect_backtrace(self, comp_data):
+        # TODO: to support auto parse backtrace
+        start = 0
+        while True:
+            match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
+            if match:
+                start = match.end()
+                Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
+            else:
+                break
+
+    CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
+
+
 def _uses_esptool(func):
     """ Suspend listener thread, connect with esptool,
     call target function with esptool instance,
@@ -78,9 +136,13 @@ class IDFDUT(DUT.SerialDUT):
     INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
     # if need to erase NVS partition in start app
     ERASE_NVS = True
+    RECV_THREAD_CLS = IDFRecvThread
 
-    def __init__(self, name, port, log_file, app, **kwargs):
+    def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
+        self.download_config, self.partition_table = app.process_app_info()
         super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
+        self.allow_dut_exception = allow_dut_exception
+        self.exceptions = _queue.Queue()
 
     @classmethod
     def get_mac(cls, app, port):
@@ -252,3 +314,33 @@ class IDFDUT(DUT.SerialDUT):
                 return [port_hint] + ports
 
         return ports
+
+    def stop_receive(self):
+        if self.receive_thread:
+            while True:
+                try:
+                    self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
+                except _queue.Empty:
+                    break
+        super(IDFDUT, self).stop_receive()
+
+    def get_exceptions(self):
+        """ Get exceptions detected by DUT receive thread. """
+        if self.receive_thread:
+            while True:
+                try:
+                    self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
+                except _queue.Empty:
+                    break
+        exceptions = []
+        while True:
+            try:
+                exceptions.append(self.exceptions.get(timeout=0))
+            except _queue.Empty:
+                break
+        return exceptions
+
+    def close(self):
+        super(IDFDUT, self).close()
+        if not self.allow_dut_exception and self.get_exceptions():
+            raise IDFDUTException()
index e9f9289d30fd7e87bc18175f6a4eef2dcc987a17..219851535b09bbfb452336d3849fd7ab690b89be 100644 (file)
@@ -184,10 +184,20 @@ def test_method(**kwargs):
                 # log failure
                 junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
             finally:
+                # do close all DUTs, if result is False then print DUT debug info
+                close_errors = env_inst.close(dut_debug=(not result))
+                # We have a hook in DUT close, allow DUT to raise error to fail test case.
+                # For example, we don't allow DUT exception (reset) during test execution.
+                # We don't want to implement in exception detection in test function logic,
+                # as we need to add it to every test case.
+                # We can implement it in DUT receive thread,
+                # and raise exception in DUT close to fail test case if reset detected.
+                if close_errors:
+                    for error in close_errors:
+                        junit_test_case.add_failure_info(str(error))
+                    result = False
                 if not case_info["junit_report_by_case"]:
                     JunitReport.test_case_finish(junit_test_case)
-                # do close all DUTs, if result is False then print DUT debug info
-                env_inst.close(dut_debug=(not result))
 
             # end case and output result
             JunitReport.output_report(junit_file_path)