]> granicus.if.org Git - esp-idf/commitdiff
unit tests: Keep serial port open when running esptool
authorAngus Gratton <angus@espressif.com>
Wed, 5 Dec 2018 00:13:33 +0000 (11:13 +1100)
committerAngus Gratton <gus@projectgus.com>
Tue, 18 Dec 2018 03:09:29 +0000 (14:09 +1100)
* Call esptool directly not via subprocess
* Use the same serial port instance for listener thread and esptool
* Includes some refactoring for encapsulation of App vs DUT members

components/esptool_py/flasher_args.json.in
tools/tiny-test-fw/DUT.py
tools/tiny-test-fw/IDF/IDFApp.py
tools/tiny-test-fw/IDF/IDFDUT.py
tools/tiny-test-fw/docs/index.rst

index 1a56d5cedd7c01974b133403331b2af2305beb02..61e03f38870772b7337619e21b64f98eadfbf69f 100644 (file)
@@ -2,6 +2,11 @@
     "write_flash_args" : [ "--flash_mode", "${ESPFLASHMODE}",
                            "--flash_size", "${ESPFLASHSIZE}",
                            "--flash_freq", "${ESPFLASHFREQ}" ],
+    "flash_settings" : {
+        "flash_mode": "${ESPFLASHMODE}",
+        "flash_size": "${ESPFLASHSIZE}",
+        "flash_freq": "${ESPFLASHFREQ}"
+    },
     "flash_files" : {
         "${BOOTLOADER_OFFSET}" : "bootloader/bootloader.bin",
         "${PARTITION_TABLE_OFFSET}" : "partition_table/partition-table.bin",
index 899c7e885085e0854cb1ef468fbfe4e39183764c..343328ad74d06fe1fb2cc2366fca6d603e6022ba 100644 (file)
@@ -286,8 +286,8 @@ class BaseDUT(object):
         self.record_data_lock = threading.RLock()
         self.receive_thread = None
         self.expect_failures = []
-        # open and start during init
-        self.open()
+        self._port_open()
+        self.start_receive()
 
     def __str__(self):
         return "DUT({}: {})".format(self.name, str(self.port))
@@ -392,27 +392,32 @@ class BaseDUT(object):
         pass
 
     # methods that features raw port methods
-    def open(self):
+    def start_receive(self):
         """
-        open port and create thread to receive data.
+        Start thread to receive data.
 
         :return: None
         """
-        self._port_open()
         self.receive_thread = _RecvThread(self._port_read, self.data_cache,
                                           self.recorded_data, self.record_data_lock)
         self.receive_thread.start()
 
-    def close(self):
+    def stop_receive(self):
         """
-        close receive thread and then close port.
-
+        stop the receiving thread for the port
         :return: None
         """
         if self.receive_thread:
             self.receive_thread.exit()
-        self._port_close()
         self.LOG_THREAD.flush_data()
+        self.receive_thread = None
+
+    def close(self):
+        """
+        permanently close the port
+        """
+        self.stop_receive()
+        self._port_close()
 
     @staticmethod
     def u_to_bytearray(data):
index 54972137a2537342579a76b2201a7a60724cfa42..83ff31e93228255891082e2c5369db1eba1faf0a 100644 (file)
@@ -16,6 +16,7 @@
 import subprocess
 
 import os
+import json
 import App
 
 
@@ -26,7 +27,7 @@ class IDFApp(App.BaseApp):
     """
 
     IDF_DOWNLOAD_CONFIG_FILE = "download.config"
-    IDF_FLASH_ARGS_FILE = "flash_project_args"
+    IDF_FLASH_ARGS_FILE = "flasher_args.json"
 
     def __init__(self, app_path):
         super(IDFApp, self).__init__(app_path)
@@ -43,7 +44,8 @@ class IDFApp(App.BaseApp):
                              self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
                 raise AssertionError(msg)
 
-        self.esptool, self.partition_tool = self.get_tools()
+        self.flash_files, self.flash_settings = self._parse_flash_download_config()
+        self.partition_table =  self._parse_partition_table()
 
     @classmethod
     def get_sdk_path(cls):
@@ -52,16 +54,6 @@ class IDFApp(App.BaseApp):
         assert os.path.exists(idf_path)
         return idf_path
 
-    @classmethod
-    def get_tools(cls):
-        idf_path = cls.get_sdk_path()
-        # get esptool and partition tool for esp-idf
-        esptool = os.path.join(idf_path, "components",
-                               "esptool_py", "esptool", "esptool.py")
-        partition_tool = os.path.join(idf_path, "components",
-                                      "partition_table", "gen_esp32part.py")
-        assert os.path.exists(esptool) and os.path.exists(partition_tool)
-        return esptool, partition_tool
 
     def get_binary_path(self, app_path):
         """
@@ -74,47 +66,64 @@ class IDFApp(App.BaseApp):
         """
         pass
 
-    def process_arg(self, arg):
-        """
-        process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs.
+    def _parse_flash_download_config(self):
         """
-        if ".bin" in arg:
-            ret = os.path.join(self.binary_path, arg)
-        else:
-            ret = arg
-        return ret.strip("\r\n ")
+        Parse flash download config from build metadata files
 
-    def process_app_info(self):
-        """
-        get app download config and partition info from a specific app path
+        Sets self.flash_files, self.flash_settings
+
+        (Called from constructor)
 
-        :return: download config, partition info
+        Returns (flash_files, flash_settings)
         """
 
         if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
+            # CMake version using build metadata file
             with open(os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE), "r") as f:
-                configs = []
-                for line in f:
-                    line = line.strip()
-                    if len(line) > 0:
-                        configs += line.split()
+                args = json.load(f)
+                flash_files = [ (offs,file) for (offs,file) in args["flash_files"].items() if offs != "" ]
+                flash_settings = args["flash_settings"]
         else:
+            # GNU Make version uses download.config arguments file
             with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f:
-                configs = f.read().split(" ")
+                args = f.readlines()[-1].split(" ")
+                flash_files = []
+                flash_settings = {}
+                for idx in range(0, len(args), 2):  # process arguments in pairs
+                    if args[idx].startswith("--"):
+                        # strip the -- from the command line argument
+                        flash_settings[args[idx][2:]] = args[idx+1]
+                    else:
+                        # offs, filename
+                        flash_files.append( (args[idx], args[idx+1]) )
 
-        download_configs = ["--chip", "auto", "--before", "default_reset",
-                            "--after", "hard_reset", "write_flash", "-z"]
-        download_configs += [self.process_arg(x) for x in configs]
+        # make file offsets into integers, make paths absolute
+        flash_files = [ (int(offs, 0), os.path.join(self.binary_path, path.strip())) for (offs, path) in flash_files ]
 
-        # handle partition table
-        for partition_file in download_configs:
-            if "partition" in partition_file:
-                partition_file = os.path.join(self.binary_path, partition_file)
+        return (flash_files, flash_settings)
+
+    def _parse_partition_table(self):
+        """
+        Parse partition table contents based on app binaries
+
+        Returns partition_table data
+
+        (Called from constructor)
+        """
+        partition_tool = os.path.join(self.idf_path,
+                                      "components",
+                                      "partition_table",
+                                      "gen_esp32part.py")
+        assert os.path.exists(partition_tool)
+
+        for (_, path) in self.flash_files:
+            if "partition" in path:
+                partition_file = os.path.join(self.binary_path, path)
                 break
         else:
             raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path))
 
-        process = subprocess.Popen(["python", self.partition_tool, partition_file],
+        process = subprocess.Popen(["python", partition_tool, partition_file],
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         raw_data = process.stdout.read()
         if isinstance(raw_data, bytes):
@@ -140,7 +149,8 @@ class IDFApp(App.BaseApp):
                     "size": _size,
                     "flags": _flags
                 }
-        return download_configs, partition_table
+
+        return partition_table
 
 
 class Example(IDFApp):
index 1a660931b58f60282994635bcb9b1ff7df810337..418b9c1671d90da222b11200c225b25801a79d2f 100644 (file)
 
 """ DUT for IDF applications """
 import os
+import os.path
 import sys
 import re
 import subprocess
 import functools
 import random
 import tempfile
+import time
 
 from serial.tools import list_ports
 
+from collections import namedtuple
+
 import DUT
 
+try:
+    import esptool
+except ImportError:  # cheat and use IDF's copy of esptool if available
+    idf_path = os.getenv("IDF_PATH")
+    if not idf_path or not os.path.exists(idf_path):
+        raise
+    sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
+    import esptool
+
 
 class IDFToolError(OSError):
     pass
 
 
-def _tool_method(func):
-    """ close port, execute tool method and then reopen port """
+def _uses_esptool(func):
+    """ Suspend listener thread, connect with esptool,
+    call target function with esptool instance,
+    then resume listening for output
+    """
     @functools.wraps(func)
     def handler(self, *args, **kwargs):
-        self.close()
-        ret = func(self, *args, **kwargs)
-        self.open()
+        self.stop_receive()
+
+        settings = self.port_inst.get_settings()
+
+        rom = esptool.ESP32ROM(self.port_inst)
+        rom.connect('hard_reset')
+        esp = rom.run_stub()
+
+        ret = func(self, esp, *args, **kwargs)
+
+        self.port_inst.apply_settings(settings)
+        self.start_receive()
         return ret
     return handler
 
 
 class IDFDUT(DUT.SerialDUT):
-    """ IDF DUT, extends serial with ESPTool methods """
+    """ IDF DUT, extends serial with esptool methods
+
+    (Becomes aware of IDFApp instance which holds app-specific data)
+    """
 
-    CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
     # /dev/ttyAMA0 port is listed in Raspberry Pi
     # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
     INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
@@ -52,88 +79,109 @@ class IDFDUT(DUT.SerialDUT):
     ERASE_NVS = True
 
     def __init__(self, name, port, log_file, app, **kwargs):
-        self.download_config, self.partition_table = app.process_app_info()
         super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
 
     @classmethod
-    def get_chip(cls, app, port):
+    def get_mac(cls, app, port):
         """
-        get chip id via esptool
+        get MAC address via esptool
 
         :param app: application instance (to get tool)
-        :param port: comport
-        :return: chip ID or None
+        :param port: serial port as string
+        :return: MAC address or None
         """
         try:
-            output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"])
-        except subprocess.CalledProcessError:
-            output = bytes()
-        if isinstance(output, bytes):
-            output = output.decode()
-        chip_type = cls.CHIP_TYPE_PATTERN.search(output)
-        return chip_type.group(1) if chip_type else None
+            esp = esptool.ESP32ROM(port)
+            esp.connect()
+            return esp.read_mac()
+        except RuntimeError as e:
+            return None
+        finally:
+            esp._port.close()
 
     @classmethod
     def confirm_dut(cls, port, app, **kwargs):
-        return cls.get_chip(app, port) is not None
+        return cls.get_mac(app, port) is not None
 
-    @_tool_method
-    def start_app(self, erase_nvs=ERASE_NVS):
+    @_uses_esptool
+    def start_app(self, esp, erase_nvs=ERASE_NVS):
         """
         download and start app.
 
         :param: erase_nvs: whether erase NVS partition during flash
         :return: None
         """
+        flash_files = [ (offs, open(path, "rb")) for (offs, path) in self.app.flash_files ]
+
         if erase_nvs:
-            address = self.partition_table["nvs"]["offset"]
-            size = self.partition_table["nvs"]["size"]
-            nvs_file = tempfile.NamedTemporaryFile()
+            address = self.app.partition_table["nvs"]["offset"]
+            size = self.app.partition_table["nvs"]["size"]
+            nvs_file = tempfile.TemporaryFile()
             nvs_file.write(b'\xff' * size)
-            nvs_file.flush()
-            download_config = self.download_config + [address, nvs_file.name]
-        else:
-            download_config = self.download_config
+            nvs_file.seek(0)
+            flash_files.append( (int(address, 0), nvs_file) )
+
+        # fake flasher args object, this is a hack until
+        # esptool Python API is improved
+        Flash_Args = namedtuple('write_flash_args',
+                                ['flash_size',
+                                 'flash_mode',
+                                 'flash_freq',
+                                 'addr_filename',
+                                 'no_stub',
+                                 'compress',
+                                 'verify',
+                                 'encrypt'])
+
+        flash_args = Flash_Args(
+            self.app.flash_settings["flash_size"],
+            self.app.flash_settings["flash_mode"],
+            self.app.flash_settings["flash_freq"],
+            flash_files,
+            False,
+            True,
+            False,
+            False
+        )
 
-        retry_baud_rates = ["921600", "115200"]
-        error = IDFToolError()
         try:
-            for baud_rate in retry_baud_rates:
+            for baud_rate in [ 921600, 115200 ]:
                 try:
-                    subprocess.check_output(["python", self.app.esptool,
-                                             "--port", self.port, "--baud", baud_rate]
-                                            + download_config)
+                    esp.change_baud(baud_rate)
+                    esptool.write_flash(esp, flash_args)
                     break
-                except subprocess.CalledProcessError as error:
+                except RuntimeError:
                     continue
             else:
-                raise error
+                raise IDFToolError()
         finally:
-            if erase_nvs:
-                nvs_file.close()
+            for (_,f) in flash_files:
+                f.close()
 
-    @_tool_method
-    def reset(self):
+    @_uses_esptool
+    def reset(self, esp):
         """
-        reset DUT with esptool
+        hard reset DUT
 
         :return: None
         """
-        subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"])
+        esp.hard_reset()
 
-    @_tool_method
-    def erase_partition(self, partition):
+    @_uses_esptool
+    def erase_partition(self, esp, partition):
         """
         :param partition: partition name to erase
         :return: None
         """
-        address = self.partition_table[partition]["offset"]
-        size = self.partition_table[partition]["size"]
+        raise NotImplementedError()  # TODO: implement this
+        address = self.app.partition_table[partition]["offset"]
+        size = self.app.partition_table[partition]["size"]
+        # TODO can use esp.erase_region() instead of this, I think
         with open(".erase_partition.tmp", "wb") as f:
             f.write(chr(0xFF) * size)
 
-    @_tool_method
-    def dump_flush(self, output_file, **kwargs):
+    @_uses_esptool
+    def dump_flush(self, esp, output_file, **kwargs):
         """
         dump flush
 
@@ -147,7 +195,7 @@ class IDFDUT(DUT.SerialDUT):
         if os.path.isabs(output_file) is False:
             output_file = os.path.relpath(output_file, self.app.get_log_folder())
         if "partition" in kwargs:
-            partition = self.partition_table[kwargs["partition"]]
+            partition = self.app.partition_table[kwargs["partition"]]
             _address = partition["offset"]
             _size = partition["size"]
         elif "address" in kwargs and "size" in kwargs:
@@ -155,11 +203,10 @@ class IDFDUT(DUT.SerialDUT):
             _size = kwargs["size"]
         else:
             raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
-        subprocess.check_output(
-            ["python", self.app.esptool, "--port", self.port, "--baud", "921600",
-             "--before", "default_reset", "--after", "hard_reset", "read_flash",
-             _address, _size, output_file]
-        )
+
+        content = esp.read_flash(_address, _size)
+        with open(output_file, "wb") as f:
+            f.write(content)
 
     @classmethod
     def list_available_ports(cls):
index fac61f1015ab2ac08411f3b88dca4738b3117b2f..fd3bab06a744fb06e4638c8d1c86d7cdb1bdd2ad 100644 (file)
@@ -122,7 +122,8 @@ Class Diagram
    {method} expect_all
    {method} read
    {method} write
-   {method} open
+   {method} start_receive
+   {method} stop_receive
    {method} close
    }
    class SerialDUT {
@@ -137,12 +138,12 @@ Class Diagram
    }
    class BaseApp {
    {method} get_sdk_path
-   {method} get_tools
-   {method} process_app_info
    {method} get_log_folder
    }
    class IDFApp {
-   {method} process_app_info
+   {field} flash_files
+   {field} flash_settings
+   {field} partition_table
    }
    class Example {
    {method} get_binary_path