if not test_suite_name:
test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
sdk_path = cls.get_sdk_path()
- return os.path.join(sdk_path, "TEST_LOGS",
- test_suite_name +
- time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+ log_folder = os.path.join(sdk_path, "TEST_LOGS",
+ test_suite_name +
+ time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+ if not os.path.exists(log_folder):
+ os.makedirs(log_folder)
+ return log_folder
def process_app_info(self):
"""
import re
import argparse
-import yaml
-
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path:
sys.path.insert(0, test_fw_path)
-from Utility import CaseConfig, SearchCases, GitlabCIJob
-
-
-class Group(object):
-
- MAX_EXECUTION_TIME = 30
- MAX_CASE = 15
- SORT_KEYS = ["env_tag"]
-
- def __init__(self, case):
- self.execution_time = 0
- self.case_list = [case]
- self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS]))
-
- def accept_new_case(self):
- """
- check if allowed to add any case to this group
-
- :return: True or False
- """
- max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME)
- max_case = (len(self.case_list) < self.MAX_CASE)
- return max_time and max_case
-
- def add_case(self, case):
- """
- add case to current group
-
- :param case: test case
- :return: True if add succeed, else False
- """
- added = False
- if self.accept_new_case():
- for key in self.filters:
- if case.case_info[key] != self.filters[key]:
- break
- else:
- self.case_list.append(case)
- added = True
- return added
-
- def output(self):
- """
- output data for job configs
-
- :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
- """
- output_data = {
- "Filter": self.filters,
- "CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list],
- }
- return output_data
+from Utility.CIAssignTest import AssignTest, Group
-class AssignTest(object):
- """
- Auto assign tests to CI jobs.
+class ExampleGroup(Group):
+ SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "chip"]
- :param test_case: path of test case file(s)
- :param ci_config_file: path of ``.gitlab-ci.yml``
- """
+class CIExampleAssignTest(AssignTest):
CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+")
- def __init__(self, test_case, ci_config_file):
- self.test_cases = self._search_cases(test_case)
- self.jobs = self._parse_gitlab_ci_config(ci_config_file)
-
- def _parse_gitlab_ci_config(self, ci_config_file):
-
- with open(ci_config_file, "r") as f:
- ci_config = yaml.load(f)
-
- job_list = list()
- for job_name in ci_config:
- if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
- job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
- return job_list
-
- @staticmethod
- def _search_cases(test_case, case_filter=None):
- """
- :param test_case: path contains test case folder
- :param case_filter: filter for test cases
- :return: filtered test case list
- """
- test_methods = SearchCases.Search.search_test_cases(test_case)
- return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
-
- def _group_cases(self):
- """
- separate all cases into groups according group rules. each group will be executed by one CI job.
-
- :return: test case groups.
- """
- groups = []
- for case in self.test_cases:
- for group in groups:
- # add to current group
- if group.add_case(case):
- break
- else:
- # create new group
- groups.append(Group(case))
- return groups
-
- def assign_cases(self):
- """
- separate test cases to groups and assign test cases to CI jobs.
-
- :raise AssertError: if failed to assign any case to CI job.
- :return: None
- """
- failed_to_assign = []
- test_groups = self._group_cases()
- for group in test_groups:
- for job in self.jobs:
- if job.match_group(group):
- job.assign_group(group)
- break
- else:
- failed_to_assign.append(group)
- assert not failed_to_assign
-
- def output_configs(self, output_path):
- """
-
- :param output_path: path to output config files for each CI job
- :return: None
- """
- if not os.path.exists(output_path):
- os.makedirs(output_path)
- for job in self.jobs:
- job.output_config(output_path)
-
if __name__ == '__main__':
parser = argparse.ArgumentParser()
help="output path of config files")
args = parser.parse_args()
- assign_test = AssignTest(args.test_case, args.ci_config_file)
+ assign_test = CIExampleAssignTest(args.test_case, args.ci_config_file, case_group=ExampleGroup)
assign_test.assign_cases()
assign_test.output_configs(args.output_path)
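+
+# An illustrative invocation (all paths are hypothetical):
+#
+#   python CIAssignExampleTest.py examples/ .gitlab-ci.yml example_configs/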
--- /dev/null
+"""
+Command line tool to assign unit tests to CI test jobs.
+"""
+
+import re
+import os
+import sys
+import argparse
+
+import yaml
+
+test_fw_path = os.getenv("TEST_FW_PATH")
+if test_fw_path:
+ sys.path.insert(0, test_fw_path)
+
+from Utility import CIAssignTest
+
+
+class Group(CIAssignTest.Group):
+ SORT_KEYS = ["config", "SDK", "test environment", "multi_device", "multi_stage", "tags"]
+ MAX_CASE = 30
+ ATTR_CONVERT_TABLE = {
+ "execution_time": "execution time"
+ }
+    # when IDF supports multiple chips, SDK will be moved into tags and we can remove it
+ CI_JOB_MATCH_KEYS = ["test environment", "SDK"]
+
+ def __init__(self, case):
+ super(Group, self).__init__(case)
+ for tag in self._get_case_attr(case, "tags"):
+ self.ci_job_match_keys.add(tag)
+
+ @staticmethod
+ def _get_case_attr(case, attr):
+ if attr in Group.ATTR_CONVERT_TABLE:
+ attr = Group.ATTR_CONVERT_TABLE[attr]
+ return case[attr]
+
+ def _create_extra_data(self, test_function):
+ """
+        For unit test cases, we need to copy some case attributes into the config file,
+        so the unit test function knows how to run the case.
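+
+        A sketch of one produced entry (all values are illustrative)::
+
+            {
+                "config": "default",
+                "name": "UART can do select()",
+                "reset": "POR",
+                "timeout": 30,
+            }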
+ """
+ case_data = []
+ for case in self.case_list:
+ one_case_data = {
+ "config": self._get_case_attr(case, "config"),
+ "name": self._get_case_attr(case, "summary"),
+ "reset": self._get_case_attr(case, "reset"),
+ "timeout": self._get_case_attr(case, "timeout"),
+ }
+
+ if test_function in ["run_multiple_devices_cases", "run_multiple_stage_cases"]:
+ try:
+ one_case_data["child case num"] = self._get_case_attr(case, "child case num")
+ except KeyError as e:
+ print("multiple devices/stages cases must contains at least two test functions")
+ print("case name: {}".format(one_case_data["name"]))
+ raise e
+
+ case_data.append(one_case_data)
+ return case_data
+
+ def _map_test_function(self):
+ """
+        determine which test function to use according to the current test case
+
+ :return: test function name to use
+ """
+ if self.filters["multi_device"] == "Yes":
+ test_function = "run_multiple_devices_cases"
+ elif self.filters["multi_stage"] == "Yes":
+ test_function = "run_multiple_stage_cases"
+ else:
+ test_function = "run_unit_test_cases"
+ return test_function
+
+ def output(self):
+ """
+ output data for job configs
+
+ :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
+ """
+ test_function = self._map_test_function()
+ output_data = {
+            # we don't need a filter for the test function, as UT uses a few test functions for all cases
+ "CaseConfig": [
+ {
+ "name": test_function,
+ "extra_data": self._create_extra_data(test_function),
+ }
+ ]
+ }
+ return output_data
+
+
+class UnitTestAssignTest(CIAssignTest.AssignTest):
+ CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+")
+
+ def __init__(self, test_case_path, ci_config_file):
+ CIAssignTest.AssignTest.__init__(self, test_case_path, ci_config_file, case_group=Group)
+
+ def _search_cases(self, test_case_path, case_filter=None):
+ """
+        For unit test cases, we don't search for test functions.
+        The unit test cases are stored in a yaml file created by the build-idf-test job.
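+
+        A minimal sketch of the expected yaml layout (keys and values are illustrative)::
+
+            test cases:
+            - summary: UART can do select()
+              config: default
+              reset: POR
+              timeout: 30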
+ """
+
+ try:
+ with open(test_case_path, "r") as f:
+ raw_data = yaml.load(f)
+ test_cases = raw_data["test cases"]
+ except IOError:
+ print("Test case path is invalid. Should only happen when use @bot to skip unit test.")
+ test_cases = []
+        # filter keys are lower case. Map the lower-case keys to the original keys.
+ try:
+ key_mapping = {x.lower(): x for x in test_cases[0].keys()}
+ except IndexError:
+ key_mapping = dict()
+ if case_filter:
+ for key in case_filter:
+ filtered_cases = []
+ for case in test_cases:
+ try:
+ mapped_key = key_mapping[key]
+ # bot converts string to lower case
+ if isinstance(case[mapped_key], str):
+ _value = case[mapped_key].lower()
+ else:
+ _value = case[mapped_key]
+ if _value in case_filter[key]:
+ filtered_cases.append(case)
+ except KeyError:
+                        # the case doesn't have this key; regard it as a filter match
+ filtered_cases.append(case)
+ test_cases = filtered_cases
+ return test_cases
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument("test_case",
+ help="test case folder or file")
+ parser.add_argument("ci_config_file",
+ help="gitlab ci config file")
+ parser.add_argument("output_path",
+ help="output path of config files")
+ args = parser.parse_args()
+
+ assign_test = UnitTestAssignTest(args.test_case, args.ci_config_file)
+ assign_test.assign_cases()
+ assign_test.output_configs(args.output_path)
return data
+def _pattern_to_string(pattern):
+ try:
+ ret = "RegEx: " + pattern.pattern
+ except AttributeError:
+ ret = pattern
+ return ret
+
+
class _DataCache(_queue.Queue):
"""
Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue.
_queue.Queue.__init__(self, maxsize=maxsize)
self.data_cache = str()
- def get_data(self, timeout=0):
+ def _move_from_queue_to_cache(self):
+ """
+ move all of the available data in the queue to cache
+
+ :return: True if moved any item from queue to data cache, else False
+ """
+ ret = False
+ while True:
+ try:
+ self.data_cache += _decode_data(self.get(0))
+ ret = True
+ except _queue.Empty:
+ break
+ return ret
+
+ def get_data(self, timeout=0.0):
"""
get a copy of data from cache.
if timeout < 0:
timeout = 0
- try:
- data = self.get(timeout=timeout)
- self.data_cache += _decode_data(data)
- except _queue.Empty:
- # don't do anything when on update for cache
- pass
+ ret = self._move_from_queue_to_cache()
+
+ if not ret:
+            # only wait for new data if nothing new was moved into the cache
+ try:
+ data = self.get(timeout=timeout)
+ self.data_cache += _decode_data(data)
+ except _queue.Empty:
+                # don't do anything when there's no update for the cache
+ pass
return copy.deepcopy(self.data_cache)
def flush(self, index=0xFFFFFFFF):
self.data_cache = self.data_cache[index:]
+class _LogThread(threading.Thread, _queue.Queue):
+ """
+    We found that some SD cards on Raspberry Pi can have very poor write performance:
+    it can take seconds to save a small amount of data.
+    If the DUT received data and saved it as a log synchronously, it would stop receiving
+    data until the log was saved, which could lead to ``expect`` timeouts.
+    As a workaround, the ``BaseDUT`` class creates a thread to save logs,
+    so data can be passed to ``expect`` as soon as it is received.
+ """
+ def __init__(self):
+ threading.Thread.__init__(self, name="LogThread")
+ _queue.Queue.__init__(self, maxsize=0)
+ self.setDaemon(True)
+ self.flush_lock = threading.Lock()
+
+ def save_log(self, filename, data):
+ """
+ :param filename: log file name
+ :param data: log data. Must be ``bytes``.
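+
+        Illustrative usage (the file name is hypothetical)::
+
+            BaseDUT.LOG_THREAD.save_log("dut1.log", b"boot ok\r\n")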
+ """
+ self.put({"filename": filename, "data": data})
+
+ def flush_data(self):
+ with self.flush_lock:
+ data_cache = dict()
+ while True:
+ # move all data from queue to data cache
+ try:
+ log = self.get_nowait()
+ try:
+ data_cache[log["filename"]] += log["data"]
+ except KeyError:
+ data_cache[log["filename"]] = log["data"]
+ except _queue.Empty:
+ break
+ # flush data
+ for filename in data_cache:
+ with open(filename, "ab+") as f:
+ f.write(data_cache[filename])
+
+ def run(self):
+ while True:
+ time.sleep(1)
+ self.flush_data()
+
+
class _RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
- def __init__(self, read, data_cache):
+ def __init__(self, read, data_cache, recorded_data, record_data_lock):
super(_RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
+ self.recorded_data = recorded_data
+ self.record_data_lock = record_data_lock
# cache the last line of recv data for collecting performance
self._line_cache = str()
while not self.exit_event.isSet():
data = self.read(1000)
if data:
- self.data_cache.put(data)
+ with self.record_data_lock:
+ self.data_cache.put(data)
+ for capture_id in self.recorded_data:
+ self.recorded_data[capture_id].put(data)
self.collect_performance(data)
def exit(self):
"""
DEFAULT_EXPECT_TIMEOUT = 5
+ MAX_EXPECT_FAILURES_TO_SAVED = 10
+
+ LOG_THREAD = _LogThread()
+ LOG_THREAD.start()
def __init__(self, name, port, log_file, app, **kwargs):
self.log_file = log_file
self.app = app
self.data_cache = _DataCache()
+        # the main processing of recorded data is done in the receive thread,
+        # but the receive thread can be closed during the DUT's lifetime (by tool methods),
+        # so we keep the recorded data in BaseDUT, as their life cycles are the same
+ self.recorded_data = dict()
+ self.record_data_lock = threading.RLock()
self.receive_thread = None
+ self.expect_failures = []
# open and start during init
self.open()
def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port))
+ def _save_expect_failure(self, pattern, data, start_time):
+ """
+        Save an expect failure. If the test fails, the saved failures will be printed.
+        In some cases the user handles expect exceptions themselves,
+        so an expect failure could be a false alarm, and a test case might generate many of them.
+        Therefore we don't print failures immediately and we limit the max size of the failure list.
+ """
+ self.expect_failures.insert(0, {"pattern": pattern, "data": data,
+ "start": start_time, "end": time.time()})
+ self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
+
+ def _save_dut_log(self, data):
+ """
+ Save DUT log into file using another thread.
+        This is a workaround for devices where file system operations take a long time.
+
+ See descriptions in ``_LogThread`` for details.
+ """
+ self.LOG_THREAD.save_log(self.log_file, data)
+
# define for methods need to be overwritten by Port
@classmethod
def list_available_ports(cls):
:return: None
"""
self._port_open()
- self.receive_thread = _RecvThread(self._port_read, self.data_cache)
+ self.receive_thread = _RecvThread(self._port_read, self.data_cache,
+ self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def close(self):
if self.receive_thread:
self.receive_thread.exit()
self._port_close()
+ self.LOG_THREAD.flush_data()
def write(self, data, eol="\r\n", flush=True):
"""
if flush:
self.data_cache.flush()
# do write if cache
- if data:
+ if data is not None:
self._port_write(data + eol if eol else data)
@_expect_lock
self.data_cache.flush(size)
return data
+ def start_capture_raw_data(self, capture_id="default"):
+ """
+        Sometimes an application wants to get DUT raw data while using the ``expect`` method at the same time.
+        The capture methods provide a way to get raw data without affecting ``expect`` or ``read``.
+
+ If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID.
+
+ :param capture_id: ID of capture. You can use different IDs to do different captures at the same time.
+ """
+ with self.record_data_lock:
+ try:
+                # if capture is started on an existing ID, flush the old data and restart the capture
+ self.recorded_data[capture_id].flush()
+ except KeyError:
+ # otherwise, create new data cache
+ self.recorded_data[capture_id] = _DataCache()
+
+ def stop_capture_raw_data(self, capture_id="default"):
+ """
+ Stop capture and get raw data.
+ This method should be used after ``start_capture_raw_data`` on the same capture ID.
+
+ :param capture_id: ID of capture.
+ :return: captured raw data between start capture and stop capture.
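+
+        Illustrative usage (the capture ID and pattern are hypothetical)::
+
+            dut.start_capture_raw_data("scan")
+            dut.expect("scan done")
+            raw = dut.stop_capture_raw_data("scan")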
+ """
+ with self.record_data_lock:
+ try:
+ ret = self.recorded_data[capture_id].get_data()
+ self.recorded_data.pop(capture_id)
+ except KeyError as e:
+ e.message = "capture_id does not exist. " \
+ "You should call start_capture_raw_data with same ID " \
+ "before calling stop_capture_raw_data"
+ raise e
+ return ret
+
# expect related methods
@staticmethod
start_time = time.time()
while True:
ret, index = method(data, pattern)
- if ret is not None or time.time() - start_time > timeout:
+ if ret is not None:
self.data_cache.flush(index)
break
+ time_remaining = start_time + timeout - time.time()
+ if time_remaining < 0:
+ break
# wait for new data from cache
- data = self.data_cache.get_data(time.time() + timeout - start_time)
+ data = self.data_cache.get_data(time_remaining)
if ret is None:
- raise ExpectTimeout(self.name + ": " + str(pattern))
+ pattern = _pattern_to_string(pattern)
+ self._save_expect_failure(pattern, data, start_time)
+ raise ExpectTimeout(self.name + ": " + pattern)
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
if expect_item["ret"] is not None:
# match succeed for one item
matched_expect_items.append(expect_item)
- break
# if expect all, then all items need to be matched,
# else only one item need to matched
if expect_all:
- match_succeed = (matched_expect_items == expect_items)
+ match_succeed = len(matched_expect_items) == len(expect_items)
else:
match_succeed = True if matched_expect_items else False
- if time.time() - start_time > timeout or match_succeed:
+ time_remaining = start_time + timeout - time.time()
+ if time_remaining < 0 or match_succeed:
break
else:
- data = self.data_cache.get_data(time.time() + timeout - start_time)
+ data = self.data_cache.get_data(time_remaining)
if match_succeed:
- # do callback and flush matched data cache
+ # sort matched items according to order of appearance in the input data,
+ # so that the callbacks are invoked in correct order
+ matched_expect_items = sorted(matched_expect_items, key=lambda it: it["index"])
+ # invoke callbacks and flush matched data cache
slice_index = -1
for expect_item in matched_expect_items:
# trigger callback
# flush already matched data
self.data_cache.flush(slice_index)
else:
- raise ExpectTimeout(self.name + ": " + str(expect_items))
+ pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
+ self._save_expect_failure(pattern, data, start_time)
+ raise ExpectTimeout(self.name + ": " + pattern)
@_expect_lock
def expect_any(self, *expect_items, **timeout):
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(True, expect_items, **timeout)
+ @staticmethod
+ def _format_ts(ts):
+ return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
+
+ def print_debug_info(self):
+ """
+        Print debug info for the current DUT. Currently we print debug info for expect failures.
+ """
+ Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
+
+ for failure in self.expect_failures:
+ Utility.console_log(u"\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
+ .format(failure["pattern"], failure["data"],
+ self._format_ts(failure["start"]), self._format_ts(failure["end"])),
+ color="orange")
+
class SerialDUT(BaseDUT):
""" serial with logging received data feature """
self.serial_configs.update(kwargs)
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
- @staticmethod
- def _format_data(data):
+ def _format_data(self, data):
"""
-        format data for logging. do decode and add timestamp.
+        format data for logging: add a timestamp and keep the data as raw bytes.
:param data: raw data from read
-        :return: formatted data (str)
+        :return: formatted data (bytes)
"""
- timestamp = time.time()
- timestamp = "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
- str(timestamp % 1)[2:5])
- formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, _decode_data(data))
+ timestamp = "[{}]".format(self._format_ts(time.time()))
+ formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
return formatted_data
def _port_open(self):
def _port_read(self, size=1):
data = self.port_inst.read(size)
if data:
- with open(self.log_file, "a+") as _log_file:
- _log_file.write(self._format_data(data))
+ self._save_dut_log(self._format_data(data))
return data
def _port_write(self, data):
+ if isinstance(data, str):
+ data = data.encode()
self.port_inst.write(data)
@classmethod
import threading
import functools
+import netifaces
+
import EnvConfig
dut=None,
env_tag=None,
env_config_file=None,
- test_name=None,
+ test_suite_name=None,
**kwargs):
self.app_cls = app
self.default_dut_cls = dut
self.config = EnvConfig.Config(env_config_file, env_tag)
- self.log_path = self.app_cls.get_log_folder(test_name)
+ self.log_path = self.app_cls.get_log_folder(test_suite_name)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path)
"""
return self.config.get_variable(variable_name)
+ PROTO_MAP = {
+ "ipv4": netifaces.AF_INET,
+ "ipv6": netifaces.AF_INET6,
+ "mac": netifaces.AF_LINK,
+ }
+
@_synced
- def get_pc_nic_info(self, nic_name="pc_nic"):
+ def get_pc_nic_info(self, nic_name="pc_nic", proto="ipv4"):
"""
-        get_pc_nic_info(nic_name="pc_nic")
+        get_pc_nic_info(nic_name="pc_nic", proto="ipv4")
- try to get nic info (ip address, ipv6 address, mac address)
+ try to get info of a specified NIC and protocol.
- :param nic_name: pc nic name. allows passing variable name, nic name value or omitted (to get default nic info).
- :return: a dict of address ("ipv4", "ipv6", "mac") if successfully found. otherwise None.
+    :param nic_name: pc nic name. allows passing a variable name or a nic name value.
+ :param proto: "ipv4", "ipv6" or "mac"
+ :return: a dict of nic info if successfully found. otherwise None.
+        nic info keys can differ between protocols.
+        key "addr" is available for mac, ipv4 and ipv6 nic info.
"""
- # TODO: need to implement auto get nic info method
- return self.config.get_variable("nic_info/" + nic_name)
+ interfaces = netifaces.interfaces()
+ if nic_name in interfaces:
+            # the name is in the interface list, so we regard it as a NIC name
+ if_addr = netifaces.ifaddresses(nic_name)
+ else:
+            # it's not in the interface list, so we assume it's a config variable name
+ _nic_name = self.get_variable(nic_name)
+ if_addr = netifaces.ifaddresses(_nic_name)
+
+ return if_addr[self.PROTO_MAP[proto]][0]
@_synced
- def close(self):
+ def close(self, dut_debug=False):
"""
close()
close all DUTs of the Env.
+    :param dut_debug: if dut_debug is True, print all DUT expect failures before closing it
:return: None
"""
for dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
+ if dut_debug:
+ dut.print_debug_info()
dut.close()
self.allocated_duts = dict()
try:
with open(config_file) as f:
configs = yaml.load(f)[env_name]
- except (OSError, TypeError):
+ except (OSError, TypeError, IOError):
configs = dict()
return configs
class UT(IDFApp):
def get_binary_path(self, app_path):
- if app_path:
- # specified path, join it and the idf path
- path = os.path.join(self.idf_path, app_path)
- else:
- path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
+ """
+ :param app_path: app path or app config
+ :return: binary path
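+
+        Illustrative usage (the config name is hypothetical)::
+
+            path = ut_app.get_binary_path("psram")  # binary built by ``make ut-build-CONFIG``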
+ """
+ if not app_path:
+ app_path = "default"
+
+ path = os.path.join(self.idf_path, app_path)
+ if not os.path.exists(path):
+ while True:
+ # try to get by config
+ if app_path == "default":
+                # it's the default config; first try to get the binary from the build folder of unit-test-app
+ path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
+ if os.path.exists(path):
+ # found, use bin in build path
+ break
+            # ``make ut-build-all-configs`` or ``make ut-build-CONFIG`` copies the binary to the output folder
+ path = os.path.join(self.idf_path, "tools", "unit-test-app", "output", app_path)
+ if os.path.exists(path):
+ break
+ raise OSError("Failed to get unit-test-app binary path")
return path
import random
import tempfile
+from serial.tools import list_ports
+
import DUT
execution_time=execution_time, level=level, **kwargs)
+def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", execution_time=1,
+ level="unit", erase_nvs=True, **kwargs):
+ """
+    decorator for idf unit test cases (with default values for some keyword args).
+
+ :param app: test application class
+ :param dut: dut class
+ :param chip: chip supported, string or tuple
+ :param module: module, string
+ :param execution_time: execution time in minutes, int
+ :param level: test level, could be used to filter test cases, string
+    :param erase_nvs: whether to erase NVS in DUT.start_app()
+ :param kwargs: other keyword args
+ :return: test method
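+
+    A minimal usage sketch (the env tag is illustrative)::
+
+        @idf_unit_test(env_tag="UT_T1_1")
+        def run_unit_test_cases(env, extra_data):
+            pass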
+ """
+ try:
+        # try to configure the default erase-NVS behavior
+ dut.ERASE_NVS = erase_nvs
+ except AttributeError:
+ pass
+
+ return TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
+ execution_time=execution_time, level=level, **kwargs)
+
+
def log_performance(item, value):
"""
do print performance with pre-defined format to console
:param item: performance item name
:param value: performance value
"""
- Utility.console_log("[Performance][{}]: {}".format(item, value), "orange")
+ performance_msg = "[Performance][{}]: {}".format(item, value)
+ Utility.console_log(performance_msg, "orange")
+ # update to junit test report
+ current_junit_case = TinyFW.JunitReport.get_current_test_case()
+ current_junit_case.stdout += performance_msg + "\r\n"
def check_performance(item, value):
def __init__(self, test_case, case_config, env_config_file=None):
super(Runner, self).__init__()
self.setDaemon(True)
- test_methods = SearchCases.Search.search_test_cases(test_case)
- self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
- self.test_result = True
if case_config:
test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
else:
test_suite_name = "TestRunner"
TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
+ test_methods = SearchCases.Search.search_test_cases(test_case)
+ self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
+ self.test_result = []
def run(self):
for case in self.test_cases:
- self.test_result = self.test_result and case.run()
+ result = case.run()
+ self.test_result.append(result)
+
+ def get_test_result(self):
+ return self.test_result and all(self.test_result)
if __name__ == '__main__':
except KeyboardInterrupt:
print("exit by Ctrl-C")
break
- if not runner.test_result:
+ if not runner.get_test_result():
sys.exit(1)
# limitations under the License.
""" Interface for test cases. """
-import sys
import os
import time
import traceback
-import inspect
import functools
-import xunitgen
+import junit_xml
import Env
import DUT
import Utility
-XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
-XUNIT_RECEIVER = xunitgen.EventReceiver()
-XUNIT_DEFAULT_TEST_SUITE = "test-suite"
-
-
class DefaultEnvConfig(object):
"""
default test configs. There're 3 places to set configs, priority is (high -> low):
get_default_config = DefaultEnvConfig.get_default_config
-class TestResult(object):
- TEST_RESULT = {
- "pass": [],
- "fail": [],
- }
+MANDATORY_INFO = {
+ "execution_time": 1,
+ "env_tag": "default",
+ "category": "function",
+ "ignore": False,
+}
+
+
+class JunitReport(object):
+ # wrapper for junit test report
+    # TODO: doesn't support multi-threaded use (although it's not likely to be used that way).
+
+ JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
+ JUNIT_DEFAULT_TEST_SUITE = "test-suite"
+ JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE)
+ JUNIT_CURRENT_TEST_CASE = None
+ _TEST_CASE_CREATED_TS = 0
@classmethod
- def get_failed_cases(cls):
- """
- :return: failed test cases
- """
- return cls.TEST_RESULT["fail"]
+ def output_report(cls, junit_file_path):
+ """ Output current test result to file. """
+ with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f:
+ cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False)
@classmethod
- def get_passed_cases(cls):
+ def get_current_test_case(cls):
"""
- :return: passed test cases
+        By default, the test framework handles the junit test report automatically,
+        but some test cases might want to add extra info to the report.
+        They can use this method to get the current test case created by the test framework.
+
+        :return: current junit test case instance created by ``JunitReport.create_test_case``
"""
- return cls.TEST_RESULT["pass"]
+ return cls.JUNIT_CURRENT_TEST_CASE
@classmethod
- def set_result(cls, result, case_name):
+ def test_case_finish(cls, test_case):
"""
- :param result: True or False
- :param case_name: test case name
- :return: None
+        Append the test case to the test suite so it can be written to file.
+        ``elapsed_sec`` is updated automatically (relative to ``create_test_case``).
"""
- cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
+ test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS
+ cls.JUNIT_TEST_SUITE.test_cases.append(test_case)
+ @classmethod
+ def create_test_case(cls, name):
+ """
+ Extend ``junit_xml.TestCase`` with:
-get_failed_cases = TestResult.get_failed_cases
-get_passed_cases = TestResult.get_passed_cases
-
+    1. save the created test case so it can be retrieved by ``get_current_test_case``
+    2. log the creation timestamp, so ``elapsed_sec`` can be updated automatically in ``test_case_finish``.
-MANDATORY_INFO = {
- "execution_time": 1,
- "env_tag": "default",
-}
+ :param name: test case name
+ :return: instance of ``junit_xml.TestCase``
+ """
+        # set stdout to an empty string, so we can always append strings to stdout.
+        # It won't affect the output logic: if stdout is empty, it won't be put into the report.
+ test_case = junit_xml.TestCase(name, stdout="")
+ cls.JUNIT_CURRENT_TEST_CASE = test_case
+ cls._TEST_CASE_CREATED_TS = time.time()
+ return test_case
def test_method(**kwargs):
:keyword env_config_file: test env config file. usually will not set this keyword when define case
:keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
usually will not set this keyword when define case
+    :keyword junit_report_by_case: By default the test fw handles junit report generation.
+        In some cases, one test function might cover many test cases.
+        If this flag is set, the test case can update the junit report on its own.
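+
+        A sketch of per-case reporting inside such a test function (the case name is hypothetical)::
+
+            junit_case = JunitReport.create_test_case("one_sub_case")
+            junit_case.add_failure_info("sub case failed")
+            JunitReport.test_case_finish(junit_case)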
"""
def test(test_func):
- # get test function file name
- frame = inspect.stack()
- test_func_file_name = frame[1][1]
case_info = MANDATORY_INFO.copy()
- case_info["name"] = test_func.__name__
+ case_info["name"] = case_info["ID"] = test_func.__name__
+ case_info["junit_report_by_case"] = False
case_info.update(kwargs)
- # create env instance
- env_config = DefaultEnvConfig.get_default_config()
- for key in kwargs:
- if key in env_config:
- env_config[key] = kwargs[key]
-
@functools.wraps(test_func)
def handle_test(extra_data=None, **overwrite):
"""
:param overwrite: args that runner or main want to overwrite
:return: None
"""
+ # create env instance
+ env_config = DefaultEnvConfig.get_default_config()
+ for key in kwargs:
+ if key in env_config:
+ env_config[key] = kwargs[key]
+
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
+
-            # prepare for xunit test results
+            # prepare for junit test results
- xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
- XUNIT_FILE_NAME)
- XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
+ junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"])
+ junit_test_case = JunitReport.create_test_case(case_info["name"])
+ result = False
+
try:
Utility.console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
except Exception as e:
# handle all the exceptions here
traceback.print_exc()
- result = False
# log failure
- XUNIT_RECEIVER.failure(str(e), test_func_file_name)
+ junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
finally:
- # do close all DUTs
- env_inst.close()
+ if not case_info["junit_report_by_case"]:
+ JunitReport.test_case_finish(junit_test_case)
+            # close all DUTs; if result is False, print DUT debug info first
+ env_inst.close(dut_debug=(not result))
+
# end case and output result
- XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
- with open(xunit_file, "ab+") as f:
- f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
- XUNIT_DEFAULT_TEST_SUITE))
+ JunitReport.output_report(junit_file_path)
if result:
Utility.console_log("Test Succeed: " + test_func.__name__, color="green")
else:
Utility.console_log(("Test Fail: " + test_func.__name__), color="red")
- TestResult.set_result(result, test_func.__name__)
return result
handle_test.case_info = case_info
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Common logic to assign test cases to CI jobs.
+
+Some background knowledge about Gitlab CI and its use flow in esp-idf:
+
+* Gitlab CI jobs are statically defined in ``.gitlab-ci.yml``; we can't create test jobs dynamically
+* For test jobs running on a DUT, we use ``tags`` to select runners with different test environments
+* We have an ``assign_test`` stage that collects cases and then assigns them to the correct test jobs
+* ``assign_test`` will fail if any case can't be assigned
+* with ``assign_test``, we can:
+    * dynamically filter the test cases we want to run
+    * alert users if they forgot to add CI jobs and guide them on how to add test jobs
+* the last step of ``assign_test`` is to output config files; the test jobs then run these cases
+
+The basic logic to assign test cases is as follows:
+
+1. search all the cases
+2. filter cases (if a filter is specified by @bot)
+3. put cases into groups according to the rules of ``Group``
+    * try to put each case into an existing group
+    * if that fails, create a new group for the case
+4. parse and filter the test jobs from the CI config file
+5. try to assign all groups to jobs according to tags
+6. output config files for the jobs
+
+"""
+
+import os
+import re
+import json
+
+import yaml
+
+from Utility import (CaseConfig, SearchCases, GitlabCIJob, console_log)
+
+
+class Group(object):
+
+ MAX_EXECUTION_TIME = 30
+ MAX_CASE = 15
+ SORT_KEYS = ["env_tag"]
+    # Matching CI job rules could be different from the way we want to group test cases.
+    # For example, when assigning unit test cases, different test cases need to use different test functions.
+    # We need to put them into different groups.
+    # But these groups can be assigned to jobs with the same tags, as they use the same test environment.
+ CI_JOB_MATCH_KEYS = SORT_KEYS
+
+ def __init__(self, case):
+ self.execution_time = 0
+ self.case_list = [case]
+ self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS]))
+ # we use ci_job_match_keys to match CI job tags. It's a set of required tags.
+ self.ci_job_match_keys = set([self._get_case_attr(case, x) for x in self.CI_JOB_MATCH_KEYS])
+
+ @staticmethod
+ def _get_case_attr(case, attr):
+        # cases might use different types (dict or test_func);
+        # this method gets an attribute from a case
+ return case.case_info[attr]
+
+ def accept_new_case(self):
+ """
+ check if allowed to add any case to this group
+
+ :return: True or False
+ """
+ max_time = (sum([self._get_case_attr(x, "execution_time") for x in self.case_list])
+ < self.MAX_EXECUTION_TIME)
+ max_case = (len(self.case_list) < self.MAX_CASE)
+ return max_time and max_case
+
+ def add_case(self, case):
+ """
+ add case to current group
+
+ :param case: test case
+ :return: True if add succeed, else False
+ """
+ added = False
+ if self.accept_new_case():
+ for key in self.filters:
+ if self._get_case_attr(case, key) != self.filters[key]:
+ break
+ else:
+ self.case_list.append(case)
+ added = True
+ return added
+
+ def output(self):
+ """
+ output data for job configs
+
+ :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
+ """
+ output_data = {
+ "Filter": self.filters,
+ "CaseConfig": [{"name": self._get_case_attr(x, "name")} for x in self.case_list],
+ }
+ return output_data
+
+
+class AssignTest(object):
+ """
+ Auto assign tests to CI jobs.
+
+ :param test_case_path: path of test case file(s)
+ :param ci_config_file: path of ``.gitlab-ci.yml``
+ """
+    # subclasses need to override the CI test job pattern, to filter all test jobs
+ CI_TEST_JOB_PATTERN = re.compile(r"^test_.+")
+    # by default we only run function-category cases in CI, as other tests could take a long time
+ DEFAULT_FILTER = {
+ "category": "function",
+ "ignore": False,
+ }
+
+ def __init__(self, test_case_path, ci_config_file, case_group=Group):
+ self.test_case_path = test_case_path
+ self.test_cases = []
+ self.jobs = self._parse_gitlab_ci_config(ci_config_file)
+ self.case_group = case_group
+
+ def _parse_gitlab_ci_config(self, ci_config_file):
+
+ with open(ci_config_file, "r") as f:
+ ci_config = yaml.load(f)
+
+ job_list = list()
+ for job_name in ci_config:
+ if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
+ job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
+ job_list.sort(key=lambda x: x["name"])
+ return job_list
+
+ def _search_cases(self, test_case_path, case_filter=None):
+ """
+ :param test_case_path: path contains test case folder
+        :param case_filter: filter for test cases. the filter used is the default filter updated with this param.
+ :return: filtered test case list
+ """
+ _case_filter = self.DEFAULT_FILTER.copy()
+ if case_filter:
+ _case_filter.update(case_filter)
+ test_methods = SearchCases.Search.search_test_cases(test_case_path)
+ return CaseConfig.filter_test_cases(test_methods, _case_filter)
+
+ def _group_cases(self):
+ """
+ separate all cases into groups according group rules. each group will be executed by one CI job.
+
+ :return: test case groups.
+ """
+ groups = []
+ for case in self.test_cases:
+ for group in groups:
+ # add to current group
+ if group.add_case(case):
+ break
+ else:
+ # create new group
+ groups.append(self.case_group(case))
+ return groups
+
+ @staticmethod
+ def _apply_bot_filter():
+ """
+        we support customizing CI tests via the bot.
+        here we parse the bot filter from the environment and return a filter which ``_search_cases`` accepts.
+
+ :return: filter for search test cases
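+
+        An illustrative value (the keys are hypothetical)::
+
+            BOT_CASE_FILTER='{"env_tag": "UT_T1_1", "module": "wifi"}'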
+ """
+ bot_filter = os.getenv("BOT_CASE_FILTER")
+ if bot_filter:
+ bot_filter = json.loads(bot_filter)
+ else:
+ bot_filter = dict()
+ return bot_filter
+
+ def _apply_bot_test_count(self):
+ """
+        The bot can also pass a test count.
+        If the filtered cases need to be tested several times, we duplicate them here.
+ """
+ test_count = os.getenv("BOT_TEST_COUNT")
+ if test_count:
+ test_count = int(test_count)
+ self.test_cases *= test_count
+
+ def assign_cases(self):
+ """
+ separate test cases to groups and assign test cases to CI jobs.
+
+ :raise AssertError: if failed to assign any case to CI job.
+ :return: None
+ """
+ failed_to_assign = []
+ case_filter = self._apply_bot_filter()
+ self.test_cases = self._search_cases(self.test_case_path, case_filter)
+ self._apply_bot_test_count()
+ test_groups = self._group_cases()
+ for group in test_groups:
+ for job in self.jobs:
+ if job.match_group(group):
+ job.assign_group(group)
+ break
+ else:
+ failed_to_assign.append(group)
+ if failed_to_assign:
+ console_log("Too many test cases vs jobs to run. Please add the following jobs to .gitlab-ci.yml with specific tags:", "R")
+ for group in failed_to_assign:
+ console_log("* Add job with: " + ",".join(group.ci_job_match_keys), "R")
+ raise RuntimeError("Failed to assign test case to CI jobs")
+
+ def output_configs(self, output_path):
+ """
+ :param output_path: path to output config files for each CI job
+ :return: None
+ """
+ if not os.path.exists(output_path):
+ os.makedirs(output_path)
+ for job in self.jobs:
+ job.output_config(output_path)
import TestCase
+def _convert_to_lower_case(item):
+ """
+    the bot filter always uses lower-case strings.
+    this function converts all strings to lower case.
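+
+    e.g. a hypothetical input ``("ESP32", "UT_T1_1")`` becomes ``["esp32", "ut_t1_1"]``.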
+ """
+ if isinstance(item, (tuple, list)):
+ output = [_convert_to_lower_case(v) for v in item]
+ elif isinstance(item, str):
+ output = item.lower()
+ else:
+ output = item
+ return output
+
+
def _filter_one_case(test_method, case_filter):
""" Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
filter_result = True
- for key in case_filter:
+    # filter keys are lower case. Map the lower-case keys to the original keys.
+ key_mapping = {x.lower(): x for x in test_method.case_info.keys()}
+
+ for orig_key in case_filter:
+        # fall back to the raw key if the case doesn't define it (the filter then succeeds, per rule 2)
+        key = key_mapping.get(orig_key, orig_key)
if key in test_method.case_info:
# the filter key is both in case and filter
# we need to check if they match
- filter_item, accepted_item = case_filter[key], test_method.case_info[key]
+ filter_item = _convert_to_lower_case(case_filter[orig_key])
+ accepted_item = _convert_to_lower_case(test_method.case_info[key])
if isinstance(filter_item, (tuple, list)) \
and isinstance(accepted_item, (tuple, list)):
* if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple
* if both are list/tuple, then check if they have common item
2. if only case attribute or filter have the key, filter succeed
+    3. strings are compared case-insensitively
for example, the following are match succeed scenarios
(the rule is symmetric, result is same if exchange values for user filter and case attribute):
def __init__(self, job, job_name):
super(Job, self).__init__(job)
self["name"] = job_name
+ self.tags = set(self["tags"])
def match_group(self, group):
"""
:return: True or False
"""
match_result = False
- for _ in range(1):
- if "case group" in self:
- # this job is already assigned
- break
- for value in group.filters.values():
- if value not in self["tags"]:
- break
- else:
- continue
- break
- else:
+ if "case group" not in self and group.ci_job_match_keys == self.tags:
+ # group not assigned and all tags match
match_result = True
return match_result
file_name = os.path.join(file_path, self["name"] + ".yml")
if "case group" in self:
with open(file_name, "w") as f:
- yaml.dump(self["case group"].output(), f)
+ yaml.dump(self["case group"].output(), f, default_flow_style=False)
if __name__ == '__main__':
- TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
+ TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml")
test_examples_protocol_https_request()
* pyserial
* pyyaml
- * xunitgen
+ * junit_xml
+ * netifaces
+ * matplotlib (if use Utility.LineChart)
To build document, we need to install ``Sphinx`` and ``sphinx-rtd-theme`` (you may replace this with your own theme).
if __name__ == '__main__':
- TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
+ TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
test_examples_protocol_https_request()
--- /dev/null
+pyserial
+pyyaml
+junit_xml
+netifaces
+matplotlib