self.data_cache = self.data_cache[index:]
+class _LogThread(threading.Thread, _queue.Queue):
+ """
+ We found some SD card on Raspberry Pi could have very bad performance.
+ It could take seconds to save small amount of data.
+ If the DUT receives data and save it as log, then it stops receiving data until log is saved.
+ This could lead to expect timeout.
+ As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs.
+ Then data will be passed to ``expect`` as soon as received.
+ """
+ def __init__(self):
+ threading.Thread.__init__(self, name="LogThread")
+ _queue.Queue.__init__(self, maxsize=0)
+ self.setDaemon(True)
+ self.flush_lock = threading.Lock()
+
+ def save_log(self, filename, data):
+ """
+ :param filename: log file name
+ :param data: log data. Must be ``bytes``.
+ """
+ self.put({"filename": filename, "data": data})
+
+ def flush_data(self):
+ with self.flush_lock:
+ data_cache = dict()
+ while True:
+ # move all data from queue to data cache
+ try:
+ log = self.get_nowait()
+ try:
+ data_cache[log["filename"]] += log["data"]
+ except KeyError:
+ data_cache[log["filename"]] = log["data"]
+ except _queue.Empty:
+ break
+ # flush data
+ for filename in data_cache:
+ with open(filename, "ab+") as f:
+ f.write(data_cache[filename])
+
+ def run(self):
+ while True:
+ time.sleep(1)
+ self.flush_data()
+
+
class _RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
DEFAULT_EXPECT_TIMEOUT = 5
MAX_EXPECT_FAILURES_TO_SAVED = 10
+ LOG_THREAD = _LogThread()
+ LOG_THREAD.start()
+
def __init__(self, name, port, log_file, app, **kwargs):
self.expect_lock = threading.Lock()
return "DUT({}: {})".format(self.name, str(self.port))
def _save_expect_failure(self, pattern, data, start_time):
+ """
+ Save expect failure. If the test fails, then it will print the expect failures.
+ In some cases, user will handle expect exceptions.
+ The expect failures could be false alarm, and test case might generate a lot of such failures.
+ Therefore, we don't print the failure immediately and limit the max size of failure list.
+ """
self.expect_failures.insert(0, {"pattern": pattern, "data": data,
"start": start_time, "end": time.time()})
self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
+ def _save_dut_log(self, data):
+ """
+ Save DUT log into file using another thread.
+ This is a workaround for some devices takes long time for file system operations.
+
+ See descriptions in ``_LogThread`` for details.
+ """
+ self.LOG_THREAD.save_log(self.log_file, data)
+
# define for methods need to be overwritten by Port
@classmethod
def list_available_ports(cls):
if self.receive_thread:
self.receive_thread.exit()
self._port_close()
+ self.LOG_THREAD.flush_data()
def write(self, data, eol="\r\n", flush=True):
"""
def _port_read(self, size=1):
data = self.port_inst.read(size)
if data:
- with open(self.log_file, "ab+") as _log_file:
- _log_file.write(self._format_data(data))
+ self._save_dut_log(self._format_data(data))
return data
def _port_write(self, data):