def __init__(self, read, dut):
super(IDFRecvThread, self).__init__(read, dut)
self.exceptions = _queue.Queue()
+ self.performances = _queue.Queue()
def collect_performance(self, comp_data):
matches = self.PERFORMANCE_PATTERN.findall(comp_data)
for match in matches:
Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
color="orange")
+ self.performances.put((match[0], match[1]))
def detect_exception(self, comp_data):
for pattern in self.EXCEPTION_PATTERNS:
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
self.allow_dut_exception = allow_dut_exception
self.exceptions = _queue.Queue()
+ self.performances = _queue.Queue()
@classmethod
def get_mac(cls, app, port):
pass
return ret
+ @staticmethod
+ def _queue_read_all(source_queue):
+ output = []
+ while True:
+ try:
+ output.append(source_queue.get(timeout=0))
+ except _queue.Empty:
+ break
+ return output
+
+ def _queue_copy(self, source_queue, dest_queue):
+ data = self._queue_read_all(source_queue)
+ for d in data:
+ dest_queue.put(d)
+
+ def _get_from_queue(self, queue_name):
+ self_queue = getattr(self, queue_name)
+ if self.receive_thread:
+ recv_thread_queue = getattr(self.receive_thread, queue_name)
+ self._queue_copy(recv_thread_queue, self_queue)
+ return self._queue_read_all(self_queue)
+
def stop_receive(self):
if self.receive_thread:
- while True:
- try:
- self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
- except _queue.Empty:
- break
+ for name in ["performances", "exceptions"]:
+ source_queue = getattr(self.receive_thread, name)
+ dest_queue = getattr(self, name)
+ self._queue_copy(source_queue, dest_queue)
super(IDFDUT, self).stop_receive()
def get_exceptions(self):
""" Get exceptions detected by DUT receive thread. """
- if self.receive_thread:
- while True:
- try:
- self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
- except _queue.Empty:
- break
- exceptions = []
- while True:
- try:
- exceptions.append(self.exceptions.get(timeout=0))
- except _queue.Empty:
- break
- return exceptions
+ return self._get_from_queue("exceptions")
+
+ def get_performance_items(self):
+ """
+ DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``.
+ This method is used to get all performance results.
+
+ :return: a list of performance items.
+ """
+ return self._get_from_queue("performances")
def close(self):
super(IDFDUT, self).close()
import time
import traceback
import functools
+import socket
+from datetime import datetime
import junit_xml
class JunitReport(object):
# wrapper for junit test report
- # TODO: Don't support by multi-thread (although not likely to be used this way).
+ # TODO: JunitReport methods are not thread safe (although not likely to be used this way).
JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
JUNIT_DEFAULT_TEST_SUITE = "test-suite"
- JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE)
+ JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE,
+ hostname=socket.gethostname(),
+ timestamp=datetime.utcnow().isoformat())
JUNIT_CURRENT_TEST_CASE = None
_TEST_CASE_CREATED_TS = 0
cls._TEST_CASE_CREATED_TS = time.time()
return test_case
+ @classmethod
+ def update_performance(cls, performance_items):
+ """
+ Update performance results to ``stdout`` of current test case.
+
+ :param performance_items: a list of performance items. each performance item is a key-value pair.
+ """
+ assert cls.JUNIT_CURRENT_TEST_CASE
+
+ for item in performance_items:
+ cls.JUNIT_CURRENT_TEST_CASE.stdout += "[{}]: {}\n".format(item[0], item[1])
+
def test_method(**kwargs):
"""