--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+class for handling Test Apps. Currently it provides the following features:
+
+1. get SDK path
+2. get SDK tools
+3. parse application info from its path. for example:
+ * provide download info
+ * provide partition table info
+
+Test Apps should inherent from BaseApp class and overwrite the methods.
+"""
+import os
+import sys
+import time
+
+# timestamp used for calculate log folder name
+LOG_FOLDER_TIMESTAMP = time.time()
+
+
+class BaseApp(object):
+ """
+ Base Class for App.
+ Defines the mandatory methods that App need to implement.
+ Also implements some common methods.
+
+ :param app_path: the path for app.
+ """
+
+ def __init__(self, app_path):
+ pass
+
+ @classmethod
+ def get_sdk_path(cls):
+ """
+ get sdk path.
+
+ subclass must overwrite this method.
+
+ :return: abs sdk path
+ """
+ pass
+
+ @classmethod
+ def get_tools(cls):
+ """
+ get SDK related tools for applications
+
+ subclass must overwrite this method.
+
+ :return: tuple, abs path of each tool
+ """
+ pass
+
+ @classmethod
+ def get_log_folder(cls, test_suite_name):
+ """
+ By default log folder is ``${SDK_PATH}/TEST_LOGS/${test_suite_name}_${timestamp}``.
+
+ The log folder name is consist once start running, ensure all logs of will be put into the same folder.
+
+ :param test_suite_name: the test suite name, by default it's the base file name for main module
+ :return: the log folder path
+ """
+ if not test_suite_name:
+ test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
+ sdk_path = cls.get_sdk_path()
+ return os.path.join(sdk_path, "TEST_LOGS",
+ test_suite_name +
+ time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+
+ def process_app_info(self):
+ """
+ parse built app info for DUTTool
+
+ subclass must overwrite this method.
+
+ :return: required info for specific DUTTool
+ """
+ pass
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Command line tool to assign example tests to CI test jobs.
+"""
+
+# TODO: Need to handle running examples on different chips
+import os
+import sys
+import re
+import argparse
+
+import yaml
+
+test_fw_path = os.getenv("TEST_FW_PATH")
+if test_fw_path:
+ sys.path.insert(0, test_fw_path)
+
+from Utility import CaseConfig, SearchCases, GitlabCIJob
+
+
+class Group(object):
+
+ MAX_EXECUTION_TIME = 30
+ MAX_CASE = 15
+ SORT_KEYS = ["env_tag"]
+
+ def __init__(self, case):
+ self.execution_time = 0
+ self.case_list = [case]
+ self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS]))
+
+ def accept_new_case(self):
+ """
+ check if allowed to add any case to this group
+
+ :return: True or False
+ """
+ max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME)
+ max_case = (len(self.case_list) < self.MAX_CASE)
+ return max_time and max_case
+
+ def add_case(self, case):
+ """
+ add case to current group
+
+ :param case: test case
+ :return: True if add succeed, else False
+ """
+ added = False
+ if self.accept_new_case():
+ for key in self.filters:
+ if case.case_info[key] != self.filters[key]:
+ break
+ else:
+ self.case_list.append(case)
+ added = True
+ return added
+
+ def output(self):
+ """
+ output data for job configs
+
+ :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
+ """
+ output_data = {
+ "Filter": self.filters,
+ "CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list],
+ }
+ return output_data
+
+
+class AssignTest(object):
+ """
+ Auto assign tests to CI jobs.
+
+ :param test_case: path of test case file(s)
+ :param ci_config_file: path of ``.gitlab-ci.yml``
+ """
+
+ CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+")
+
+ def __init__(self, test_case, ci_config_file):
+ self.test_cases = self._search_cases(test_case)
+ self.jobs = self._parse_gitlab_ci_config(ci_config_file)
+
+ def _parse_gitlab_ci_config(self, ci_config_file):
+
+ with open(ci_config_file, "r") as f:
+ ci_config = yaml.load(f)
+
+ job_list = list()
+ for job_name in ci_config:
+ if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
+ job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
+ return job_list
+
+ @staticmethod
+ def _search_cases(test_case, case_filter=None):
+ """
+ :param test_case: path contains test case folder
+ :param case_filter: filter for test cases
+ :return: filtered test case list
+ """
+ test_methods = SearchCases.Search.search_test_cases(test_case)
+ return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
+
+ def _group_cases(self):
+ """
+ separate all cases into groups according group rules. each group will be executed by one CI job.
+
+ :return: test case groups.
+ """
+ groups = []
+ for case in self.test_cases:
+ for group in groups:
+ # add to current group
+ if group.add_case(case):
+ break
+ else:
+ # create new group
+ groups.append(Group(case))
+ return groups
+
+ def assign_cases(self):
+ """
+ separate test cases to groups and assign test cases to CI jobs.
+
+ :raise AssertError: if failed to assign any case to CI job.
+ :return: None
+ """
+ failed_to_assign = []
+ test_groups = self._group_cases()
+ for group in test_groups:
+ for job in self.jobs:
+ if job.match_group(group):
+ job.assign_group(group)
+ break
+ else:
+ failed_to_assign.append(group)
+ assert not failed_to_assign
+
+ def output_configs(self, output_path):
+ """
+
+ :param output_path: path to output config files for each CI job
+ :return: None
+ """
+ if not os.path.exists(output_path):
+ os.makedirs(output_path)
+ for job in self.jobs:
+ job.output_config(output_path)
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument("test_case",
+ help="test case folder or file")
+ parser.add_argument("ci_config_file",
+ help="gitlab ci config file")
+ parser.add_argument("output_path",
+ help="output path of config files")
+ args = parser.parse_args()
+
+ assign_test = AssignTest(args.test_case, args.ci_config_file)
+ assign_test.assign_cases()
+ assign_test.output_configs(args.output_path)
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+DUT provides 3 major groups of features:
+
+* DUT port feature, provide basic open/close/read/write features
+* DUT tools, provide extra methods to control the device, like download and start app
+* DUT expect method, provide features for users to check DUT outputs
+
+The current design of DUT have 3 classes for one DUT: BaseDUT, DUTPort, DUTTool.
+
+* BaseDUT class:
+ * defines methods DUT port and DUT tool need to overwrite
+ * provide the expect methods and some other methods based on DUTPort
+* DUTPort class:
+ * inherent from BaseDUT class
+ * implements the port features by overwriting port methods defined in BaseDUT
+* DUTTool class:
+ * inherent from one of the DUTPort class
+ * implements the tools features by overwriting tool methods defined in BaseDUT
+ * could add some new methods provided by the tool
+
+This module implements the BaseDUT class and one of the port class SerialDUT.
+User should implement their DUTTool classes.
+If they using different port then need to implement their DUTPort class as well.
+"""
+
+import time
+import re
+import threading
+import copy
+import sys
+import functools
+
+import serial
+from serial.tools import list_ports
+
+if sys.version_info[0] == 2:
+ import Queue as _queue
+else:
+ import queue as _queue
+
+
+class ExpectTimeout(ValueError):
+ """ timeout for expect method """
+ pass
+
+
+class UnsupportedExpectItem(ValueError):
+ """ expect item not supported by the expect method """
+ pass
+
+
+def _expect_lock(func):
+ @functools.wraps(func)
+ def handler(self, *args, **kwargs):
+ with self.expect_lock:
+ ret = func(self, *args, **kwargs)
+ return ret
+ return handler
+
+
+class _DataCache(_queue.Queue):
+ """
+ Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
+ """
+
+ def __init__(self, maxsize=0):
+ _queue.Queue.__init__(self, maxsize=maxsize)
+ self.data_cache = str()
+
+ def get_data(self, timeout=0):
+ """
+ get a copy of data from cache.
+
+ :param timeout: timeout for waiting new queue item
+ :return: copy of data cache
+ """
+ # make sure timeout is non-negative
+ if timeout < 0:
+ timeout = 0
+
+ try:
+ data = self.get(timeout=timeout)
+ if isinstance(data, bytes):
+ # convert bytes to string
+ try:
+ data = data.decode("utf-8", "ignore")
+ except UnicodeDecodeError:
+ data = data.decode("iso8859-1",)
+ self.data_cache += data
+ except _queue.Empty:
+ # don't do anything when on update for cache
+ pass
+ return copy.deepcopy(self.data_cache)
+
+ def flush(self, index=0xFFFFFFFF):
+ """
+ flush data from cache.
+
+ :param index: if < 0 then don't do flush, otherwise flush data before index
+ :return: None
+ """
+ # first add data in queue to cache
+ self.get_data()
+
+ if index > 0:
+ self.data_cache = self.data_cache[index:]
+
+
+class _RecvThread(threading.Thread):
+
+ def __init__(self, read, data_cache):
+ super(_RecvThread, self).__init__()
+ self.exit_event = threading.Event()
+ self.setDaemon(True)
+ self.read = read
+ self.data_cache = data_cache
+
+ def run(self):
+ while not self.exit_event.isSet():
+ data = self.read(1000)
+ if data:
+ self.data_cache.put(data)
+
+ def exit(self):
+ self.exit_event.set()
+ self.join()
+
+
+class BaseDUT(object):
+ """
+ :param name: application defined name for port
+ :param port: comport name, used to create DUT port
+ :param log_file: log file name
+ :param app: test app instance
+ :param kwargs: extra args for DUT to create ports
+ """
+
+ DEFAULT_EXPECT_TIMEOUT = 5
+
+ def __init__(self, name, port, log_file, app, **kwargs):
+
+ self.expect_lock = threading.Lock()
+ self.name = name
+ self.port = port
+ self.log_file = log_file
+ self.app = app
+ self.data_cache = _DataCache()
+ self.receive_thread = None
+ # open and start during init
+ self.open()
+
+ def __str__(self):
+ return "DUT({}: {})".format(self.name, str(self.port))
+
+ # define for methods need to be overwritten by Port
+ @classmethod
+ def list_available_ports(cls):
+ """
+ list all available ports.
+
+ subclass (port) must overwrite this method.
+
+ :return: list of available comports
+ """
+ pass
+
+ def _port_open(self):
+ """
+ open the port.
+
+ subclass (port) must overwrite this method.
+
+ :return: None
+ """
+ pass
+
+ def _port_read(self, size=1):
+ """
+ read form port. This method should not blocking for long time, otherwise receive thread can not exit.
+
+ subclass (port) must overwrite this method.
+
+ :param size: max size to read.
+ :return: read data.
+ """
+ pass
+
+ def _port_write(self, data):
+ """
+ write to port.
+
+ subclass (port) must overwrite this method.
+
+ :param data: data to write
+ :return: None
+ """
+ pass
+
+ def _port_close(self):
+ """
+ close port.
+
+ subclass (port) must overwrite this method.
+
+ :return: None
+ """
+ pass
+
+ # methods that need to be overwritten by Tool
+ @classmethod
+ def confirm_dut(cls, port, app, **kwargs):
+ """
+ confirm if it's a DUT, usually used by auto detecting DUT in by Env config.
+
+ subclass (tool) must overwrite this method.
+
+ :param port: comport
+ :param app: app instance
+ :return: True or False
+ """
+ pass
+
+ def start_app(self):
+ """
+ usually after we got DUT, we need to do some extra works to let App start.
+ For example, we need to reset->download->reset to let IDF application start on DUT.
+
+ subclass (tool) must overwrite this method.
+
+ :return: None
+ """
+ pass
+
+ # methods that features raw port methods
+ def open(self):
+ """
+ open port and create thread to receive data.
+
+ :return: None
+ """
+ self._port_open()
+ self.receive_thread = _RecvThread(self._port_read, self.data_cache)
+ self.receive_thread.start()
+
+ def close(self):
+ """
+ close receive thread and then close port.
+
+ :return: None
+ """
+ if self.receive_thread:
+ self.receive_thread.exit()
+ self._port_close()
+
+ def write(self, data, eol="\r\n", flush=True):
+ """
+ :param data: data
+ :param eol: end of line pattern.
+ :param flush: if need to flush received data cache before write data.
+ usually we need to flush data before write,
+ make sure processing outputs generated by wrote.
+ :return: None
+ """
+ # do flush before write
+ if flush:
+ self.data_cache.flush()
+ # do write if cache
+ if data:
+ self._port_write(data + eol if eol else data)
+
+ @_expect_lock
+ def read(self, size=0xFFFFFFFF):
+ """
+ read(size=0xFFFFFFFF)
+ read raw data. NOT suggested to use this method.
+ Only use it if expect method doesn't meet your requirement.
+
+ :param size: read size. default read all data
+ :return: read data
+ """
+ data = self.data_cache.get_data(0)[:size]
+ self.data_cache.flush(size)
+ return data
+
+ # expect related methods
+
+ @staticmethod
+ def _expect_str(data, pattern):
+ """
+ protected method. check if string is matched in data cache.
+
+ :param data: data to process
+ :param pattern: string
+ :return: pattern if match succeed otherwise None
+ """
+ index = data.find(pattern)
+ if index != -1:
+ ret = pattern
+ index += len(pattern)
+ else:
+ ret = None
+ return ret, index
+
+ @staticmethod
+ def _expect_re(data, pattern):
+ """
+ protected method. check if re pattern is matched in data cache
+
+ :param data: data to process
+ :param pattern: compiled RegEx pattern
+ :return: match groups if match succeed otherwise None
+ """
+ ret = None
+ match = pattern.search(data)
+ if match:
+ ret = match.groups()
+ index = match.end()
+ else:
+ index = -1
+ return ret, index
+
+ EXPECT_METHOD = [
+ [type(re.compile("")), "_expect_re"],
+ [str, "_expect_str"],
+ ]
+
+ def _get_expect_method(self, pattern):
+ """
+ protected method. get expect method according to pattern type.
+
+ :param pattern: expect pattern, string or compiled RegEx
+ :return: ``_expect_str`` or ``_expect_re``
+ """
+ for expect_method in self.EXPECT_METHOD:
+ if isinstance(pattern, expect_method[0]):
+ method = expect_method[1]
+ break
+ else:
+ raise UnsupportedExpectItem()
+ return self.__getattribute__(method)
+
+ @_expect_lock
+ def expect(self, pattern, timeout=DEFAULT_EXPECT_TIMEOUT):
+ """
+ expect(pattern, timeout=DEFAULT_EXPECT_TIMEOUT)
+ expect received data on DUT match the pattern. will raise exception when expect timeout.
+
+ :raise ExpectTimeout: failed to find the pattern before timeout
+ :raise UnsupportedExpectItem: pattern is not string or compiled RegEx
+
+ :param pattern: string or compiled RegEx(string pattern)
+ :param timeout: timeout for expect
+ :return: string if pattern is string; matched groups if pattern is RegEx
+ """
+ method = self._get_expect_method(pattern)
+
+ # non-blocking get data for first time
+ data = self.data_cache.get_data(0)
+ start_time = time.time()
+ while True:
+ ret, index = method(data, pattern)
+ if ret is not None or time.time() - start_time > timeout:
+ self.data_cache.flush(index)
+ break
+ # wait for new data from cache
+ data = self.data_cache.get_data(time.time() + timeout - start_time)
+
+ if ret is None:
+ raise ExpectTimeout(self.name + ": " + str(pattern))
+ return ret
+
+ def _expect_multi(self, expect_all, expect_item_list, timeout):
+ """
+ protected method. internal logical for expect multi.
+
+ :param expect_all: True or False, expect all items in the list or any in the list
+ :param expect_item_list: expect item list
+ :param timeout: timeout
+ :return: None
+ """
+ def process_expected_item(item_raw):
+ # convert item raw data to standard dict
+ item = {
+ "pattern": item_raw[0] if isinstance(item_raw, tuple) else item_raw,
+ "method": self._get_expect_method(item_raw[0] if isinstance(item_raw, tuple)
+ else item_raw),
+ "callback": item_raw[1] if isinstance(item_raw, tuple) else None,
+ "index": -1,
+ "ret": None,
+ }
+ return item
+
+ expect_items = [process_expected_item(x) for x in expect_item_list]
+
+ # non-blocking get data for first time
+ data = self.data_cache.get_data(0)
+
+ start_time = time.time()
+ matched_expect_items = list()
+ while True:
+ for expect_item in expect_items:
+ if expect_item not in matched_expect_items:
+ # exclude those already matched
+ expect_item["ret"], expect_item["index"] = \
+ expect_item["method"](data, expect_item["pattern"])
+ if expect_item["ret"] is not None:
+ # match succeed for one item
+ matched_expect_items.append(expect_item)
+ break
+
+ # if expect all, then all items need to be matched,
+ # else only one item need to matched
+ if expect_all:
+ match_succeed = (matched_expect_items == expect_items)
+ else:
+ match_succeed = True if matched_expect_items else False
+
+ if time.time() - start_time > timeout or match_succeed:
+ break
+ else:
+ data = self.data_cache.get_data(time.time() + timeout - start_time)
+
+ if match_succeed:
+ # do callback and flush matched data cache
+ slice_index = -1
+ for expect_item in matched_expect_items:
+ # trigger callback
+ if expect_item["callback"]:
+ expect_item["callback"](expect_item["ret"])
+ slice_index = max(slice_index, expect_item["index"])
+ # flush already matched data
+ self.data_cache.flush(slice_index)
+ else:
+ raise ExpectTimeout(self.name + ": " + str(expect_items))
+
+ @_expect_lock
+ def expect_any(self, *expect_items, **timeout):
+ """
+ expect_any(*expect_items, timeout=DEFAULT_TIMEOUT)
+ expect any of the patterns.
+ will call callback (if provided) if pattern match succeed and then return.
+ will pass match result to the callback.
+
+ :raise ExpectTimeout: failed to match any one of the expect items before timeout
+ :raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx
+
+ :arg expect_items: one or more expect items.
+ string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
+ :keyword timeout: timeout for expect
+ :return: None
+ """
+ # to be compatible with python2
+ # in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
+ if "timeout" not in timeout:
+ timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
+ return self._expect_multi(False, expect_items, **timeout)
+
+ @_expect_lock
+ def expect_all(self, *expect_items, **timeout):
+ """
+ expect_all(*expect_items, timeout=DEFAULT_TIMEOUT)
+ expect all of the patterns.
+ will call callback (if provided) if all pattern match succeed and then return.
+ will pass match result to the callback.
+
+ :raise ExpectTimeout: failed to match all of the expect items before timeout
+ :raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx
+
+ :arg expect_items: one or more expect items.
+ string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
+ :keyword timeout: timeout for expect
+ :return: None
+ """
+ # to be compatible with python2
+ # in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
+ if "timeout" not in timeout:
+ timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
+ return self._expect_multi(True, expect_items, **timeout)
+
+
+class SerialDUT(BaseDUT):
+ """ serial with logging received data feature """
+
+ DEFAULT_UART_CONFIG = {
+ "baudrate": 115200,
+ "bytesize": serial.EIGHTBITS,
+ "parity": serial.PARITY_NONE,
+ "stopbits": serial.STOPBITS_ONE,
+ "timeout": 0.05,
+ "xonxoff": False,
+ "rtscts": False,
+ }
+
+ def __init__(self, name, port, log_file, app, **kwargs):
+ self.port_inst = None
+ self.serial_configs = self.DEFAULT_UART_CONFIG.copy()
+ self.serial_configs.update(kwargs)
+ super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
+
+ @staticmethod
+ def _format_data(data):
+ """
+ format data for logging. do decode and add timestamp.
+
+ :param data: raw data from read
+ :return: formatted data (str)
+ """
+ timestamp = time.time()
+ timestamp = "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
+ str(timestamp % 1)[2:5])
+ try:
+ formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, data.decode("utf-8", "ignore"))
+ except UnicodeDecodeError:
+ # if utf-8 fail, use iso-8859-1 (single char codec with range 0-255)
+ formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, data.decode("iso8859-1",))
+ return formatted_data
+
+ def _port_open(self):
+ self.port_inst = serial.Serial(self.port, **self.serial_configs)
+
+ def _port_close(self):
+ self.port_inst.close()
+
+ def _port_read(self, size=1):
+ data = self.port_inst.read(size)
+ if data:
+ with open(self.log_file, "a+") as _log_file:
+ _log_file.write(self._format_data(data))
+ return data
+
+ def _port_write(self, data):
+ self.port_inst.write(data)
+
+ @classmethod
+ def list_available_ports(cls):
+ return [x.device for x in list_ports.comports()]
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Test Env, manages DUT, App and EnvConfig, interface for test cases to access these components """
+import os
+import threading
+import functools
+
+import EnvConfig
+
+
+def _synced(func):
+ @functools.wraps(func)
+ def decorator(self, *args, **kwargs):
+ with self.lock:
+ ret = func(self, *args, **kwargs)
+ return ret
+
+ decorator.__doc__ = func.__doc__
+ return decorator
+
+
+class Env(object):
+ """
+ test env, manages DUTs and env configs.
+
+ :keyword app: class for default application
+ :keyword dut: class for default DUT
+ :keyword env_tag: test env tag, used to select configs from env config file
+ :keyword env_config_file: test env config file path
+ :keyword test_name: test suite name, used when generate log folder name
+ """
+
+ def __init__(self,
+ app=None,
+ dut=None,
+ env_tag=None,
+ env_config_file=None,
+ test_name=None,
+ **kwargs):
+ self.app_cls = app
+ self.default_dut_cls = dut
+ self.config = EnvConfig.Config(env_config_file, env_tag)
+ self.log_path = self.app_cls.get_log_folder(test_name)
+ if not os.path.exists(self.log_path):
+ os.makedirs(self.log_path)
+
+ self.allocated_duts = dict()
+ self.lock = threading.RLock()
+
+ @_synced
+ def get_dut(self, dut_name, app_path, dut_class=None, app_class=None):
+ """
+ get_dut(dut_name, app_path, dut_class=None, app_class=None)
+
+ :param dut_name: user defined name for DUT
+ :param app_path: application path, app instance will use this path to process application info
+ :param dut_class: dut class, if not specified will use default dut class of env
+ :param app_class: app class, if not specified will use default app of env
+ :return: dut instance
+ """
+ if dut_name in self.allocated_duts:
+ dut = self.allocated_duts[dut_name]["dut"]
+ else:
+ if dut_class is None:
+ dut_class = self.default_dut_cls
+ if app_class is None:
+ app_class = self.app_cls
+ app_inst = app_class(app_path)
+ try:
+ port = self.config.get_variable(dut_name)
+ except ValueError:
+ # try to auto detect ports
+ allocated_ports = [self.allocated_duts[x]["port"] for x in self.allocated_duts]
+ available_ports = dut_class.list_available_ports()
+ for port in available_ports:
+ if port not in allocated_ports:
+ if dut_class.confirm_dut(port, app_inst):
+ break
+ else:
+ port = None
+ if port:
+ try:
+ dut_config = self.get_variable(dut_name + "_port_config")
+ except ValueError:
+ dut_config = dict()
+ dut = self.default_dut_cls(dut_name, port,
+ os.path.join(self.log_path, dut_name + ".log"),
+ app_inst,
+ **dut_config)
+ self.allocated_duts[dut_name] = {"port": port, "dut": dut}
+ else:
+ raise ValueError("Failed to get DUT")
+ return dut
+
+ @_synced
+ def close_dut(self, dut_name):
+ """
+ close_dut(dut_name)
+ close one DUT by name if DUT name is valid (the name used by ``get_dut``). otherwise will do nothing.
+
+ :param dut_name: user defined name for DUT
+ :return: None
+ """
+ try:
+ dut = self.allocated_duts.pop(dut_name)["dut"]
+ dut.close()
+ except KeyError:
+ pass
+
+ @_synced
+ def get_variable(self, variable_name):
+ """
+ get_variable(variable_name)
+ get variable from config file. If failed then try to auto-detected it.
+
+ :param variable_name: name of the variable
+ :return: value of variable if successfully found. otherwise None.
+ """
+ return self.config.get_variable(variable_name)
+
+ @_synced
+ def get_pc_nic_info(self, nic_name="pc_nic"):
+ """
+ get_pc_nic_info(nic_name="pc_nic")
+ try to get nic info (ip address, ipv6 address, mac address)
+
+ :param nic_name: pc nic name. allows passing variable name, nic name value or omitted (to get default nic info).
+ :return: a dict of address ("ipv4", "ipv6", "mac") if successfully found. otherwise None.
+ """
+ # TODO: need to implement auto get nic info method
+ return self.config.get_variable("nic_info/" + nic_name)
+
+ @_synced
+ def close(self):
+ """
+ close()
+ close all DUTs of the Env.
+
+ :return: None
+ """
+ for dut_name in self.allocated_duts:
+ dut = self.allocated_duts[dut_name]["dut"]
+ dut.close()
+ self.allocated_duts = dict()
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+The test env could change when we running test from different computers.
+Test env config provide ``get_variable`` method to allow user get test environment related variables.
+It will first try to get variable from config file.
+If failed, then it will try to auto detect (Not supported yet).
+
+Config file format is yaml. it's a set of key-value pair. The following is an example of config file::
+
+ Example_WIFI:
+ ap_ssid: "myssid"
+ ap_password: "mypassword"
+ Example_ShieldBox:
+ attenuator_port: "/dev/ttyUSB2"
+ ap_ssid: "myssid"
+ ap_password: "mypassword"
+
+It will first define the env tag for each environment, then add its key-value pairs.
+This will prevent test cases from getting configs from other env when there're configs for multiple env in one file.
+"""
+
+import yaml
+
+
+class Config(object):
+ """ Test Env Config """
+
+ def __init__(self, config_file, env_tag):
+ self.configs = self.load_config_file(config_file, env_tag)
+
+ @staticmethod
+ def load_config_file(config_file, env_name):
+ """
+ load configs from config file.
+
+ :param config_file: config file path
+ :param env_name: env tag name
+ :return: configs for the test env
+ """
+ try:
+ with open(config_file) as f:
+ configs = yaml.load(f)[env_name]
+ except (OSError, TypeError):
+ configs = dict()
+ return configs
+
+ def get_variable(self, variable_name):
+ """
+ first try to get from config file. if not found, try to auto detect the variable.
+
+ :param variable_name: name of variable
+ :return: value or None
+ """
+ try:
+ value = self.configs[variable_name]
+ except KeyError:
+ #TODO: to support auto get variable here
+ value = None
+ if value is None:
+ raise ValueError("Failed to get variable")
+ return value
--- /dev/null
+.external_ap: &external_ap
+ ap_ssid: "myssid"
+ ap_password: "mypassword"
+
+Examples_WIFI:
+ <<: external_ap
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" IDF Test Applications """
+import subprocess
+
+import os
+import App
+
+
+class IDFApp(App.BaseApp):
+ """
+ Implements common esp-idf application behavior.
+ idf applications should inherent from this class and overwrite method get_binary_path.
+ """
+
+ IDF_DOWNLOAD_CONFIG_FILE = "download.config"
+
+ def __init__(self, app_path):
+ super(IDFApp, self).__init__(app_path)
+ self.idf_path = self.get_sdk_path()
+ self.binary_path = self.get_binary_path(app_path)
+ assert os.path.exists(self.binary_path)
+ assert self.IDF_DOWNLOAD_CONFIG_FILE in os.listdir(self.binary_path)
+ self.esptool, self.partition_tool = self.get_tools()
+
+ @classmethod
+ def get_sdk_path(cls):
+ idf_path = os.getenv("IDF_PATH")
+ assert idf_path
+ assert os.path.exists(idf_path)
+ return idf_path
+
+ @classmethod
+ def get_tools(cls):
+ idf_path = cls.get_sdk_path()
+ # get esptool and partition tool for esp-idf
+ esptool = os.path.join(idf_path, "components",
+ "esptool_py", "esptool", "esptool.py")
+ partition_tool = os.path.join(idf_path, "components",
+ "partition_table", "gen_esp32part.py")
+ assert os.path.exists(esptool) and os.path.exists(partition_tool)
+ return esptool, partition_tool
+
+ def get_binary_path(self, app_path):
+ """
+ get binary path according to input app_path.
+
+ subclass must overwrite this method.
+
+ :param app_path: path of application
+ :return: abs app binary path
+ """
+ pass
+
+ def process_arg(self, arg):
+ """
+ process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs.
+ """
+ if ".bin" in arg:
+ ret = os.path.join(self.binary_path, arg)
+ else:
+ ret = arg
+ return ret.strip("\r\n ")
+
+ def process_app_info(self):
+ """
+ get app download config and partition info from a specific app path
+
+ :return: download config, partition info
+ """
+ with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f:
+ configs = f.read().split(" ")
+
+ download_configs = ["--chip", "auto", "--before", "default_reset",
+ "--after", "hard_reset", "write_flash", "-z"]
+ download_configs += [self.process_arg(x) for x in configs]
+
+ # handle partition table
+ for partition_file in download_configs:
+ if "partition" in partition_file:
+ partition_file = os.path.join(self.binary_path, partition_file)
+ break
+ else:
+ raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path))
+
+ process = subprocess.Popen(["python", self.partition_tool, partition_file],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ raw_data = process.stdout.read()
+ if isinstance(raw_data, bytes):
+ raw_data = raw_data.decode()
+ partition_table = dict()
+
+ for line in raw_data.splitlines():
+ if line[0] != "#":
+ try:
+ _name, _type, _subtype, _offset, _size, _flags = line.split(",")
+ if _size[-1] == "K":
+ _size = int(_size[:-1]) * 1024
+ elif _size[-1] == "M":
+ _size = int(_size[:-1]) * 1024 * 1024
+ else:
+ _size = int(_size)
+ except ValueError:
+ continue
+ partition_table[_name] = {
+ "type": _type,
+ "subtype": _subtype,
+ "offset": _offset,
+ "size": _size,
+ "flags": _flags
+ }
+ return download_configs, partition_table
+
+
+class Example(IDFApp):
+ def get_binary_path(self, app_path):
+ # build folder of example path
+ path = os.path.join(self.idf_path, app_path, "build")
+ if not os.path.exists(path):
+ # search for CI build folders
+ app = os.path.basename(app_path)
+ example_path = os.path.join(self.idf_path, "build_examples", "example_builds")
+ for dirpath, dirnames, files in os.walk(example_path):
+ if dirnames:
+ if dirnames[0] == app:
+ path = os.path.join(example_path, dirpath, dirnames[0], "build")
+ break
+ else:
+ raise OSError("Failed to find example binary")
+ return path
+
+
+class UT(IDFApp):
+ def get_binary_path(self, app_path):
+ if app_path:
+ # specified path, join it and the idf path
+ path = os.path.join(self.idf_path, app_path)
+ else:
+ path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
+ return path
+
+
+class SSC(IDFApp):
+ def get_binary_path(self, app_path):
+ # TODO: to implement SSC get binary path
+ return app_path
+
+
+class AT(IDFApp):
+ def get_binary_path(self, app_path):
+ # TODO: to implement AT get binary path
+ return app_path
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" DUT for IDF applications """
+import os
+import re
+import subprocess
+import functools
+
+import DUT
+
+
+class IDFToolError(OSError):
+ pass
+
+
+def _tool_method(func):
+ """ close port, execute tool method and then reopen port """
+ @functools.wraps(func)
+ def handler(self, *args, **kwargs):
+ self.close()
+ ret = func(self, *args, **kwargs)
+ self.open()
+ return ret
+ return handler
+
+
+class IDFDUT(DUT.SerialDUT):
+ """ IDF DUT, extends serial with ESPTool methods """
+
+ CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
+
+ def __init__(self, name, port, log_file, app, **kwargs):
+ self.download_config, self.partition_table = app.process_app_info()
+ super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
+
+ @classmethod
+ def get_chip(cls, app, port):
+ """
+ get chip id via esptool
+
+ :param app: application instance (to get tool)
+ :param port: comport
+ :return: chip ID or None
+ """
+ try:
+ output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"])
+ except subprocess.CalledProcessError:
+ output = bytes()
+ if isinstance(output, bytes):
+ output = output.decode()
+ chip_type = cls.CHIP_TYPE_PATTERN.search(output)
+ return chip_type.group(1) if chip_type else None
+
+ @classmethod
+ def confirm_dut(cls, port, app, **kwargs):
+ return cls.get_chip(app, port) is not None
+
+ @_tool_method
+ def start_app(self):
+ """
+ download and start app.
+
+ :return: None
+ """
+ retry_baud_rates = ["921600", "115200"]
+ error = IDFToolError()
+ for baud_rate in retry_baud_rates:
+ try:
+ subprocess.check_output(["python", self.app.esptool,
+ "--port", self.port, "--baud", baud_rate]
+ + self.download_config)
+ break
+ except subprocess.CalledProcessError as error:
+ continue
+ else:
+ raise error
+
+ @_tool_method
+ def reset(self):
+ """
+ reset DUT with esptool
+
+ :return: None
+ """
+ subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"])
+
+ @_tool_method
+ def dump_flush(self, output_file, **kwargs):
+ """
+ dump flush
+
+ :param output_file: output file name, if relative path, will use sdk path as base path.
+ :keyword partition: partition name, dump the partition.
+ ``partition`` is preferred than using ``address`` and ``size``.
+ :keyword address: dump from address (need to be used with size)
+ :keyword size: dump size (need to be used with address)
+ :return: None
+ """
+ if os.path.isabs(output_file) is False:
+ output_file = os.path.relpath(output_file, self.app.get_log_folder())
+ if "partition" in kwargs:
+ partition = self.partition_table[kwargs["partition"]]
+ _address = partition["offset"]
+ _size = partition["size"]
+ elif "address" in kwargs and "size" in kwargs:
+ _address = kwargs["address"]
+ _size = kwargs["size"]
+ else:
+ raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
+ subprocess.check_output(
+ ["python", self.app.esptool, "--port", self.port, "--baud", "921600",
+ "--before", "default_reset", "--after", "hard_reset", "read_flash",
+ _address, _size, output_file]
+ )
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import TinyFW
+from IDF.IDFApp import Example, UT
+from IDF.IDFDUT import IDFDUT
+
+
+def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32",
+ module="examples", execution_time=1,
+ **kwargs):
+ """
+ decorator for testing idf examples (with default values for some keyword args).
+
+ :param app: test application class
+ :param dut: dut class
+ :param chip: chip supported, string or tuple
+ :param module: module, string
+ :param execution_time: execution time in minutes, int
+ :param kwargs: other keyword args
+ :return: test method
+ """
+ # not use partial function as define as function support auto generating document
+ return TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
+ execution_time=execution_time, **kwargs)
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Command line interface to run test cases from a given path.
+
+* search and run test cases of a given path
+* config file which support to filter test cases and passing data to test case
+
+Use ``python Runner.py test_case_path -c config_file -e env_config_file`` to run test cases.
+
+"""
+import os
+import sys
+import argparse
+import threading
+
+import TinyFW
+from Utility import SearchCases, CaseConfig
+
+
+class Runner(threading.Thread):
+ """
+ :param test_case: test case file or folder
+ :param case_config: case config file, allow to filter test cases and pass data to test case
+ :param env_config_file: env config file
+ """
+
+ def __init__(self, test_case, case_config, env_config_file=None):
+ super(Runner, self).__init__()
+ self.setDaemon(True)
+ test_methods = SearchCases.Search.search_test_cases(test_case)
+ self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
+ self.test_result = True
+ if case_config:
+ test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
+ else:
+ test_suite_name = "TestRunner"
+ TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
+
+ def run(self):
+ for case in self.test_cases:
+ self.test_result = self.test_result and case.run()
+
+
+if __name__ == '__main__':
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("test_case",
+ help="test case folder or file")
+ parser.add_argument("--case_config", "-c", default=None,
+ help="case filter/config file")
+ parser.add_argument("--env_config_file", "-e", default=None,
+ help="test env config file")
+ args = parser.parse_args()
+
+ runner = Runner(args.test_case, args.case_config, args.env_config_file)
+ runner.start()
+
+ while True:
+ try:
+ runner.join(1)
+ if not runner.isAlive():
+ break
+ except KeyboardInterrupt:
+ print("exit by Ctrl-C")
+ break
+ if not runner.test_result:
+ sys.exit(1)
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import yaml
+
+
+class TestCase(object):
+ """
+ Test Case Object, mainly used with runner.
+ runner can parse all test cases from a given path, set data and config for test case in prepare stage.
+ TestCase instance will record these data, provide run method to let runner execute test case.
+
+ :param test_method: test function
+ :param extra_data: data passed to test function
+ :param overwrite_args: kwargs that overwrite original test case configs
+ """
+ DEFAULT_CASE_DOC = dict()
+
+ def __init__(self, test_method, extra_data, **overwrite_args):
+ self.test_method = test_method
+ self.extra_data = extra_data
+ self.overwrite_args = overwrite_args
+
+ def run(self):
+ """ execute the test case """
+ return self.test_method(self.extra_data, **self.overwrite_args)
+
+ def document(self):
+ """
+ generate test case document.
+ parse the case doc with yaml parser and update to original case attributes.
+
+ :return: case document, dict of case attributes and values
+ """
+ doc_string = self.test_method.__doc__
+ try:
+ doc = yaml.load(doc_string)
+ except (AttributeError, OSError, UnicodeDecodeError):
+ doc = self.DEFAULT_CASE_DOC
+ doc.update(self.test_method.env_args)
+ doc.update(self.test_method.accepted_filter)
+ return doc
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Interface for test cases. """
+import sys
+import os
+import time
+import traceback
+import inspect
+import functools
+
+import xunitgen
+
+import Env
+import DUT
+import App
+
+
+XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
+XUNIT_RECEIVER = xunitgen.EventReceiver()
+XUNIT_DEFAULT_TEST_SUITE = "test-suite"
+
+
+_COLOR_CODES = {
+ "white": '\033[0m',
+ "red": '\033[31m',
+ "green": '\033[32m',
+ "orange": '\033[33m',
+ "blue": '\033[34m',
+ "purple": '\033[35m',
+ "W": '\033[0m',
+ "R": '\033[31m',
+ "G": '\033[32m',
+ "O": '\033[33m',
+ "B": '\033[34m',
+ "P": '\033[35m'
+}
+
+
+def console_log(data, color="white"):
+ """
+ log data to console.
+ (if not flush console log, Gitlab-CI won't update logs during job execution)
+
+ :param data: data content
+ :param color: color
+ """
+ if color not in _COLOR_CODES:
+ color = "white"
+ color_codes = _COLOR_CODES[color]
+ print(color_codes + data)
+ if color not in ["white", "W"]:
+ # reset color to white for later logs
+ print(_COLOR_CODES["white"] + "\r")
+ sys.stdout.flush()
+
+
+class DefaultEnvConfig(object):
+ """
+ default test configs. There're 3 places to set configs, priority is (high -> low):
+
+ 1. overwrite set by caller of test method
+ 2. values set by test_method decorator
+ 3. default env config get from this class
+ """
+ DEFAULT_CONFIG = {
+ "app": App.BaseApp,
+ "dut": DUT.BaseDUT,
+ "env_tag": "default",
+ "env_config_file": None,
+ "test_suite_name": None,
+ }
+
+ @classmethod
+ def set_default_config(cls, **kwargs):
+ """
+ :param kwargs: configs need to be updated
+ :return: None
+ """
+ cls.DEFAULT_CONFIG.update(kwargs)
+
+ @classmethod
+ def get_default_config(cls):
+ """
+ :return: current default config
+ """
+ return cls.DEFAULT_CONFIG.copy()
+
+
+set_default_config = DefaultEnvConfig.set_default_config
+get_default_config = DefaultEnvConfig.get_default_config
+
+
+class TestResult(object):
+ TEST_RESULT = {
+ "pass": [],
+ "fail": [],
+ }
+
+ @classmethod
+ def get_failed_cases(cls):
+ """
+ :return: failed test cases
+ """
+ return cls.TEST_RESULT["fail"]
+
+ @classmethod
+ def get_passed_cases(cls):
+ """
+ :return: passed test cases
+ """
+ return cls.TEST_RESULT["pass"]
+
+ @classmethod
+ def set_result(cls, result, case_name):
+ """
+ :param result: True or False
+ :param case_name: test case name
+ :return: None
+ """
+ cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
+
+
+get_failed_cases = TestResult.get_failed_cases
+get_passed_cases = TestResult.get_passed_cases
+
+
+MANDATORY_INFO = {
+ "execution_time": 1,
+ "env_tag": "default",
+}
+
+
+def test_method(**kwargs):
+ """
+ decorator for test case function.
+ The following keyword arguments are pre-defined.
+ Any other keyword arguments will be regarded as filter for the test case,
+ able to access them by ``case_info`` attribute of test method.
+
+ :keyword app: class for test app. see :doc:`App <App>` for details
+ :keyword dut: class for current dut. see :doc:`DUT <DUT>` for details
+ :keyword env_tag: name for test environment, used to select configs from config file
+ :keyword env_config_file: test env config file. usually will not set this keyword when define case
+ :keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
+ usually will not set this keyword when define case
+ """
+ def test(test_func):
+ # get test function file name
+ frame = inspect.stack()
+ test_func_file_name = frame[1][1]
+
+ case_info = MANDATORY_INFO.copy()
+ case_info["name"] = test_func.__name__
+ case_info.update(kwargs)
+
+ # create env instance
+ env_config = DefaultEnvConfig.get_default_config()
+ for key in kwargs:
+ if key in env_config:
+ env_config[key] = kwargs[key]
+
+ @functools.wraps(test_func)
+ def handle_test(extra_data=None, **overwrite):
+ """
+ create env, run test and record test results
+
+ :param extra_data: extra data that runner or main passed to test case
+ :param overwrite: args that runner or main want to overwrite
+ :return: None
+ """
+ env_config.update(overwrite)
+ env_inst = Env.Env(**env_config)
+ # prepare for xunit test results
+ xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
+ XUNIT_FILE_NAME)
+ XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
+ try:
+ console_log("starting running test: " + test_func.__name__, color="green")
+ # execute test function
+ test_func(env_inst, extra_data)
+ # if finish without exception, test result is True
+ result = True
+ except Exception as e:
+ # handle all the exceptions here
+ traceback.print_exc()
+ result = False
+ # log failure
+ XUNIT_RECEIVER.failure(str(e), test_func_file_name)
+ finally:
+ # do close all DUTs
+ env_inst.close()
+ # end case and output result
+ XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
+ with open(xunit_file, "ab+") as f:
+ f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
+ XUNIT_DEFAULT_TEST_SUITE))
+
+ if result:
+ console_log("Test Succeed: " + test_func.__name__, color="green")
+ else:
+ console_log(("Test Fail: " + test_func.__name__), color="red")
+ TestResult.set_result(result, test_func.__name__)
+ return result
+
+ handle_test.case_info = case_info
+ handle_test.test_method = True
+ return handle_test
+ return test
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Processing case config files.
+This is mainly designed for CI, we need to auto create and assign test jobs.
+
+Template Config File::
+
+ TestConfig:
+ app:
+ path: Users/Test/TinyTestFW/IDF/IDFApp.py
+ class: Example
+ dut:
+ path:
+ class:
+ config_file: /somewhere/config_file_for_runner
+ test_name: CI_test_job_1
+
+ Filter:
+ chip: ESP32
+ env_tag: default
+
+ CaseConfig:
+ - name: test_examples_protocol_https_request
+ # optional
+ extra_data: some extra data passed to case with kwarg extra_data
+ overwrite: # overwrite test configs
+ app:
+ path: Users/Test/TinyTestFW/IDF/IDFApp.py
+ class: Example
+ - name: xxx
+"""
+
+# TODO: add a function to use suitable import lib for python2 and python3
+import imp
+
+import yaml
+
+import TestCase
+
+
+def _filter_one_case(test_method, case_filter):
+ """ Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
+ filter_result = True
+ for key in case_filter:
+ if key in test_method.case_info:
+ # the filter key is both in case and filter
+ # we need to check if they match
+ filter_item, accepted_item = case_filter[key], test_method.case_info[key]
+
+ if isinstance(filter_item, (tuple, list)) \
+ and isinstance(accepted_item, (tuple, list)):
+ # both list/tuple, check if they have common item
+ filter_result = True if set(filter_item) & set(accepted_item) else False
+ elif isinstance(filter_item, (tuple, list)):
+ # filter item list/tuple, check if case accepted value in filter item list/tuple
+ filter_result = True if accepted_item in filter_item else False
+ elif isinstance(accepted_item, (tuple, list)):
+ # accepted item list/tuple, check if case filter value is in accept item list/tuple
+ filter_result = True if filter_item in accepted_item else False
+ else:
+ # both string/int, just do string compare
+ filter_result = (filter_item == accepted_item)
+ else:
+ # key in filter only, which means the case supports all values for this filter key, match succeed
+ pass
+ if not filter_result:
+ # match failed
+ break
+ return filter_result
+
+
+def filter_test_cases(test_methods, case_filter):
+ """
+ filter test case. filter logic:
+
+ 1. if filter key both in case attribute and filter:
+ * if both value is string/int, then directly compare
+ * if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple
+ * if both are list/tuple, then check if they have common item
+ 2. if only case attribute or filter have the key, filter succeed
+
+ for example, the following are match succeed scenarios
+ (the rule is symmetric, result is same if exchange values for user filter and case attribute):
+
+ * user case filter is ``chip: ["esp32", "esp32c"]``, case doesn't have ``chip`` attribute
+ * user case filter is ``chip: ["esp32", "esp32c"]``, case attribute is ``chip: "esp32"``
+ * user case filter is ``chip: "esp32"``, case attribute is ``chip: "esp32"``
+
+
+ :param test_methods: a list of test methods functions
+ :param case_filter: case filter
+ :return: filtered test methods
+ """
+ filtered_test_methods = []
+ for test_method in test_methods:
+ if _filter_one_case(test_method, case_filter):
+ filtered_test_methods.append(test_method)
+ return filtered_test_methods
+
+
+class Parser(object):
+ DEFAULT_CONFIG = {
+ "TestConfig": dict(),
+ "Filter": dict(),
+ "CaseConfig": [{"extra_data": None}],
+ }
+
+ @classmethod
+ def parse_config_file(cls, config_file):
+ """
+ parse from config file and then update to default config.
+
+ :param config_file: config file path
+ :return: configs
+ """
+ configs = cls.DEFAULT_CONFIG.copy()
+ if config_file:
+ with open(config_file, "r") as f:
+ configs.update(yaml.load(f))
+ return configs
+
+ @classmethod
+ def handle_overwrite_args(cls, overwrite):
+ """
+ handle overwrite configs. import module from path and then get the required class.
+
+ :param overwrite: overwrite args
+ :return: dict of (original key: class)
+ """
+ output = dict()
+ for key in overwrite:
+ _path = overwrite[key]["path"]
+ # TODO: add a function to use suitable import lib for python2 and python3
+ _module = imp.load_source(str(hash(_path)), overwrite[key]["path"])
+ output[key] = _module.__getattribute__(overwrite[key]["class"])
+ return output
+
+ @classmethod
+ def apply_config(cls, test_methods, config_file):
+ """
+ apply config for test methods
+
+ :param test_methods: a list of test methods functions
+ :param config_file: case filter file
+ :return: filtered cases
+ """
+ configs = cls.parse_config_file(config_file)
+ test_case_list = []
+ for _config in configs["CaseConfig"]:
+ _filter = configs["Filter"].copy()
+ _filter.update(_config)
+ _overwrite = cls.handle_overwrite_args(_filter.pop("overwrite", dict()))
+ _extra_data = _filter.pop("extra_data", None)
+ for test_method in test_methods:
+ if _filter_one_case(test_method, _filter):
+ test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite))
+ return test_case_list
+
+
+class Generator(object):
+ """ Case config file generator """
+
+ def __init__(self):
+ self.default_config = {
+ "TestConfig": dict(),
+ "Filter": dict(),
+ }
+
+ def set_default_configs(self, test_config, case_filter):
+ """
+ :param test_config: "TestConfig" value
+ :param case_filter: "Filter" value
+ :return: None
+ """
+ self.default_config = {"TestConfig": test_config, "Filter": case_filter}
+
+ def generate_config(self, case_configs, output_file):
+ """
+ :param case_configs: "CaseConfig" value
+ :param output_file: output file path
+ :return: None
+ """
+ config = self.default_config.copy()
+ config.update({"CaseConfig": case_configs})
+ with open(output_file, "w") as f:
+ yaml.dump(config, f)
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import yaml
+
+
+class Job(dict):
+ """
+ Gitlab CI job
+
+ :param job: job data loaded from .gitlab-ci.yml
+ :param job_name: job name
+ """
+ def __init__(self, job, job_name):
+ super(Job, self).__init__(job)
+ self["name"] = job_name
+
+ def match_group(self, group):
+ """
+ Match group by tags of job.
+ All filters values of group should be included in tags.
+
+ :param group: case group to match
+ :return: True or False
+ """
+ match_result = False
+ for _ in range(1):
+ if "case group" in self:
+ # this job is already assigned
+ break
+ for value in group.filters.values():
+ if value not in self["tags"]:
+ break
+ else:
+ continue
+ break
+ else:
+ match_result = True
+ return match_result
+
+ def assign_group(self, group):
+ """
+ assign a case group to a test job.
+
+ :param group: the case group to assign
+ """
+ self["case group"] = group
+
+ def output_config(self, file_path):
+ """
+ output test config to the given path.
+ file name will be job_name.yml
+
+ :param file_path: output file path
+ :return: None
+ """
+ file_name = os.path.join(file_path, self["name"] + ".yml")
+ if "case group" in self:
+ with open(file_name, "w") as f:
+ yaml.dump(self["case group"].output(), f)
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" search test cases from a given file or path """
+import os
+import fnmatch
+import types
+import copy
+# TODO: add a function to use suitable import lib for python2 and python3
+import imp
+
+
+class Search(object):
+ TEST_CASE_FILE_PATTERN = "*_test.py"
+
+ @classmethod
+ def _search_cases_from_file(cls, file_name):
+ """ get test cases from test case .py file """
+
+ print("Try to get cases from: " + file_name)
+ test_functions = []
+ try:
+ # TODO: add a function to use suitable import lib for python2 and python3
+ mod = imp.load_source(str(hash(file_name)), file_name)
+ for func in [mod.__getattribute__(x) for x in dir(mod)
+ if isinstance(mod.__getattribute__(x), types.FunctionType)]:
+ try:
+ # test method decorator will add test_method attribute to test function
+ if func.test_method:
+ test_functions.append(func)
+ except AttributeError:
+ continue
+ except ImportError as e:
+ print("ImportError: \r\n\tFile:" + file_name + "\r\n\tError:" + str(e))
+ for i, test_function in enumerate(test_functions):
+ print("\t{}. ".format(i+1) + test_function.case_info["name"])
+ return test_functions
+
+ @classmethod
+ def _search_test_case_files(cls, test_case, file_pattern):
+ """ search all test case files recursively of a path """
+
+ if not os.path.exists(test_case):
+ raise OSError("test case path not exist")
+ if os.path.isdir(test_case):
+ test_case_files = []
+ for root, _, file_names in os.walk(test_case):
+ for filename in fnmatch.filter(file_names, file_pattern):
+ test_case_files.append(os.path.join(root, filename))
+ else:
+ test_case_files = [test_case]
+ return test_case_files
+
+ @classmethod
+ def replicate_case(cls, case):
+ """
+ Replicate cases according to its filter values.
+ If one case has specified filter chip=(ESP32, ESP32C),
+ it will create 2 cases, one for ESP32 and on for ESP32C.
+ Once the cases are replicated, it's easy to filter those we want to execute.
+
+ :param case: the original case
+ :return: a list of replicated cases
+ """
+ replicate_config = []
+ for key in case.case_info:
+ if isinstance(case.case_info[key], (list, tuple)):
+ replicate_config.append(key)
+
+ def _replicate_for_key(case_list, replicate_key, replicate_list):
+ case_out = []
+ for _case in case_list:
+ for value in replicate_list:
+ new_case = copy.deepcopy(_case)
+ new_case.case_info[replicate_key] = value
+ case_out.append(new_case)
+ return case_out
+
+ replicated_cases = [case]
+ for key in replicate_config:
+ replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key])
+
+ return replicated_cases
+
+ @classmethod
+ def search_test_cases(cls, test_case):
+ """
+ search all test cases from a folder or file, and then do case replicate.
+
+ :param test_case: test case file(s) path
+ :return: a list of replicated test methods
+ """
+ test_case_files = cls._search_test_case_files(test_case, cls.TEST_CASE_FILE_PATTERN)
+ test_cases = []
+ for test_case_file in test_case_files:
+ test_cases += cls._search_cases_from_file(test_case_file)
+ # handle replicate cases
+ test_case_out = []
+ for case in test_cases:
+ test_case_out += cls.replicate_case(case)
+ return test_case_out
--- /dev/null
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line.
+SPHINXOPTS =
+SPHINXAPI = sphinx-apidoc
+SPHINXAPISRC = ..
+SPHINXBUILD = python -msphinx
+SPHINXPROJ = TinyTestFW
+SOURCEDIR = .
+BUILDDIR = _build
+
+# define the files to be excluded here
+EXCLUEDLIST = "$(SPHINXAPISRC)/example.py"
+
+# Put it first so that "make" without argument is like "make help".
+help:
+ @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+ @$(SPHINXAPI) -o $(SOURCEDIR) $(SPHINXAPISRC) $(EXCLUEDLIST)
+ @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
\ No newline at end of file
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# TinyTestFW documentation build configuration file, created by
+# sphinx-quickstart on Thu Sep 21 20:19:12 2017.
+#
+# This file is execfile()d with the current directory set to its
+# containing dir.
+#
+# Note that not all possible configuration values are present in this
+# autogenerated file.
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+import os
+import sys
+sys.path.insert(0, os.path.abspath('..'))
+
+# import sphinx_rtd_theme
+
+
+# -- General configuration ------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#
+# needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = ['sphinx.ext.autodoc',
+ 'sphinx.ext.viewcode']
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix(es) of source filenames.
+# You can specify multiple suffix as a list of string:
+#
+# source_suffix = ['.rst', '.md']
+source_suffix = '.rst'
+
+# The master toctree document.
+master_doc = 'index'
+
+# General information about the project.
+project = u'TinyTestFW'
+copyright = u'2017, Espressif'
+author = u'Espressif'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+version = u'0.1'
+# The full version, including alpha/beta/rc tags.
+release = u'0.1'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#
+# This is also used if you do content translation via gettext catalogs.
+# Usually you set "language" from the command line for these cases.
+language = None
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This patterns also effect to html_static_path and html_extra_path
+exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+# If true, `todo` and `todoList` produce output, else they produce nothing.
+todo_include_todos = False
+
+
+# -- Options for HTML output ----------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+#
+html_theme = 'sphinx_rtd_theme'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further. For a list of options available for each theme, see the
+# documentation.
+#
+# html_theme_options = {}
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+
+# -- Options for HTMLHelp output ------------------------------------------
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'TinyTestFWdoc'
+
+
+# -- Options for LaTeX output ---------------------------------------------
+
+latex_elements = {
+ # The paper size ('letterpaper' or 'a4paper').
+ #
+ # 'papersize': 'letterpaper',
+
+ # The font size ('10pt', '11pt' or '12pt').
+ #
+ # 'pointsize': '10pt',
+
+ # Additional stuff for the LaTeX preamble.
+ #
+ # 'preamble': '',
+
+ # Latex figure (float) alignment
+ #
+ # 'figure_align': 'htbp',
+}
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title,
+# author, documentclass [howto, manual, or own class]).
+latex_documents = [
+ (master_doc, 'TinyTestFW.tex', u'TinyTestFW Documentation',
+ u'He Yinling', 'manual'),
+]
+
+
+# -- Options for manual page output ---------------------------------------
+
+# One entry per manual page. List of tuples
+# (source start file, name, description, authors, manual section).
+man_pages = [
+ (master_doc, 'tinytestfw', u'TinyTestFW Documentation',
+ [author], 1)
+]
+
+
+# -- Options for Texinfo output -------------------------------------------
+
+# Grouping the document tree into Texinfo files. List of tuples
+# (source start file, target name, title, author,
+# dir menu entry, description, category)
+texinfo_documents = [
+ (master_doc, 'TinyTestFW', u'TinyTestFW Documentation',
+ author, 'TinyTestFW', 'One line description of project.',
+ 'Miscellaneous'),
+]
+
+
+
--- /dev/null
+.. TinyTestFW documentation master file, created by
+ sphinx-quickstart on Thu Sep 21 20:19:12 2017.
+ You can adapt this file completely to your liking, but it should at least
+ contain the root `toctree` directive.
+
+Welcome to TinyTestFW's documentation!
+======================================
+
+We have a lot of test which depends on interact with DUT via communication port.
+Usually we send command to the port and then check response to see if the test succeed.
+TinyTestFW is designed for such scenarios.
+It supports ESP-IDF applications and is able for other applications by writing new bundles.
+
+Test FW features
+----------------
+
+1. Test Environment:
+ 1. DUT: DUT provides methods to interact with DUT
+ * read/write through port
+ * expect method which supports expect one or multiple string or RegEx
+ * tool methods provided by the tool bundle, like ``start_app``, ``reset``
+ 2. App:
+ * provide some specific features to the test application of DUT, for example:
+ * SDK path
+ * SDK tools
+ * application information like partition table, download configs
+ 3. Environment Configs:
+ * support get env configs from config file or auto-detect from current PC
+ * provide ``get_variable`` method to get variables
+2. allow to customize components (DUT, App) to support different devices
+3. Integrate to CI:
+ * provide interfaces for Gitlab-CI
+ * provide ``search case`` and ``runner`` interfaces, able to integrate with other CI
+
+Example
+-------
+
+Let's first check a simple simple::
+
+ import re
+ import os
+ import sys
+ test_fw_path = os.getenv("TEST_FW_PATH")
+ if test_fw_path:
+ sys.path.insert(0, test_fw_path)
+
+ import TinyFW
+ from IDF import IDFApp, IDFDUT
+
+
+ @TinyFW.test_method(app=IDFApp.Example, dut=IDFDUT.IDFDUT, env_tag="Example_WIFI",
+ chip="ESP32", module="examples", execution_time=1)
+ def test_examples_protocol_https_request(env, extra_data):
+ """
+ steps: |
+ 1. join AP
+ 2. connect to www.howsmyssl.com:443
+ 3. send http request
+ """
+ dut1 = env.get_dut("https_request", "examples/protocols/https_request")
+ dut1.start_app()
+ dut1.expect("Connecting to www.howsmyssl.com:443", timeout=30)
+ dut1.expect("Performing the SSL/TLS handshake")
+ dut1.expect("Certificate verified.", timeout=15)
+ dut1.expect_all(re.compile(r"Cipher suite is TLS-ECDHE-RSA-WITH-AES-128-GCM-SHA256"),
+ "Reading HTTP response",
+ timeout=20)
+ dut1.expect(re.compile(r"Completed (\d) requests"))
+
+
+ if __name__ == '__main__':
+ TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
+ test_examples_protocol_https_request()
+
+
+SOP for adding test cases
+-------------------------
+
+1. import test framework:
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+* we assume ``TEST_FW_PATH`` is pre-defined before running the tests
+* Then we can import python packages and files from ``TEST_FW_PATH``
+
+2. define test case:
+^^^^^^^^^^^^^^^^^^^^
+
+1. define test case ``test_xxx(env, extra_data)``
+ * env: instance of test env, see :doc:`Test Env <Env>` for details
+ * extra_data: extra data passed from test case caller
+2. add decorator for test case
+ * add decorator ``TinyFW.test_method`` to test method
+ * define default case configs and filters in decorator, see :doc:`TinyFW.test_method <TinyFW>`
+
+3. execute test cases:
+^^^^^^^^^^^^^^^^^^^^^^
+
+* define in ``main`` section and execute from this file
+ 1. set preset configs(optional). If the config is not define in case decorator, it will use the preset configs.
+ 2. call test case method:
+ * if you don't pass any arguments, it will use default values
+ * you can pass ``extra_data`` to test case by adding ``extra_data=some_data`` as kwarg of test case method.
+ default value for extra_data is None.
+ * you can overwrite test case config by adding them as kwarg of test case method.
+ It will overwrite preset configs and case default configs.
+
+ Examples::
+
+ test_examples_protocol_https_request(extra_data=["data1", "data2"], dut=SomeOtherDUT, env_tag="OtherEnv")
+
+* or, use ``runner`` to execute. see :doc:`runner <Runner>` for details
+
+
+
+.. toctree::
+ :maxdepth: 2
+ :caption: Contents:
+
+ modules
+
+Dependency
+==========
+
+Support for both Python2 and Python3 (tested on python 2.7.13 and 3.6.2).
+
+The following 3rd party lib is required:
+
+ * pyserial
+ * pyyaml
+ * xunitgen
+
+To build document, we need to install ``Sphinx`` and ``sphinx-rtd-theme`` (you may replace this with your own theme).
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
--- /dev/null
+# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" example of writing test with TinyTestFW """
+import re
+import os
+import sys
+
+# if we want to run test case outside `tiny-test-fw` folder,
+# we need to insert tiny-test-fw path into sys path
+test_fw_path = os.getenv("TEST_FW_PATH")
+if test_fw_path and test_fw_path not in sys.path:
+ sys.path.insert(0, test_fw_path)
+
+import TinyFW
+import IDF
+
+
+@IDF.idf_example_test(env_tag="Example_WIFI")
+def test_examples_protocol_https_request(env, extra_data):
+ """
+ steps: |
+ 1. join AP
+ 2. connect to www.howsmyssl.com:443
+ 3. send http request
+ """
+ dut1 = env.get_dut("https_request", "examples/protocols/https_request")
+ dut1.start_app()
+ dut1.expect(re.compile(r"Connecting to www.howsmyssl.com:443"), timeout=30)
+ dut1.expect("Performing the SSL/TLS handshake")
+ dut1.expect("Certificate verified.", timeout=15)
+ dut1.expect_all(re.compile(r"Cipher suite is TLS-ECDHE-RSA-WITH-AES-128-GCM-SHA256"),
+ "Reading HTTP response",
+ timeout=20)
+ dut1.expect(re.compile(r"Completed (\d) requests"))
+
+
+if __name__ == '__main__':
+ TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
+ test_examples_protocol_https_request()