1 # Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 DUT provides 3 major groups of features:
18 * DUT port feature, provide basic open/close/read/write features
19 * DUT tools, provide extra methods to control the device, like download and start app
20 * DUT expect method, provide features for users to check DUT outputs
22 The current design of DUT has 3 classes for one DUT: BaseDUT, DUTPort, DUTTool.
25 * defines methods DUT port and DUT tool need to overwrite
26 * provide the expect methods and some other methods based on DUTPort
28 * inherits from the BaseDUT class
29 * implements the port features by overwriting port methods defined in BaseDUT
31 * inherits from one of the DUTPort classes
32 * implements the tools features by overwriting tool methods defined in BaseDUT
33 * could add some new methods provided by the tool
35 This module implements the BaseDUT class and one of the port class SerialDUT.
36 User should implement their DUTTool classes.
37 If they are using a different port, they need to implement their DUTPort class as well.
40 from __future__ import print_function
49 from serial.tools import list_ports
53 if sys.version_info[0] == 2:
54 import Queue as _queue
56 import queue as _queue
class ExpectTimeout(ValueError):
    """Raised when an ``expect`` call fails to match before its deadline."""
    pass
class UnsupportedExpectItem(ValueError):
    """Raised when an expect item is neither a string nor a compiled RegEx."""
    pass
def _expect_lock(func):
    """
    Decorator that serializes expect-related methods.

    Holds ``self.expect_lock`` for the duration of the wrapped call so that
    only one expect/read operation can touch the data cache at a time.

    :param func: method taking ``self`` as first argument
    :return: wrapped method
    """
    @functools.wraps(func)
    def handler(self, *args, **kwargs):
        with self.expect_lock:
            ret = func(self, *args, **kwargs)
        # bug fix: the wrapped result and the wrapper itself must be returned,
        # otherwise decorated methods evaluate to None
        return ret
    return handler
def _decode_data(data):
    """
    For python3, if the data is bytes, then decode it to string.

    :param data: ``bytes`` or ``str``
    :return: decoded ``str`` (``bytes`` input is decoded as UTF-8, with a
        latin-1 fallback); non-bytes input is returned unchanged
    """
    if isinstance(data, bytes):
        # convert bytes to string
        try:
            data = data.decode("utf-8", "ignore")
        except UnicodeDecodeError:
            # iso8859-1 maps every byte value, so this cannot fail
            data = data.decode("iso8859-1",)
    # bug fix: missing return made the helper always yield None
    return data
def _pattern_to_string(pattern):
    """
    Return a printable description of an expect pattern.

    :param pattern: compiled RegEx object or plain string
    :return: ``"RegEx: <pattern>"`` for compiled patterns, the pattern itself
        otherwise
    """
    try:
        ret = "RegEx: " + pattern.pattern
    except AttributeError:
        # plain strings have no ``pattern`` attribute
        ret = pattern
    # bug fix: restore the missing try/except structure and return value
    return ret
97 class _DataCache(_queue.Queue):
99 Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
102 def __init__(self, maxsize=0):
103 _queue.Queue.__init__(self, maxsize=maxsize)
104 self.data_cache = str()
106 def _move_from_queue_to_cache(self):
108 move all of the available data in the queue to cache
110 :return: True if moved any item from queue to data cache, else False
115 self.data_cache += _decode_data(self.get(0))
121 def get_data(self, timeout=0.0):
123 get a copy of data from cache.
125 :param timeout: timeout for waiting new queue item
126 :return: copy of data cache
128 # make sure timeout is non-negative
132 ret = self._move_from_queue_to_cache()
135 # we only wait for new data if we can't provide a new data_cache
137 data = self.get(timeout=timeout)
138 self.data_cache += _decode_data(data)
140 # don't do anything when on update for cache
142 return copy.deepcopy(self.data_cache)
144 def flush(self, index=0xFFFFFFFF):
146 flush data from cache.
148 :param index: if < 0 then don't do flush, otherwise flush data before index
151 # first add data in queue to cache
155 self.data_cache = self.data_cache[index:]
class _LogThread(threading.Thread, _queue.Queue):
    """
    We found some SD card on Raspberry Pi could have very bad performance.
    It could take seconds to save small amount of data.
    If the DUT receives data and save it as log, then it stops receiving data until log is saved.
    This could lead to expect timeout.
    As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs.
    Then data will be passed to ``expect`` as soon as received.
    """

    def __init__(self):
        threading.Thread.__init__(self, name="LogThread")
        _queue.Queue.__init__(self, maxsize=0)
        # daemon so the log thread never blocks interpreter exit
        self.daemon = True
        self.flush_lock = threading.Lock()

    def save_log(self, filename, data):
        """
        Queue one chunk of log data for writing.

        :param filename: log file name
        :param data: log data. Must be ``bytes``.
        """
        self.put({"filename": filename, "data": data})

    def flush_data(self):
        """ drain the queue and append all pending log data to the files """
        with self.flush_lock:
            data_cache = dict()
            while True:
                # move all data from queue to data cache
                try:
                    log = self.get_nowait()
                    try:
                        data_cache[log["filename"]] += log["data"]
                    except KeyError:
                        data_cache[log["filename"]] = log["data"]
                except _queue.Empty:
                    break
            # flush data cache to files (one write per file)
            for filename in data_cache:
                with open(filename, "ab+") as f:
                    f.write(data_cache[filename])

    def run(self):
        # periodically flush queued log data so expect() is never blocked by
        # slow file systems (see class docstring)
        while True:
            time.sleep(1)
            self.flush_data()
class _RecvThread(threading.Thread):
    """ Background thread that reads from the DUT port and feeds the data cache. """

    PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")

    def __init__(self, read, data_cache):
        """
        :param read: callable used to read raw data from the port
        :param data_cache: ``_DataCache`` instance that receives the raw data
        """
        super(_RecvThread, self).__init__()
        self.exit_event = threading.Event()
        # daemon so a stuck port read never blocks interpreter exit
        self.daemon = True
        # bug fix: the read callable was never stored, run() would raise
        self.read = read
        self.data_cache = data_cache
        # cache the last line of recv data for collecting performance
        self._line_cache = str()

    def collect_performance(self, data):
        """ collect performance """
        if data:
            decoded_data = _decode_data(data)

            matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data)
            for match in matches:
                Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
                                    color="orange")

            # cache incomplete line to later process
            lines = decoded_data.splitlines(True)
            last_line = lines[-1]

            if last_line[-1] != "\n":
                if len(lines) == 1:
                    # only one line and the line is not finished, then append this to cache
                    self._line_cache += lines[-1]
                else:
                    # more than one line and not finished, replace line cache
                    self._line_cache = lines[-1]
            else:
                # line finishes, flush cache
                self._line_cache = str()

    def run(self):
        while not self.exit_event.is_set():
            data = self.read(1000)
            if data:
                self.data_cache.put(data)
                self.collect_performance(data)

    def exit(self):
        self.exit_event.set()
        self.join()
class BaseDUT(object):
    """
    :param name: application defined name for port
    :param port: comport name, used to create DUT port
    :param log_file: log file name
    :param app: test app instance
    :param kwargs: extra args for DUT to create ports
    """

    DEFAULT_EXPECT_TIMEOUT = 10
    MAX_EXPECT_FAILURES_TO_SAVED = 10

    # one shared log thread for all DUT instances (see _LogThread docstring)
    LOG_THREAD = _LogThread()
    LOG_THREAD.start()

    def __init__(self, name, port, log_file, app, **kwargs):
        self.expect_lock = threading.Lock()
        self.name = name
        self.port = port
        self.log_file = log_file
        self.app = app
        self.data_cache = _DataCache()
        self.receive_thread = None
        self.expect_failures = []
        # open and start during init
        self.open()

    def __str__(self):
        return "DUT({}: {})".format(self.name, str(self.port))

    def _save_expect_failure(self, pattern, data, start_time):
        """
        Save expect failure. If the test fails, then it will print the expect failures.
        In some cases, user will handle expect exceptions.
        The expect failures could be false alarm, and test case might generate a lot of such failures.
        Therefore, we don't print the failure immediately and limit the max size of failure list.
        """
        self.expect_failures.insert(0, {"pattern": pattern, "data": data,
                                        "start": start_time, "end": time.time()})
        self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]

    def _save_dut_log(self, data):
        """
        Save DUT log into file using another thread.
        This is a workaround for some devices takes long time for file system operations.

        See descriptions in ``_LogThread`` for details.
        """
        self.LOG_THREAD.save_log(self.log_file, data)

    # define for methods need to be overwritten by Port
    @classmethod
    def list_available_ports(cls):
        """
        list all available ports.

        subclass (port) must overwrite this method.

        :return: list of available comports
        """
        pass

    def _port_open(self):
        """
        open port.

        subclass (port) must overwrite this method.

        :return: None
        """
        pass

    def _port_read(self, size=1):
        """
        read form port. This method should not blocking for long time, otherwise receive thread can not exit.

        subclass (port) must overwrite this method.

        :param size: max size to read.
        :return: read data.
        """
        pass

    def _port_write(self, data):
        """
        write to port.

        subclass (port) must overwrite this method.

        :param data: data to write
        :return: None
        """
        pass

    def _port_close(self):
        """
        close port.

        subclass (port) must overwrite this method.

        :return: None
        """
        pass

    # methods that need to be overwritten by Tool
    @classmethod
    def confirm_dut(cls, port, app, **kwargs):
        """
        confirm if it's a DUT, usually used by auto detecting DUT in by Env config.

        subclass (tool) must overwrite this method.

        :param port: comport
        :param app: app instance
        :return: True or False
        """
        pass

    def start_app(self):
        """
        usually after we got DUT, we need to do some extra works to let App start.
        For example, we need to reset->download->reset to let IDF application start on DUT.

        subclass (tool) must overwrite this method.

        :return: None
        """
        pass

    # methods that features raw port methods
    def open(self):
        """
        open port and create thread to receive data.

        :return: None
        """
        self._port_open()
        self.receive_thread = _RecvThread(self._port_read, self.data_cache)
        self.receive_thread.start()

    def close(self):
        """
        close receive thread and then close port.

        :return: None
        """
        if self.receive_thread:
            self.receive_thread.exit()
        self._port_close()
        self.LOG_THREAD.flush_data()

    @staticmethod
    def u_to_bytearray(data):
        """
        if data is not bytearray then it tries to convert it

        :param data: data which needs to be checked and maybe transformed
        :return: data as ``bytes``
        """
        if type(data) is type(u''):
            try:
                data = data.encode('utf-8')
            except Exception:
                print(u'Cannot encode {} of type {}'.format(data, type(data)))
                raise
        return data

    @_expect_lock
    def write(self, data, eol="\r\n", flush=True):
        """
        :param data: data to write
        :param eol: end of line pattern.
        :param flush: if need to flush received data cache before write data.
                      usually we need to flush data before write,
                      make sure processing outputs generated by wrote.
        :return: None
        """
        # do flush before write
        if flush:
            self.data_cache.flush()
        # do write if data is valid
        if data is not None:
            self._port_write(self.u_to_bytearray(data) + self.u_to_bytearray(eol) if eol else self.u_to_bytearray(data))

    @_expect_lock
    def read(self, size=0xFFFFFFFF):
        """
        read(size=0xFFFFFFFF)
        read raw data. NOT suggested to use this method.
        Only use it if expect method doesn't meet your requirement.

        :param size: read size. default read all data
        :return: read data
        """
        data = self.data_cache.get_data(0)[:size]
        self.data_cache.flush(size)
        return data

    # expect related methods

    @staticmethod
    def _expect_str(data, pattern):
        """
        protected method. check if string is matched in data cache.

        :param data: data to process
        :param pattern: string
        :return: (pattern, end index) if match succeed otherwise (None, -1)
        """
        index = data.find(pattern)
        if index != -1:
            ret = pattern
            index += len(pattern)
        else:
            ret = None
        return ret, index

    @staticmethod
    def _expect_re(data, pattern):
        """
        protected method. check if re pattern is matched in data cache

        :param data: data to process
        :param pattern: compiled RegEx pattern
        :return: (match groups, end index) if match succeed otherwise (None, -1)
        """
        ret = None
        if type(pattern.pattern) is type(u''):
            pattern = re.compile(BaseDUT.u_to_bytearray(pattern.pattern))
        if type(data) is type(u''):
            data = BaseDUT.u_to_bytearray(data)
        match = pattern.search(data)
        if match:
            # robustness fix: unmatched optional groups are None and must not
            # be decoded
            ret = tuple(None if x is None else x.decode() for x in match.groups())
            index = match.end()
        else:
            index = -1
        return ret, index

    EXPECT_METHOD = [
        [type(re.compile("")), "_expect_re"],
        [type(b''), "_expect_str"],  # Python 2 & 3 hook to work without 'from builtins import str' from future
        [type(u''), "_expect_str"],
    ]

    def _get_expect_method(self, pattern):
        """
        protected method. get expect method according to pattern type.

        :param pattern: expect pattern, string or compiled RegEx
        :return: ``_expect_str`` or ``_expect_re``
        :raise UnsupportedExpectItem: if pattern is neither string nor RegEx
        """
        for expect_method in self.EXPECT_METHOD:
            if isinstance(pattern, expect_method[0]):
                method = expect_method[1]
                break
        else:
            raise UnsupportedExpectItem()
        return self.__getattribute__(method)

    @_expect_lock
    def expect(self, pattern, timeout=DEFAULT_EXPECT_TIMEOUT):
        """
        expect(pattern, timeout=DEFAULT_EXPECT_TIMEOUT)
        expect received data on DUT match the pattern. will raise exception when expect timeout.

        :raise ExpectTimeout: failed to find the pattern before timeout
        :raise UnsupportedExpectItem: pattern is not string or compiled RegEx

        :param pattern: string or compiled RegEx(string pattern)
        :param timeout: timeout for expect
        :return: string if pattern is string; matched groups if pattern is RegEx
        """
        method = self._get_expect_method(pattern)

        # non-blocking get data for first time
        data = self.data_cache.get_data(0)
        start_time = time.time()
        while True:
            ret, index = method(data, pattern)
            if ret is not None:
                self.data_cache.flush(index)
                break
            time_remaining = start_time + timeout - time.time()
            if time_remaining < 0:
                break
            # wait for new data from cache
            data = self.data_cache.get_data(time_remaining)

        if ret is None:
            pattern = _pattern_to_string(pattern)
            self._save_expect_failure(pattern, data, start_time)
            raise ExpectTimeout(self.name + ": " + pattern)
        return ret

    def _expect_multi(self, expect_all, expect_item_list, timeout):
        """
        protected method. internal logical for expect multi.

        :param expect_all: True or False, expect all items in the list or any in the list
        :param expect_item_list: expect item list
        :param timeout: timeout
        :return: None
        """
        def process_expected_item(item_raw):
            # convert item raw data to standard dict
            item = {
                "pattern": item_raw[0] if isinstance(item_raw, tuple) else item_raw,
                "method": self._get_expect_method(item_raw[0] if isinstance(item_raw, tuple)
                                                  else item_raw),
                "callback": item_raw[1] if isinstance(item_raw, tuple) else None,
                "index": -1,
                "ret": None,
            }
            return item

        expect_items = [process_expected_item(x) for x in expect_item_list]

        # non-blocking get data for first time
        data = self.data_cache.get_data(0)

        start_time = time.time()
        matched_expect_items = list()
        while True:
            for expect_item in expect_items:
                if expect_item not in matched_expect_items:
                    # exclude those already matched
                    expect_item["ret"], expect_item["index"] = \
                        expect_item["method"](data, expect_item["pattern"])
                    if expect_item["ret"] is not None:
                        # match succeed for one item
                        matched_expect_items.append(expect_item)

            # if expect all, then all items need to be matched,
            # else only one item need to matched
            if expect_all:
                match_succeed = len(matched_expect_items) == len(expect_items)
            else:
                match_succeed = True if matched_expect_items else False

            time_remaining = start_time + timeout - time.time()
            if time_remaining < 0 or match_succeed:
                break
            else:
                data = self.data_cache.get_data(time_remaining)

        if match_succeed:
            # sort matched items according to order of appearance in the input data,
            # so that the callbacks are invoked in correct order
            matched_expect_items = sorted(matched_expect_items, key=lambda it: it["index"])
            # invoke callbacks and flush matched data cache
            slice_index = -1
            for expect_item in matched_expect_items:
                # trigger callback
                if expect_item["callback"]:
                    expect_item["callback"](expect_item["ret"])
                slice_index = max(slice_index, expect_item["index"])
            # flush already matched data
            self.data_cache.flush(slice_index)
        else:
            pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
            self._save_expect_failure(pattern, data, start_time)
            raise ExpectTimeout(self.name + ": " + pattern)

    @_expect_lock
    def expect_any(self, *expect_items, **timeout):
        """
        expect_any(*expect_items, timeout=DEFAULT_TIMEOUT)
        expect any of the patterns.
        will call callback (if provided) if pattern match succeed and then return.
        will pass match result to the callback.

        :raise ExpectTimeout: failed to match any one of the expect items before timeout
        :raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx

        :arg expect_items: one or more expect items.
                           string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
        :keyword timeout: timeout for expect
        :return: None
        """
        # to be compatible with python2
        # in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
        if "timeout" not in timeout:
            timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
        return self._expect_multi(False, expect_items, **timeout)

    @_expect_lock
    def expect_all(self, *expect_items, **timeout):
        """
        expect_all(*expect_items, timeout=DEFAULT_TIMEOUT)
        expect all of the patterns.
        will call callback (if provided) if all pattern match succeed and then return.
        will pass match result to the callback.

        :raise ExpectTimeout: failed to match all of the expect items before timeout
        :raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx

        :arg expect_items: one or more expect items.
                           string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
        :keyword timeout: timeout for expect
        :return: None
        """
        # to be compatible with python2
        # in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
        if "timeout" not in timeout:
            timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
        return self._expect_multi(True, expect_items, **timeout)

    @staticmethod
    def _format_ts(ts):
        # "MM-DD HH:MM:SS:mmm" — millisecond part taken from the fraction of ts
        return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])

    def print_debug_info(self):
        """
        Print debug info of current DUT. Currently we will print debug info for expect failures.
        """
        Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")

        for failure in self.expect_failures:
            Utility.console_log(u"\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
                                .format(failure["pattern"], failure["data"],
                                        self._format_ts(failure["start"]), self._format_ts(failure["end"])),
                                color="orange")
class SerialDUT(BaseDUT):
    """ serial with logging received data feature """

    # fix: restore the full default UART config; without an explicit short
    # timeout, serial reads block and the receive thread cannot exit
    DEFAULT_UART_CONFIG = {
        "baudrate": 115200,
        "bytesize": serial.EIGHTBITS,
        "parity": serial.PARITY_NONE,
        "stopbits": serial.STOPBITS_ONE,
        "timeout": 0.05,
        "xonxoff": False,
        "rtscts": False,
    }

    def __init__(self, name, port, log_file, app, **kwargs):
        self.port_inst = None
        # kwargs override the default serial settings
        self.serial_configs = self.DEFAULT_UART_CONFIG.copy()
        self.serial_configs.update(kwargs)
        super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)

    def _format_data(self, data):
        """
        format data for logging. do decode and add timestamp.

        :param data: raw data from read
        :return: formatted data (bytes, timestamp line followed by raw data)
        """
        timestamp = "[{}]".format(self._format_ts(time.time()))
        formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
        return formatted_data

    def _port_open(self):
        self.port_inst = serial.Serial(self.port, **self.serial_configs)

    def _port_close(self):
        self.port_inst.close()

    def _port_read(self, size=1):
        data = self.port_inst.read(size)
        if data:
            # log via the shared log thread so reading is never blocked by I/O
            self._save_dut_log(self._format_data(data))
        # bug fix: read data must be returned to the receive thread
        return data

    def _port_write(self, data):
        if isinstance(data, str):
            # serial expects bytes
            data = data.encode()
        self.port_inst.write(data)

    @classmethod
    def list_available_ports(cls):
        return [x.device for x in list_ports.comports()]