if not test_suite_name:
test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
sdk_path = cls.get_sdk_path()
- return os.path.join(sdk_path, "TEST_LOGS",
- test_suite_name +
- time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+ log_folder = os.path.join(sdk_path, "TEST_LOGS",
+ test_suite_name +
+ time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
+ if not os.path.exists(log_folder):
+ os.makedirs(log_folder)
+ return log_folder
def process_app_info(self):
"""
return data
+def _pattern_to_string(pattern):
+ try:
+ ret = "RegEx: " + pattern.pattern
+ except AttributeError:
+ ret = pattern
+ return ret
+
+
class _DataCache(_queue.Queue):
"""
Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
_queue.Queue.__init__(self, maxsize=maxsize)
self.data_cache = str()
+ def _move_from_queue_to_cache(self):
+ """
+ move all of the available data in the queue to cache
+
+ :return: True if moved any item from queue to data cache, else False
+ """
+ ret = False
+ while True:
+ try:
+ self.data_cache += _decode_data(self.get(0))
+ ret = True
+ except _queue.Empty:
+ break
+ return ret
+
def get_data(self, timeout=0):
"""
get a copy of data from cache.
if timeout < 0:
timeout = 0
- try:
- data = self.get(timeout=timeout)
- self.data_cache += _decode_data(data)
- except _queue.Empty:
- # don't do anything when on update for cache
- pass
+ ret = self._move_from_queue_to_cache()
+
+ if not ret:
+ # we only wait for new data if we can't provide a new data_cache
+ try:
+ data = self.get(timeout=timeout)
+ self.data_cache += _decode_data(data)
+ except _queue.Empty:
+ # don't do anything when on update for cache
+ pass
return copy.deepcopy(self.data_cache)
def flush(self, index=0xFFFFFFFF):
data = self.data_cache.get_data(time.time() + timeout - start_time)
if ret is None:
- raise ExpectTimeout(self.name + ": " + str(pattern))
+ raise ExpectTimeout(self.name + ": " + _pattern_to_string(pattern))
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
if expect_item["ret"] is not None:
# match succeed for one item
matched_expect_items.append(expect_item)
- break
# if expect all, then all items need to be matched,
# else only one item need to matched
if expect_all:
- match_succeed = (matched_expect_items == expect_items)
+ match_succeed = len(matched_expect_items) == len(expect_items)
else:
match_succeed = True if matched_expect_items else False
# flush already matched data
self.data_cache.flush(slice_index)
else:
- raise ExpectTimeout(self.name + ": " + str(expect_items))
+ raise ExpectTimeout(self.name + ": " + str([_pattern_to_string(x) for x in expect_items]))
@_expect_lock
def expect_any(self, *expect_items, **timeout):
dut=None,
env_tag=None,
env_config_file=None,
- test_name=None,
+ test_suite_name=None,
**kwargs):
self.app_cls = app
self.default_dut_cls = dut
self.config = EnvConfig.Config(env_config_file, env_tag)
- self.log_path = self.app_cls.get_log_folder(test_name)
+ self.log_path = self.app_cls.get_log_folder(test_suite_name)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path)
try:
with open(config_file) as f:
configs = yaml.load(f)[env_name]
- except (OSError, TypeError):
+ except (OSError, TypeError, IOError):
configs = dict()
return configs
def __init__(self, test_case, case_config, env_config_file=None):
super(Runner, self).__init__()
self.setDaemon(True)
- test_methods = SearchCases.Search.search_test_cases(test_case)
- self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
- self.test_result = True
if case_config:
test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
else:
test_suite_name = "TestRunner"
TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
+ test_methods = SearchCases.Search.search_test_cases(test_case)
+ self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
+ self.test_result = []
def run(self):
for case in self.test_cases:
- self.test_result = self.test_result and case.run()
+ result = case.run()
+ self.test_result.append(result)
+
+ def get_test_result(self):
+ return self.test_result and all(self.test_result)
if __name__ == '__main__':
except KeyboardInterrupt:
print("exit by Ctrl-C")
break
- if not runner.test_result:
+ if not runner.get_test_result():
sys.exit(1)
case_info["name"] = test_func.__name__
case_info.update(kwargs)
- # create env instance
- env_config = DefaultEnvConfig.get_default_config()
- for key in kwargs:
- if key in env_config:
- env_config[key] = kwargs[key]
-
@functools.wraps(test_func)
def handle_test(extra_data=None, **overwrite):
"""
:param overwrite: args that runner or main want to overwrite
:return: None
"""
+ # create env instance
+ env_config = DefaultEnvConfig.get_default_config()
+ for key in kwargs:
+ if key in env_config:
+ env_config[key] = kwargs[key]
+
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
# prepare for xunit test results
if __name__ == '__main__':
- TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
+ TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml")
test_examples_protocol_https_request()
if __name__ == '__main__':
- TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
+ TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
test_examples_protocol_https_request()