]> granicus.if.org Git - esp-idf/commitdiff
tiny-test-fw: backport tiny-test-fw from v3.1 to v3.0
authorHe Yin Ling <heyinling@espressif.com>
Mon, 3 Dec 2018 02:58:37 +0000 (10:58 +0800)
committerHe Yin Ling <heyinling@espressif.com>
Thu, 6 Dec 2018 12:28:53 +0000 (20:28 +0800)
17 files changed:
tools/tiny-test-fw/App.py
tools/tiny-test-fw/CIAssignExampleTest.py
tools/tiny-test-fw/CIAssignUnitTest.py [new file with mode: 0644]
tools/tiny-test-fw/DUT.py
tools/tiny-test-fw/Env.py
tools/tiny-test-fw/EnvConfig.py
tools/tiny-test-fw/IDF/IDFApp.py
tools/tiny-test-fw/IDF/IDFDUT.py
tools/tiny-test-fw/IDF/__init__.py
tools/tiny-test-fw/Runner.py
tools/tiny-test-fw/TinyFW.py
tools/tiny-test-fw/Utility/CIAssignTest.py [new file with mode: 0644]
tools/tiny-test-fw/Utility/CaseConfig.py
tools/tiny-test-fw/Utility/GitlabCIJob.py
tools/tiny-test-fw/docs/index.rst
tools/tiny-test-fw/example.py
tools/tiny-test-fw/requirements.txt [new file with mode: 0644]

index 84e8716a171890d36aafd8dca89af8f85a71a767..1dbadf85afd041e297c1494870ce93831873bc93 100644 (file)
@@ -78,9 +78,12 @@ class BaseApp(object):
         if not test_suite_name:
             test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
         sdk_path = cls.get_sdk_path()
-        return os.path.join(sdk_path, "TEST_LOGS",
-                            test_suite_name +
-                            time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+        log_folder = os.path.join(sdk_path, "TEST_LOGS",
+                                  test_suite_name +
+                                  time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+        if not os.path.exists(log_folder):
+            os.makedirs(log_folder)
+        return log_folder
 
     def process_app_info(self):
         """
index 1cd36131310aa65e9dd0916167a0ae2d8307d931..3d9df0360b295bcca9f9f41bf7b692d68381c625 100644 (file)
@@ -22,147 +22,20 @@ import sys
 import re
 import argparse
 
-import yaml
-
 test_fw_path = os.getenv("TEST_FW_PATH")
 if test_fw_path:
     sys.path.insert(0, test_fw_path)
 
-from Utility import CaseConfig, SearchCases, GitlabCIJob
-
-
-class Group(object):
-
-    MAX_EXECUTION_TIME = 30
-    MAX_CASE = 15
-    SORT_KEYS = ["env_tag"]
-
-    def __init__(self, case):
-        self.execution_time = 0
-        self.case_list = [case]
-        self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS]))
-
-    def accept_new_case(self):
-        """
-        check if allowed to add any case to this group
-
-        :return: True or False
-        """
-        max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME)
-        max_case = (len(self.case_list) < self.MAX_CASE)
-        return max_time and max_case
-
-    def add_case(self, case):
-        """
-        add case to current group
-
-        :param case: test case
-        :return: True if add succeed, else False
-        """
-        added = False
-        if self.accept_new_case():
-            for key in self.filters:
-                if case.case_info[key] != self.filters[key]:
-                    break
-            else:
-                self.case_list.append(case)
-                added = True
-        return added
-
-    def output(self):
-        """
-        output data for job configs
-
-        :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
-        """
-        output_data = {
-            "Filter": self.filters,
-            "CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list],
-        }
-        return output_data
+from Utility.CIAssignTest import AssignTest, Group
 
 
-class AssignTest(object):
-    """
-    Auto assign tests to CI jobs.
+class ExampleGroup(Group):
+    SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "chip"]
 
-    :param test_case: path of test case file(s)
-    :param ci_config_file: path of ``.gitlab-ci.yml``
-    """
 
+class CIExampleAssignTest(AssignTest):
     CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+")
 
-    def __init__(self, test_case, ci_config_file):
-        self.test_cases = self._search_cases(test_case)
-        self.jobs = self._parse_gitlab_ci_config(ci_config_file)
-
-    def _parse_gitlab_ci_config(self, ci_config_file):
-
-        with open(ci_config_file, "r") as f:
-            ci_config = yaml.load(f)
-
-        job_list = list()
-        for job_name in ci_config:
-            if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
-                job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
-        return job_list
-
-    @staticmethod
-    def _search_cases(test_case, case_filter=None):
-        """
-        :param test_case: path contains test case folder
-        :param case_filter: filter for test cases
-        :return: filtered test case list
-        """
-        test_methods = SearchCases.Search.search_test_cases(test_case)
-        return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
-
-    def _group_cases(self):
-        """
-        separate all cases into groups according group rules. each group will be executed by one CI job.
-
-        :return: test case groups.
-        """
-        groups = []
-        for case in self.test_cases:
-            for group in groups:
-                # add to current group
-                if group.add_case(case):
-                    break
-            else:
-                # create new group
-                groups.append(Group(case))
-        return groups
-
-    def assign_cases(self):
-        """
-        separate test cases to groups and assign test cases to CI jobs.
-
-        :raise AssertError: if failed to assign any case to CI job.
-        :return: None
-        """
-        failed_to_assign = []
-        test_groups = self._group_cases()
-        for group in test_groups:
-            for job in self.jobs:
-                if job.match_group(group):
-                    job.assign_group(group)
-                    break
-            else:
-                failed_to_assign.append(group)
-        assert not failed_to_assign
-
-    def output_configs(self, output_path):
-        """
-
-        :param output_path: path to output config files for each CI job
-        :return: None
-        """
-        if not os.path.exists(output_path):
-            os.makedirs(output_path)
-        for job in self.jobs:
-            job.output_config(output_path)
-
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
@@ -174,6 +47,6 @@ if __name__ == '__main__':
                         help="output path of config files")
     args = parser.parse_args()
 
-    assign_test = AssignTest(args.test_case, args.ci_config_file)
+    assign_test = CIExampleAssignTest(args.test_case, args.ci_config_file, case_group=ExampleGroup)
     assign_test.assign_cases()
     assign_test.output_configs(args.output_path)
diff --git a/tools/tiny-test-fw/CIAssignUnitTest.py b/tools/tiny-test-fw/CIAssignUnitTest.py
new file mode 100644 (file)
index 0000000..a77f294
--- /dev/null
@@ -0,0 +1,153 @@
+"""
+Command line tool to assign unit tests to CI test jobs.
+"""
+
+import re
+import os
+import sys
+import argparse
+
+import yaml
+
+test_fw_path = os.getenv("TEST_FW_PATH")
+if test_fw_path:
+    sys.path.insert(0, test_fw_path)
+
+from Utility import CIAssignTest
+
+
+class Group(CIAssignTest.Group):
+    SORT_KEYS = ["config", "SDK", "test environment", "multi_device", "multi_stage", "tags"]
+    MAX_CASE = 30
+    ATTR_CONVERT_TABLE = {
+        "execution_time": "execution time"
+    }
+    # when IDF support multiple chips, SDK will be moved into tags, we can remove it
+    CI_JOB_MATCH_KEYS = ["test environment", "SDK"]
+
+    def __init__(self, case):
+        super(Group, self).__init__(case)
+        for tag in self._get_case_attr(case, "tags"):
+            self.ci_job_match_keys.add(tag)
+
+    @staticmethod
+    def _get_case_attr(case, attr):
+        if attr in Group.ATTR_CONVERT_TABLE:
+            attr = Group.ATTR_CONVERT_TABLE[attr]
+        return case[attr]
+
+    def _create_extra_data(self, test_function):
+        """
+        For unit test case, we need to copy some attributes of test cases into config file.
+        So unit test function knows how to run the case.
+        """
+        case_data = []
+        for case in self.case_list:
+            one_case_data = {
+                "config": self._get_case_attr(case, "config"),
+                "name": self._get_case_attr(case, "summary"),
+                "reset": self._get_case_attr(case, "reset"),
+                "timeout": self._get_case_attr(case, "timeout"),
+            }
+
+            if test_function in ["run_multiple_devices_cases", "run_multiple_stage_cases"]:
+                try:
+                    one_case_data["child case num"] = self._get_case_attr(case, "child case num")
+                except KeyError as e:
+                    print("multiple devices/stages cases must contains at least two test functions")
+                    print("case name: {}".format(one_case_data["name"]))
+                    raise e
+
+            case_data.append(one_case_data)
+        return case_data
+
+    def _map_test_function(self):
+        """
+        determine which test function to use according to current test case
+
+        :return: test function name to use
+        """
+        if self.filters["multi_device"] == "Yes":
+            test_function = "run_multiple_devices_cases"
+        elif self.filters["multi_stage"] == "Yes":
+            test_function = "run_multiple_stage_cases"
+        else:
+            test_function = "run_unit_test_cases"
+        return test_function
+
+    def output(self):
+        """
+        output data for job configs
+
+        :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
+        """
+        test_function = self._map_test_function()
+        output_data = {
+            # we don't need filter for test function, as UT uses a few test functions for all cases
+            "CaseConfig": [
+                {
+                    "name": test_function,
+                    "extra_data": self._create_extra_data(test_function),
+                }
+            ]
+        }
+        return output_data
+
+
+class UnitTestAssignTest(CIAssignTest.AssignTest):
+    CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+")
+
+    def __init__(self, test_case_path, ci_config_file):
+        CIAssignTest.AssignTest.__init__(self, test_case_path, ci_config_file, case_group=Group)
+
+    def _search_cases(self, test_case_path, case_filter=None):
+        """
+        For unit test case, we don't search for test functions.
+        The unit test cases is stored in a yaml file which is created in job build-idf-test.
+        """
+
+        try:
+            with open(test_case_path, "r") as f:
+                raw_data = yaml.load(f)
+            test_cases = raw_data["test cases"]
+        except IOError:
+            print("Test case path is invalid. Should only happen when use @bot to skip unit test.")
+            test_cases = []
+        # filter keys are lower case. Do map lower case keys with original keys.
+        try:
+            key_mapping = {x.lower(): x for x in test_cases[0].keys()}
+        except IndexError:
+            key_mapping = dict()
+        if case_filter:
+            for key in case_filter:
+                filtered_cases = []
+                for case in test_cases:
+                    try:
+                        mapped_key = key_mapping[key]
+                        # bot converts string to lower case
+                        if isinstance(case[mapped_key], str):
+                            _value = case[mapped_key].lower()
+                        else:
+                            _value = case[mapped_key]
+                        if _value in case_filter[key]:
+                            filtered_cases.append(case)
+                    except KeyError:
+                        # case don't have this key, regard as filter success
+                        filtered_cases.append(case)
+                test_cases = filtered_cases
+        return test_cases
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument("test_case",
+                        help="test case folder or file")
+    parser.add_argument("ci_config_file",
+                        help="gitlab ci config file")
+    parser.add_argument("output_path",
+                        help="output path of config files")
+    args = parser.parse_args()
+
+    assign_test = UnitTestAssignTest(args.test_case, args.ci_config_file)
+    assign_test.assign_cases()
+    assign_test.output_configs(args.output_path)
index 1c6526709b4271a9b0da42a863422a23e307376d..d52f39684ee1477cbc11261df725559676136dfa 100644 (file)
@@ -85,6 +85,14 @@ def _decode_data(data):
     return data
 
 
+def _pattern_to_string(pattern):
+    try:
+        ret = "RegEx: " + pattern.pattern
+    except AttributeError:
+        ret = pattern
+    return ret
+
+
 class _DataCache(_queue.Queue):
     """
     Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
@@ -94,7 +102,22 @@ class _DataCache(_queue.Queue):
         _queue.Queue.__init__(self, maxsize=maxsize)
         self.data_cache = str()
 
-    def get_data(self, timeout=0):
+    def _move_from_queue_to_cache(self):
+        """
+        move all of the available data in the queue to cache
+
+        :return: True if moved any item from queue to data cache, else False
+        """
+        ret = False
+        while True:
+            try:
+                self.data_cache += _decode_data(self.get(0))
+                ret = True
+            except _queue.Empty:
+                break
+        return ret
+
+    def get_data(self, timeout=0.0):
         """
         get a copy of data from cache.
 
@@ -105,12 +128,16 @@ class _DataCache(_queue.Queue):
         if timeout < 0:
             timeout = 0
 
-        try:
-            data = self.get(timeout=timeout)
-            self.data_cache += _decode_data(data)
-        except _queue.Empty:
-            # don't do anything when on update for cache
-            pass
+        ret = self._move_from_queue_to_cache()
+
+        if not ret:
+            # we only wait for new data if we can't provide a new data_cache
+            try:
+                data = self.get(timeout=timeout)
+                self.data_cache += _decode_data(data)
+            except _queue.Empty:
+                # don't do anything when on update for cache
+                pass
         return copy.deepcopy(self.data_cache)
 
     def flush(self, index=0xFFFFFFFF):
@@ -127,16 +154,64 @@ class _DataCache(_queue.Queue):
             self.data_cache = self.data_cache[index:]
 
 
+class _LogThread(threading.Thread, _queue.Queue):
+    """
+    We found some SD card on Raspberry Pi could have very bad performance.
+    It could take seconds to save small amount of data.
+    If the DUT receives data and save it as log, then it stops receiving data until log is saved.
+    This could lead to expect timeout.
+    As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs.
+    Then data will be passed to ``expect`` as soon as received.
+    """
+    def __init__(self):
+        threading.Thread.__init__(self, name="LogThread")
+        _queue.Queue.__init__(self, maxsize=0)
+        self.setDaemon(True)
+        self.flush_lock = threading.Lock()
+
+    def save_log(self, filename, data):
+        """
+        :param filename: log file name
+        :param data: log data. Must be ``bytes``.
+        """
+        self.put({"filename": filename, "data": data})
+
+    def flush_data(self):
+        with self.flush_lock:
+            data_cache = dict()
+            while True:
+                # move all data from queue to data cache
+                try:
+                    log = self.get_nowait()
+                    try:
+                        data_cache[log["filename"]] += log["data"]
+                    except KeyError:
+                        data_cache[log["filename"]] = log["data"]
+                except _queue.Empty:
+                    break
+            # flush data
+            for filename in data_cache:
+                with open(filename, "ab+") as f:
+                    f.write(data_cache[filename])
+
+    def run(self):
+        while True:
+            time.sleep(1)
+            self.flush_data()
+
+
 class _RecvThread(threading.Thread):
 
     PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
 
-    def __init__(self, read, data_cache):
+    def __init__(self, read, data_cache, recorded_data, record_data_lock):
         super(_RecvThread, self).__init__()
         self.exit_event = threading.Event()
         self.setDaemon(True)
         self.read = read
         self.data_cache = data_cache
+        self.recorded_data = recorded_data
+        self.record_data_lock = record_data_lock
         # cache the last line of recv data for collecting performance
         self._line_cache = str()
 
@@ -169,7 +244,10 @@ class _RecvThread(threading.Thread):
         while not self.exit_event.isSet():
             data = self.read(1000)
             if data:
-                self.data_cache.put(data)
+                with self.record_data_lock:
+                    self.data_cache.put(data)
+                    for capture_id in self.recorded_data:
+                        self.recorded_data[capture_id].put(data)
                 self.collect_performance(data)
 
     def exit(self):
@@ -187,6 +265,10 @@ class BaseDUT(object):
     """
 
     DEFAULT_EXPECT_TIMEOUT = 5
+    MAX_EXPECT_FAILURES_TO_SAVED = 10
+
+    LOG_THREAD = _LogThread()
+    LOG_THREAD.start()
 
     def __init__(self, name, port, log_file, app, **kwargs):
 
@@ -196,13 +278,39 @@ class BaseDUT(object):
         self.log_file = log_file
         self.app = app
         self.data_cache = _DataCache()
+        # the main process of recorded data are done in receive thread
+        # but receive thread could be closed in DUT lifetime (tool methods)
+        # so we keep it in BaseDUT, as their life cycle are same
+        self.recorded_data = dict()
+        self.record_data_lock = threading.RLock()
         self.receive_thread = None
+        self.expect_failures = []
         # open and start during init
         self.open()
 
     def __str__(self):
         return "DUT({}: {})".format(self.name, str(self.port))
 
+    def _save_expect_failure(self, pattern, data, start_time):
+        """
+        Save expect failure. If the test fails, then it will print the expect failures.
+        In some cases, user will handle expect exceptions.
+        The expect failures could be false alarm, and test case might generate a lot of such failures.
+        Therefore, we don't print the failure immediately and limit the max size of failure list.
+        """
+        self.expect_failures.insert(0, {"pattern": pattern, "data": data,
+                                        "start": start_time, "end": time.time()})
+        self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
+
+    def _save_dut_log(self, data):
+        """
+        Save DUT log into file using another thread.
+        This is a workaround for some devices takes long time for file system operations.
+
+        See descriptions in ``_LogThread`` for details.
+        """
+        self.LOG_THREAD.save_log(self.log_file, data)
+
     # define for methods need to be overwritten by Port
     @classmethod
     def list_available_ports(cls):
@@ -290,7 +398,8 @@ class BaseDUT(object):
         :return: None
         """
         self._port_open()
-        self.receive_thread = _RecvThread(self._port_read, self.data_cache)
+        self.receive_thread = _RecvThread(self._port_read, self.data_cache,
+                                          self.recorded_data, self.record_data_lock)
         self.receive_thread.start()
 
     def close(self):
@@ -302,6 +411,7 @@ class BaseDUT(object):
         if self.receive_thread:
             self.receive_thread.exit()
         self._port_close()
+        self.LOG_THREAD.flush_data()
 
     def write(self, data, eol="\r\n", flush=True):
         """
@@ -316,7 +426,7 @@ class BaseDUT(object):
         if flush:
             self.data_cache.flush()
         # do write if cache
-        if data:
+        if data is not None:
             self._port_write(data + eol if eol else data)
 
     @_expect_lock
@@ -333,6 +443,42 @@ class BaseDUT(object):
         self.data_cache.flush(size)
         return data
 
+    def start_capture_raw_data(self, capture_id="default"):
+        """
+        Sometime application want to get DUT raw data and use ``expect`` method at the same time.
+        Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method.
+
+        If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID.
+
+        :param capture_id: ID of capture. You can use different IDs to do different captures at the same time.
+        """
+        with self.record_data_lock:
+            try:
+                # if start capture on existed ID, we do flush data and restart capture
+                self.recorded_data[capture_id].flush()
+            except KeyError:
+                # otherwise, create new data cache
+                self.recorded_data[capture_id] = _DataCache()
+
+    def stop_capture_raw_data(self, capture_id="default"):
+        """
+        Stop capture and get raw data.
+        This method should be used after ``start_capture_raw_data`` on the same capture ID.
+
+        :param capture_id: ID of capture.
+        :return: captured raw data between start capture and stop capture.
+        """
+        with self.record_data_lock:
+            try:
+                ret = self.recorded_data[capture_id].get_data()
+                self.recorded_data.pop(capture_id)
+            except KeyError as e:
+                e.message = "capture_id does not exist. " \
+                            "You should call start_capture_raw_data with same ID " \
+                            "before calling stop_capture_raw_data"
+                raise e
+        return ret
+
     # expect related methods
 
     @staticmethod
@@ -410,14 +556,19 @@ class BaseDUT(object):
         start_time = time.time()
         while True:
             ret, index = method(data, pattern)
-            if ret is not None or time.time() - start_time > timeout:
+            if ret is not None:
                 self.data_cache.flush(index)
                 break
+            time_remaining = start_time + timeout - time.time()
+            if time_remaining < 0:
+                break
             # wait for new data from cache
-            data = self.data_cache.get_data(time.time() + timeout - start_time)
+            data = self.data_cache.get_data(time_remaining)
 
         if ret is None:
-            raise ExpectTimeout(self.name + ": " + str(pattern))
+            pattern = _pattern_to_string(pattern)
+            self._save_expect_failure(pattern, data, start_time)
+            raise ExpectTimeout(self.name + ": " + pattern)
         return ret
 
     def _expect_multi(self, expect_all, expect_item_list, timeout):
@@ -457,22 +608,25 @@ class BaseDUT(object):
                     if expect_item["ret"] is not None:
                         # match succeed for one item
                         matched_expect_items.append(expect_item)
-                        break
 
             # if expect all, then all items need to be matched,
             # else only one item need to matched
             if expect_all:
-                match_succeed = (matched_expect_items == expect_items)
+                match_succeed = len(matched_expect_items) == len(expect_items)
             else:
                 match_succeed = True if matched_expect_items else False
 
-            if time.time() - start_time > timeout or match_succeed:
+            time_remaining = start_time + timeout - time.time()
+            if time_remaining < 0 or match_succeed:
                 break
             else:
-                data = self.data_cache.get_data(time.time() + timeout - start_time)
+                data = self.data_cache.get_data(time_remaining)
 
         if match_succeed:
-            # do callback and flush matched data cache
+            # sort matched items according to order of appearance in the input data,
+            # so that the callbacks are invoked in correct order
+            matched_expect_items = sorted(matched_expect_items, key=lambda it: it["index"])
+            # invoke callbacks and flush matched data cache
             slice_index = -1
             for expect_item in matched_expect_items:
                 # trigger callback
@@ -482,7 +636,9 @@ class BaseDUT(object):
             # flush already matched data
             self.data_cache.flush(slice_index)
         else:
-            raise ExpectTimeout(self.name + ": " + str(expect_items))
+            pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
+            self._save_expect_failure(pattern, data, start_time)
+            raise ExpectTimeout(self.name + ": " + pattern)
 
     @_expect_lock
     def expect_any(self, *expect_items, **timeout):
@@ -528,6 +684,22 @@ class BaseDUT(object):
             timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
         return self._expect_multi(True, expect_items, **timeout)
 
+    @staticmethod
+    def _format_ts(ts):
+        return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
+
+    def print_debug_info(self):
+        """
+        Print debug info of current DUT. Currently we will print debug info for expect failures.
+        """
+        Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
+
+        for failure in self.expect_failures:
+            Utility.console_log(u"\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
+                                .format(failure["pattern"], failure["data"],
+                                        self._format_ts(failure["start"]), self._format_ts(failure["end"])),
+                                color="orange")
+
 
 class SerialDUT(BaseDUT):
     """ serial with logging received data feature """
@@ -548,18 +720,15 @@ class SerialDUT(BaseDUT):
         self.serial_configs.update(kwargs)
         super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
 
-    @staticmethod
-    def _format_data(data):
+    def _format_data(self, data):
         """
         format data for logging. do decode and add timestamp.
 
         :param data: raw data from read
         :return: formatted data (str)
         """
-        timestamp = time.time()
-        timestamp = "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
-                                   str(timestamp % 1)[2:5])
-        formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, _decode_data(data))
+        timestamp = "[{}]".format(self._format_ts(time.time()))
+        formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
         return formatted_data
 
     def _port_open(self):
@@ -571,11 +740,12 @@ class SerialDUT(BaseDUT):
     def _port_read(self, size=1):
         data = self.port_inst.read(size)
         if data:
-            with open(self.log_file, "a+") as _log_file:
-                _log_file.write(self._format_data(data))
+            self._save_dut_log(self._format_data(data))
         return data
 
     def _port_write(self, data):
+        if isinstance(data, str):
+            data = data.encode()
         self.port_inst.write(data)
 
     @classmethod
index da5c0b5982008dcee7e5a818d46e1cce4488689a..b18df22737cfa5c46a2c0278c251dc9ce76db232 100644 (file)
@@ -17,6 +17,8 @@ import os
 import threading
 import functools
 
+import netifaces
+
 import EnvConfig
 
 
@@ -47,12 +49,12 @@ class Env(object):
                  dut=None,
                  env_tag=None,
                  env_config_file=None,
-                 test_name=None,
+                 test_suite_name=None,
                  **kwargs):
         self.app_cls = app
         self.default_dut_cls = dut
         self.config = EnvConfig.Config(env_config_file, env_tag)
-        self.log_path = self.app_cls.get_log_folder(test_name)
+        self.log_path = self.app_cls.get_log_folder(test_suite_name)
         if not os.path.exists(self.log_path):
             os.makedirs(self.log_path)
 
@@ -130,27 +132,47 @@ class Env(object):
         """
         return self.config.get_variable(variable_name)
 
+    PROTO_MAP = {
+        "ipv4": netifaces.AF_INET,
+        "ipv6": netifaces.AF_INET6,
+        "mac": netifaces.AF_LINK,
+    }
+
     @_synced
-    def get_pc_nic_info(self, nic_name="pc_nic"):
+    def get_pc_nic_info(self, nic_name="pc_nic", proto="ipv4"):
         """
         get_pc_nic_info(nic_name="pc_nic")
-        try to get nic info (ip address, ipv6 address, mac address)
+        try to get info of a specified NIC and protocol.
 
-        :param nic_name: pc nic name. allows passing variable name, nic name value or omitted (to get default nic info).
-        :return: a dict of address ("ipv4", "ipv6", "mac") if successfully found. otherwise None.
+        :param nic_name: pc nic name. allows passing variable name, nic name value.
+        :param proto: "ipv4", "ipv6" or "mac"
+        :return: a dict of nic info if successfully found. otherwise None.
+                 nic info keys could be different for different protocols.
+                 key "addr" is available for both mac, ipv4 and ipv6 pic info.
         """
-        # TODO: need to implement auto get nic info method
-        return self.config.get_variable("nic_info/" + nic_name)
+        interfaces = netifaces.interfaces()
+        if nic_name in interfaces:
+            # the name is in the interface list, we regard it as NIC name
+            if_addr = netifaces.ifaddresses(nic_name)
+        else:
+            # it's not in interface name list, we assume it's variable name
+            _nic_name = self.get_variable(nic_name)
+            if_addr = netifaces.ifaddresses(_nic_name)
+
+        return if_addr[self.PROTO_MAP[proto]][0]
 
     @_synced
-    def close(self):
+    def close(self, dut_debug=False):
         """
         close()
         close all DUTs of the Env.
 
+        :param dut_debug: if dut_debug is True, then print all dut expect failures before close it
         :return: None
         """
         for dut_name in self.allocated_duts:
             dut = self.allocated_duts[dut_name]["dut"]
+            if dut_debug:
+                dut.print_debug_info()
             dut.close()
         self.allocated_duts = dict()
index 2ce28d811e3ff5055c24d58295dd78c2bfa06488..79de6bd3f896ac7288d879f6cde53862dcf8e916 100644 (file)
@@ -53,7 +53,7 @@ class Config(object):
         try:
             with open(config_file) as f:
                 configs = yaml.load(f)[env_name]
-        except (OSError, TypeError):
+        except (OSError, TypeError, IOError):
             configs = dict()
         return configs
 
index 3828277ed860b42ba1fdf5dbb07d1d861c373f80..4bf667f64bf55711ee869cf9acac3a8e4ffe42cf 100644 (file)
@@ -144,11 +144,28 @@ class Example(IDFApp):
 
 class UT(IDFApp):
     def get_binary_path(self, app_path):
-        if app_path:
-            # specified path, join it and the idf path
-            path = os.path.join(self.idf_path, app_path)
-        else:
-            path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
+        """
+        :param app_path: app path or app config
+        :return: binary path
+        """
+        if not app_path:
+            app_path = "default"
+
+        path = os.path.join(self.idf_path, app_path)
+        if not os.path.exists(path):
+            while True:
+                # try to get by config
+                if app_path == "default":
+                    # it's default config, we first try to get form build folder of unit-test-app
+                    path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
+                    if os.path.exists(path):
+                        # found, use bin in build path
+                        break
+                # ``make ut-build-all-configs`` or ``make ut-build-CONFIG`` will copy binary to output folder
+                path = os.path.join(self.idf_path, "tools", "unit-test-app", "output", app_path)
+                if os.path.exists(path):
+                    break
+                raise OSError("Failed to get unit-test-app binary path")
         return path
 
 
index e2636b5ff88f7536994026c6940b5ad03430f8d2..984eca54464d4ffa245f4de2f433f6eba6c5743b 100644 (file)
@@ -20,6 +20,8 @@ import functools
 import random
 import tempfile
 
+from serial.tools import list_ports
+
 import DUT
 
 
index 4340d07db44f3e55fc3360ccb14734d8c064280a..0e342e84420be15ac884a2488f264d69bef3ec8c 100644 (file)
@@ -45,6 +45,31 @@ def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32", module="examples", e
                               execution_time=execution_time, level=level, **kwargs)
 
 
+def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", execution_time=1,
+                  level="unit", erase_nvs=True, **kwargs):
+    """
+    decorator for testing idf unit tests (with default values for some keyword args).
+
+    :param app: test application class
+    :param dut: dut class
+    :param chip: chip supported, string or tuple
+    :param module: module, string
+    :param execution_time: execution time in minutes, int
+    :param level: test level, could be used to filter test cases, string
+    :param erase_nvs: if need to erase_nvs in DUT.start_app()
+    :param kwargs: other keyword args
+    :return: test method
+    """
+    try:
+        # try to config the default behavior of erase nvs
+        dut.ERASE_NVS = erase_nvs
+    except AttributeError:
+        pass
+
+    return TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
+                              execution_time=execution_time, level=level, **kwargs)
+
+
 def log_performance(item, value):
     """
     do print performance with pre-defined format to console
@@ -52,7 +77,11 @@ def log_performance(item, value):
     :param item: performance item name
     :param value: performance value
     """
-    Utility.console_log("[Performance][{}]: {}".format(item, value), "orange")
+    performance_msg = "[Performance][{}]: {}".format(item, value)
+    Utility.console_log(performance_msg, "orange")
+    # update to junit test report
+    current_junit_case = TinyFW.JunitReport.get_current_test_case()
+    current_junit_case.stdout += performance_msg + "\r\n"
 
 
 def check_performance(item, value):
index 0adf441fe0bf7e626f029275b097c86370a9eb11..ea124c1489e2018132364d221934d6acaccea24b 100644 (file)
@@ -40,18 +40,22 @@ class Runner(threading.Thread):
     def __init__(self, test_case, case_config, env_config_file=None):
         super(Runner, self).__init__()
         self.setDaemon(True)
-        test_methods = SearchCases.Search.search_test_cases(test_case)
-        self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
-        self.test_result = True
         if case_config:
             test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
         else:
             test_suite_name = "TestRunner"
         TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
+        test_methods = SearchCases.Search.search_test_cases(test_case)
+        self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
+        self.test_result = []
 
     def run(self):
         for case in self.test_cases:
-            self.test_result = self.test_result and case.run()
+            result = case.run()
+            self.test_result.append(result)
+
+    def get_test_result(self):
+        return self.test_result and all(self.test_result)
 
 
 if __name__ == '__main__':
@@ -76,5 +80,5 @@ if __name__ == '__main__':
         except KeyboardInterrupt:
             print("exit by Ctrl-C")
             break
-    if not runner.test_result:
+    if not runner.get_test_result():
         sys.exit(1)
index 09b950c583a582e5ffe76ddcffc1ce43c7852d69..e9f9289d30fd7e87bc18175f6a4eef2dcc987a17 100644 (file)
 # limitations under the License.
 
 """ Interface for test cases. """
-import sys
 import os
 import time
 import traceback
-import inspect
 import functools
 
-import xunitgen
+import junit_xml
 
 import Env
 import DUT
@@ -28,11 +26,6 @@ import App
 import Utility
 
 
-XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
-XUNIT_RECEIVER = xunitgen.EventReceiver()
-XUNIT_DEFAULT_TEST_SUITE = "test-suite"
-
-
 class DefaultEnvConfig(object):
     """
     default test configs. There're 3 places to set configs, priority is (high -> low):
@@ -69,44 +62,67 @@ set_default_config = DefaultEnvConfig.set_default_config
 get_default_config = DefaultEnvConfig.get_default_config
 
 
-class TestResult(object):
-    TEST_RESULT = {
-        "pass": [],
-        "fail": [],
-    }
+MANDATORY_INFO = {
+    "execution_time": 1,
+    "env_tag": "default",
+    "category": "function",
+    "ignore": False,
+}
+
+
+class JunitReport(object):
+    # wrapper for junit test report
+    # TODO: Don't support by multi-thread (although not likely to be used this way).
+
+    JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
+    JUNIT_DEFAULT_TEST_SUITE = "test-suite"
+    JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE)
+    JUNIT_CURRENT_TEST_CASE = None
+    _TEST_CASE_CREATED_TS = 0
 
     @classmethod
-    def get_failed_cases(cls):
-        """
-        :return: failed test cases
-        """
-        return cls.TEST_RESULT["fail"]
+    def output_report(cls, junit_file_path):
+        """ Output current test result to file. """
+        with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f:
+            cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False)
 
     @classmethod
-    def get_passed_cases(cls):
+    def get_current_test_case(cls):
         """
-        :return: passed test cases
+        By default, the test framework will handle junit test report automatically.
+        While some test case might want to update some info to test report.
+        They can use this method to get current test case created by test framework.
+
+        :return: current junit test case instance created by ``JunitTestReport.create_test_case``
         """
-        return cls.TEST_RESULT["pass"]
+        return cls.JUNIT_CURRENT_TEST_CASE
 
     @classmethod
-    def set_result(cls, result, case_name):
+    def test_case_finish(cls, test_case):
         """
-        :param result: True or False
-        :param case_name: test case name
-        :return: None
+        Append the test case to test suite so it can be output to file.
+        Execution time will be automatically updated (compared to ``create_test_case``).
         """
-        cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
+        test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS
+        cls.JUNIT_TEST_SUITE.test_cases.append(test_case)
 
+    @classmethod
+    def create_test_case(cls, name):
+        """
+        Extend ``junit_xml.TestCase`` with:
 
-get_failed_cases = TestResult.get_failed_cases
-get_passed_cases = TestResult.get_passed_cases
-
+        1. save create test case so it can be get by ``get_current_test_case``
+        2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``.
 
-MANDATORY_INFO = {
-    "execution_time": 1,
-    "env_tag": "default",
-}
+        :param name: test case name
+        :return: instance of ``junit_xml.TestCase``
+        """
+        # set stdout to empty string, so we can always append string to stdout.
+        # It won't affect output logic. If stdout is empty, it won't be put to report.
+        test_case = junit_xml.TestCase(name, stdout="")
+        cls.JUNIT_CURRENT_TEST_CASE = test_case
+        cls._TEST_CASE_CREATED_TS = time.time()
+        return test_case
 
 
 def test_method(**kwargs):
@@ -122,22 +138,17 @@ def test_method(**kwargs):
     :keyword env_config_file: test env config file. usually will not set this keyword when define case
     :keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
                               usually will not set this keyword when define case
+    :keyword junit_report_by_case: By default the test fw will handle junit report generation.
+                                   In some cases, one test function might test many test cases.
+                                   If this flag is set, test case can update junit report by its own.
     """
     def test(test_func):
-        # get test function file name
-        frame = inspect.stack()
-        test_func_file_name = frame[1][1]
 
         case_info = MANDATORY_INFO.copy()
-        case_info["name"] = test_func.__name__
+        case_info["name"] = case_info["ID"] = test_func.__name__
+        case_info["junit_report_by_case"] = False
         case_info.update(kwargs)
 
-        # create env instance
-        env_config = DefaultEnvConfig.get_default_config()
-        for key in kwargs:
-            if key in env_config:
-                env_config[key] = kwargs[key]
-
         @functools.wraps(test_func)
         def handle_test(extra_data=None, **overwrite):
             """
@@ -147,12 +158,20 @@ def test_method(**kwargs):
             :param overwrite: args that runner or main want to overwrite
             :return: None
             """
+            # create env instance
+            env_config = DefaultEnvConfig.get_default_config()
+            for key in kwargs:
+                if key in env_config:
+                    env_config[key] = kwargs[key]
+
             env_config.update(overwrite)
             env_inst = Env.Env(**env_config)
+
             # prepare for xunit test results
-            xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
-                                      XUNIT_FILE_NAME)
-            XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
+            junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"])
+            junit_test_case = JunitReport.create_test_case(case_info["name"])
+            result = False
+
             try:
                 Utility.console_log("starting running test: " + test_func.__name__, color="green")
                 # execute test function
@@ -162,23 +181,21 @@ def test_method(**kwargs):
             except Exception as e:
                 # handle all the exceptions here
                 traceback.print_exc()
-                result = False
                 # log failure
-                XUNIT_RECEIVER.failure(str(e), test_func_file_name)
+                junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
             finally:
-                # do close all DUTs
-                env_inst.close()
+                if not case_info["junit_report_by_case"]:
+                    JunitReport.test_case_finish(junit_test_case)
+                # do close all DUTs, if result is False then print DUT debug info
+                env_inst.close(dut_debug=(not result))
+
             # end case and output result
-            XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
-            with open(xunit_file, "ab+") as f:
-                f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
-                                       XUNIT_DEFAULT_TEST_SUITE))
+            JunitReport.output_report(junit_file_path)
 
             if result:
                 Utility.console_log("Test Succeed: " + test_func.__name__, color="green")
             else:
                 Utility.console_log(("Test Fail: " + test_func.__name__), color="red")
-            TestResult.set_result(result, test_func.__name__)
             return result
 
         handle_test.case_info = case_info
diff --git a/tools/tiny-test-fw/Utility/CIAssignTest.py b/tools/tiny-test-fw/Utility/CIAssignTest.py
new file mode 100644 (file)
index 0000000..9d727b5
--- /dev/null
@@ -0,0 +1,236 @@
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Common logic to assign test cases to CI jobs.
+
+Some background knowledge about Gitlab CI and use flow in esp-idf:
+
+* Gitlab CI jobs are static in ``.gitlab-ci.yml``. We can't dynamically create test jobs
+* For test job running on DUT, we use ``tags`` to select runners with different test environment
+* We have ``assign_test`` stage, will collect cases, and then assign them to correct test jobs
+* ``assign_test`` will fail if failed to assign any cases
+* with ``assign_test``, we can:
+    * dynamically filter test case we want to test
+    * alert user if they forget to add CI jobs and guide how to add test jobs
+* the last step of ``assign_test`` is to output config files, then test jobs will run these cases
+
+The Basic logic to assign test cases is as follow:
+
+1. do search all the cases
+2. do filter case (if filter is specified by @bot)
+3. put cases to different groups according to rule of ``Group``
+    * try to put them in existed groups
+    * if failed then create a new group and add this case
+4. parse and filter the test jobs from CI config file
+5. try to assign all groups to jobs according to tags
+6. output config files for jobs
+
+"""
+
+import os
+import re
+import json
+
+import yaml
+
+from Utility import (CaseConfig, SearchCases, GitlabCIJob, console_log)
+
+
+class Group(object):
+
+    MAX_EXECUTION_TIME = 30
+    MAX_CASE = 15
+    SORT_KEYS = ["env_tag"]
+    # Matching CI job rules could be different from the way we want to group test cases.
+    # For example, when assign unit test cases, different test cases need to use different test functions.
+    # We need to put them into different groups.
+    # But these groups can be assigned to jobs with same tags, as they use the same test environment.
+    CI_JOB_MATCH_KEYS = SORT_KEYS
+
+    def __init__(self, case):
+        self.execution_time = 0
+        self.case_list = [case]
+        self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS]))
+        # we use ci_job_match_keys to match CI job tags. It's a set of required tags.
+        self.ci_job_match_keys = set([self._get_case_attr(case, x) for x in self.CI_JOB_MATCH_KEYS])
+
+    @staticmethod
+    def _get_case_attr(case, attr):
+        # we might use different type for case (dict or test_func)
+        # this method will do get attribute form cases
+        return case.case_info[attr]
+
+    def accept_new_case(self):
+        """
+        check if allowed to add any case to this group
+
+        :return: True or False
+        """
+        max_time = (sum([self._get_case_attr(x, "execution_time") for x in self.case_list])
+                    < self.MAX_EXECUTION_TIME)
+        max_case = (len(self.case_list) < self.MAX_CASE)
+        return max_time and max_case
+
+    def add_case(self, case):
+        """
+        add case to current group
+
+        :param case: test case
+        :return: True if add succeed, else False
+        """
+        added = False
+        if self.accept_new_case():
+            for key in self.filters:
+                if self._get_case_attr(case, key) != self.filters[key]:
+                    break
+            else:
+                self.case_list.append(case)
+                added = True
+        return added
+
+    def output(self):
+        """
+        output data for job configs
+
+        :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
+        """
+        output_data = {
+            "Filter": self.filters,
+            "CaseConfig": [{"name": self._get_case_attr(x, "name")} for x in self.case_list],
+        }
+        return output_data
+
+
+class AssignTest(object):
+    """
+    Auto assign tests to CI jobs.
+
+    :param test_case_path: path of test case file(s)
+    :param ci_config_file: path of ``.gitlab-ci.yml``
+    """
+    # subclass need to rewrite CI test job pattern, to filter all test jobs
+    CI_TEST_JOB_PATTERN = re.compile(r"^test_.+")
+    # by default we only run function in CI, as other tests could take long time
+    DEFAULT_FILTER = {
+        "category": "function",
+        "ignore": False,
+    }
+
+    def __init__(self, test_case_path, ci_config_file, case_group=Group):
+        self.test_case_path = test_case_path
+        self.test_cases = []
+        self.jobs = self._parse_gitlab_ci_config(ci_config_file)
+        self.case_group = case_group
+
+    def _parse_gitlab_ci_config(self, ci_config_file):
+
+        with open(ci_config_file, "r") as f:
+            ci_config = yaml.load(f)
+
+        job_list = list()
+        for job_name in ci_config:
+            if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
+                job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
+        job_list.sort(key=lambda x: x["name"])
+        return job_list
+
+    def _search_cases(self, test_case_path, case_filter=None):
+        """
+        :param test_case_path: path contains test case folder
+        :param case_filter: filter for test cases. the filter to use is default filter updated with case_filter param.
+        :return: filtered test case list
+        """
+        _case_filter = self.DEFAULT_FILTER.copy()
+        if case_filter:
+            _case_filter.update(case_filter)
+        test_methods = SearchCases.Search.search_test_cases(test_case_path)
+        return CaseConfig.filter_test_cases(test_methods, _case_filter)
+
+    def _group_cases(self):
+        """
+        separate all cases into groups according group rules. each group will be executed by one CI job.
+
+        :return: test case groups.
+        """
+        groups = []
+        for case in self.test_cases:
+            for group in groups:
+                # add to current group
+                if group.add_case(case):
+                    break
+            else:
+                # create new group
+                groups.append(self.case_group(case))
+        return groups
+
+    @staticmethod
+    def _apply_bot_filter():
+        """
+        we support customize CI test with bot.
+        here we process from and return the filter which ``_search_cases`` accepts.
+
+        :return: filter for search test cases
+        """
+        bot_filter = os.getenv("BOT_CASE_FILTER")
+        if bot_filter:
+            bot_filter = json.loads(bot_filter)
+        else:
+            bot_filter = dict()
+        return bot_filter
+
+    def _apply_bot_test_count(self):
+        """
+        Bot could also pass test count.
+        If filtered cases need to be tested for several times, then we do duplicate them here.
+        """
+        test_count = os.getenv("BOT_TEST_COUNT")
+        if test_count:
+            test_count = int(test_count)
+            self.test_cases *= test_count
+
+    def assign_cases(self):
+        """
+        separate test cases to groups and assign test cases to CI jobs.
+
+        :raise AssertError: if failed to assign any case to CI job.
+        :return: None
+        """
+        failed_to_assign = []
+        case_filter = self._apply_bot_filter()
+        self.test_cases = self._search_cases(self.test_case_path, case_filter)
+        self._apply_bot_test_count()
+        test_groups = self._group_cases()
+        for group in test_groups:
+            for job in self.jobs:
+                if job.match_group(group):
+                    job.assign_group(group)
+                    break
+            else:
+                failed_to_assign.append(group)
+        if failed_to_assign:
+            console_log("Too many test cases vs jobs to run. Please add the following jobs to .gitlab-ci.yml with specific tags:", "R")
+            for group in failed_to_assign:
+                console_log("* Add job with: " + ",".join(group.ci_job_match_keys), "R")
+            raise RuntimeError("Failed to assign test case to CI jobs")
+
+    def output_configs(self, output_path):
+        """
+        :param output_path: path to output config files for each CI job
+        :return: None
+        """
+        if not os.path.exists(output_path):
+            os.makedirs(output_path)
+        for job in self.jobs:
+            job.output_config(output_path)
index 1fe5df42bad71dc6ccdc18ba7a8edf098bf85549..3260c9b6ee855e74583c13eac5fc57a23462cf4b 100644 (file)
@@ -51,14 +51,33 @@ import yaml
 import TestCase
 
 
+def _convert_to_lower_case(item):
+    """
+    bot filter is always lower case string.
+    this function will convert to all string to lower case.
+    """
+    if isinstance(item, (tuple, list)):
+        output = [_convert_to_lower_case(v) for v in item]
+    elif isinstance(item, str):
+        output = item.lower()
+    else:
+        output = item
+    return output
+
+
 def _filter_one_case(test_method, case_filter):
     """ Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
     filter_result = True
-    for key in case_filter:
+    # filter keys are lower case. Do map lower case keys with original keys.
+    key_mapping = {x.lower(): x for x in test_method.case_info.keys()}
+
+    for orig_key in case_filter:
+        key = key_mapping[orig_key]
         if key in test_method.case_info:
             # the filter key is both in case and filter
             # we need to check if they match
-            filter_item, accepted_item = case_filter[key], test_method.case_info[key]
+            filter_item = _convert_to_lower_case(case_filter[orig_key])
+            accepted_item = _convert_to_lower_case(test_method.case_info[key])
 
             if isinstance(filter_item, (tuple, list)) \
                     and isinstance(accepted_item, (tuple, list)):
@@ -91,6 +110,7 @@ def filter_test_cases(test_methods, case_filter):
             * if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple
             * if both are list/tuple, then check if they have common item
         2. if only case attribute or filter have the key, filter succeed
+        3. will do case insensitive compare for string
 
     for example, the following are match succeed scenarios
     (the rule is symmetric, result is same if exchange values for user filter and case attribute):
index 05f6393c66e02e75698764398f4c2d62ac0e9d3a..e6173b035c6740026ec80c67543f76022fdc18db 100644 (file)
@@ -27,6 +27,7 @@ class Job(dict):
     def __init__(self, job, job_name):
         super(Job, self).__init__(job)
         self["name"] = job_name
+        self.tags = set(self["tags"])
 
     def match_group(self, group):
         """
@@ -37,17 +38,8 @@ class Job(dict):
         :return: True or False
         """
         match_result = False
-        for _ in range(1):
-            if "case group" in self:
-                # this job is already assigned
-                break
-            for value in group.filters.values():
-                if value not in self["tags"]:
-                    break
-            else:
-                continue
-            break
-        else:
+        if "case group" not in self and group.ci_job_match_keys == self.tags:
+            # group not assigned and all tags match
             match_result = True
         return match_result
 
@@ -70,4 +62,4 @@ class Job(dict):
         file_name = os.path.join(file_path, self["name"] + ".yml")
         if "case group" in self:
             with open(file_name, "w") as f:
-                yaml.dump(self["case group"].output(), f)
+                yaml.dump(self["case group"].output(), f, default_flow_style=False)
index a70e6bf328e01b793f0a4a96d2c544f0dc2a302d..df132bc0965301c7a0d1a5b31fd6d8335fa8a65c 100644 (file)
@@ -69,7 +69,7 @@ Let's first check a simple simple::
 
 
     if __name__ == '__main__':
-        TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
+        TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml")
         test_examples_protocol_https_request()
 
 
@@ -127,7 +127,9 @@ The following 3rd party lib is required:
 
     * pyserial
     * pyyaml
-    * xunitgen
+    * junit_xml
+    * netifaces
+    * matplotlib (if use Utility.LineChart)
 
 To build document, we need to install ``Sphinx`` and ``sphinx-rtd-theme`` (you may replace this with your own theme).
 
index df1b25576e0fd04bcdf88f3c303ae4194ba5e4f5..324c90438340cf8615a742a2fd42b90d58a7bf16 100644 (file)
@@ -47,5 +47,5 @@ def test_examples_protocol_https_request(env, extra_data):
 
 
 if __name__ == '__main__':
-    TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
+    TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
     test_examples_protocol_https_request()
diff --git a/tools/tiny-test-fw/requirements.txt b/tools/tiny-test-fw/requirements.txt
new file mode 100644 (file)
index 0000000..aa6b53b
--- /dev/null
@@ -0,0 +1,5 @@
+pyserial
+pyyaml
+junit_xml
+netifaces
+matplotlib