import subprocess
import os
+import json
import App
"""
IDF_DOWNLOAD_CONFIG_FILE = "download.config"
- IDF_FLASH_ARGS_FILE = "flash_project_args"
+ IDF_FLASH_ARGS_FILE = "flasher_args.json"
def __init__(self, app_path):
super(IDFApp, self).__init__(app_path)
self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
raise AssertionError(msg)
- self.esptool, self.partition_tool = self.get_tools()
+ self.flash_files, self.flash_settings = self._parse_flash_download_config()
+ self.partition_table = self._parse_partition_table()
@classmethod
def get_sdk_path(cls):
assert os.path.exists(idf_path)
return idf_path
- @classmethod
- def get_tools(cls):
- idf_path = cls.get_sdk_path()
- # get esptool and partition tool for esp-idf
- esptool = os.path.join(idf_path, "components",
- "esptool_py", "esptool", "esptool.py")
- partition_tool = os.path.join(idf_path, "components",
- "partition_table", "gen_esp32part.py")
- assert os.path.exists(esptool) and os.path.exists(partition_tool)
- return esptool, partition_tool
def get_binary_path(self, app_path):
"""
"""
pass
- def process_arg(self, arg):
- """
- process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs.
+ def _parse_flash_download_config(self):
"""
- if ".bin" in arg:
- ret = os.path.join(self.binary_path, arg)
- else:
- ret = arg
- return ret.strip("\r\n ")
+ Parse flash download config from build metadata files
- def process_app_info(self):
- """
- get app download config and partition info from a specific app path
+ Sets self.flash_files, self.flash_settings
+
+ (Called from constructor)
- :return: download config, partition info
+ Returns (flash_files, flash_settings)
"""
if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
+ # CMake version using build metadata file
with open(os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE), "r") as f:
- configs = []
- for line in f:
- line = line.strip()
- if len(line) > 0:
- configs += line.split()
+ args = json.load(f)
+ flash_files = [ (offs,file) for (offs,file) in args["flash_files"].items() if offs != "" ]
+ flash_settings = args["flash_settings"]
else:
+ # GNU Make version uses download.config arguments file
with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f:
- configs = f.read().split(" ")
+ args = f.readlines()[-1].split(" ")
+ flash_files = []
+ flash_settings = {}
+ for idx in range(0, len(args), 2): # process arguments in pairs
+ if args[idx].startswith("--"):
+ # strip the -- from the command line argument
+ flash_settings[args[idx][2:]] = args[idx+1]
+ else:
+ # offs, filename
+ flash_files.append( (args[idx], args[idx+1]) )
- download_configs = ["--chip", "auto", "--before", "default_reset",
- "--after", "hard_reset", "write_flash", "-z"]
- download_configs += [self.process_arg(x) for x in configs]
+ # make file offsets into integers, make paths absolute
+ flash_files = [ (int(offs, 0), os.path.join(self.binary_path, path.strip())) for (offs, path) in flash_files ]
- # handle partition table
- for partition_file in download_configs:
- if "partition" in partition_file:
- partition_file = os.path.join(self.binary_path, partition_file)
+ return (flash_files, flash_settings)
+
+ def _parse_partition_table(self):
+ """
+ Parse partition table contents based on app binaries
+
+ Returns partition_table data
+
+ (Called from constructor)
+ """
+ partition_tool = os.path.join(self.idf_path,
+ "components",
+ "partition_table",
+ "gen_esp32part.py")
+ assert os.path.exists(partition_tool)
+
+ for (_, path) in self.flash_files:
+ if "partition" in path:
+ partition_file = os.path.join(self.binary_path, path)
break
else:
raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path))
- process = subprocess.Popen(["python", self.partition_tool, partition_file],
+ process = subprocess.Popen(["python", partition_tool, partition_file],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data = process.stdout.read()
if isinstance(raw_data, bytes):
"size": _size,
"flags": _flags
}
- return download_configs, partition_table
+
+ return partition_table
class Example(IDFApp):
""" DUT for IDF applications """
import os
+import os.path
import sys
import re
import subprocess
import functools
import random
import tempfile
+import time
from serial.tools import list_ports
+from collections import namedtuple
+
import DUT
+try:
+ import esptool
+except ImportError: # cheat and use IDF's copy of esptool if available
+ idf_path = os.getenv("IDF_PATH")
+ if not idf_path or not os.path.exists(idf_path):
+ raise
+ sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
+ import esptool
+
class IDFToolError(OSError):
pass
-def _tool_method(func):
- """ close port, execute tool method and then reopen port """
+def _uses_esptool(func):
+ """ Suspend listener thread, connect with esptool,
+ call target function with esptool instance,
+ then resume listening for output
+ """
@functools.wraps(func)
def handler(self, *args, **kwargs):
- self.close()
- ret = func(self, *args, **kwargs)
- self.open()
+ self.stop_receive()
+
+ settings = self.port_inst.get_settings()
+
+ rom = esptool.ESP32ROM(self.port_inst)
+ rom.connect('hard_reset')
+ esp = rom.run_stub()
+
+ ret = func(self, esp, *args, **kwargs)
+
+ self.port_inst.apply_settings(settings)
+ self.start_receive()
return ret
return handler
class IDFDUT(DUT.SerialDUT):
- """ IDF DUT, extends serial with ESPTool methods """
+ """ IDF DUT, extends serial with esptool methods
+
+ (Becomes aware of IDFApp instance which holds app-specific data)
+ """
- CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
# /dev/ttyAMA0 port is listed in Raspberry Pi
# /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
ERASE_NVS = True
def __init__(self, name, port, log_file, app, **kwargs):
- self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
@classmethod
- def get_chip(cls, app, port):
+ def get_mac(cls, app, port):
"""
- get chip id via esptool
+ get MAC address via esptool
:param app: application instance (to get tool)
- :param port: comport
- :return: chip ID or None
+ :param port: serial port as string
+ :return: MAC address or None
"""
try:
- output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"])
- except subprocess.CalledProcessError:
- output = bytes()
- if isinstance(output, bytes):
- output = output.decode()
- chip_type = cls.CHIP_TYPE_PATTERN.search(output)
- return chip_type.group(1) if chip_type else None
+ esp = esptool.ESP32ROM(port)
+ esp.connect()
+ return esp.read_mac()
+ except RuntimeError as e:
+ return None
+ finally:
+ esp._port.close()
@classmethod
def confirm_dut(cls, port, app, **kwargs):
- return cls.get_chip(app, port) is not None
+ return cls.get_mac(app, port) is not None
- @_tool_method
- def start_app(self, erase_nvs=ERASE_NVS):
+ @_uses_esptool
+ def start_app(self, esp, erase_nvs=ERASE_NVS):
"""
download and start app.
:param: erase_nvs: whether erase NVS partition during flash
:return: None
"""
+ flash_files = [ (offs, open(path, "rb")) for (offs, path) in self.app.flash_files ]
+
if erase_nvs:
- address = self.partition_table["nvs"]["offset"]
- size = self.partition_table["nvs"]["size"]
- nvs_file = tempfile.NamedTemporaryFile()
+ address = self.app.partition_table["nvs"]["offset"]
+ size = self.app.partition_table["nvs"]["size"]
+ nvs_file = tempfile.TemporaryFile()
nvs_file.write(b'\xff' * size)
- nvs_file.flush()
- download_config = self.download_config + [address, nvs_file.name]
- else:
- download_config = self.download_config
+ nvs_file.seek(0)
+ flash_files.append( (int(address, 0), nvs_file) )
+
+ # fake flasher args object, this is a hack until
+ # esptool Python API is improved
+ Flash_Args = namedtuple('write_flash_args',
+ ['flash_size',
+ 'flash_mode',
+ 'flash_freq',
+ 'addr_filename',
+ 'no_stub',
+ 'compress',
+ 'verify',
+ 'encrypt'])
+
+ flash_args = Flash_Args(
+ self.app.flash_settings["flash_size"],
+ self.app.flash_settings["flash_mode"],
+ self.app.flash_settings["flash_freq"],
+ flash_files,
+ False,
+ True,
+ False,
+ False
+ )
- retry_baud_rates = ["921600", "115200"]
- error = IDFToolError()
try:
- for baud_rate in retry_baud_rates:
+ for baud_rate in [ 921600, 115200 ]:
try:
- subprocess.check_output(["python", self.app.esptool,
- "--port", self.port, "--baud", baud_rate]
- + download_config)
+ esp.change_baud(baud_rate)
+ esptool.write_flash(esp, flash_args)
break
- except subprocess.CalledProcessError as error:
+ except RuntimeError:
continue
else:
- raise error
+ raise IDFToolError()
finally:
- if erase_nvs:
- nvs_file.close()
+ for (_,f) in flash_files:
+ f.close()
- @_tool_method
- def reset(self):
+ @_uses_esptool
+ def reset(self, esp):
"""
- reset DUT with esptool
+ hard reset DUT
:return: None
"""
- subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"])
+ esp.hard_reset()
- @_tool_method
- def erase_partition(self, partition):
+ @_uses_esptool
+ def erase_partition(self, esp, partition):
"""
:param partition: partition name to erase
:return: None
"""
- address = self.partition_table[partition]["offset"]
- size = self.partition_table[partition]["size"]
+ raise NotImplementedError() # TODO: implement this
+ address = self.app.partition_table[partition]["offset"]
+ size = self.app.partition_table[partition]["size"]
+ # TODO can use esp.erase_region() instead of this, I think
with open(".erase_partition.tmp", "wb") as f:
f.write(chr(0xFF) * size)
- @_tool_method
- def dump_flush(self, output_file, **kwargs):
+ @_uses_esptool
+ def dump_flush(self, esp, output_file, **kwargs):
"""
dump flush
if os.path.isabs(output_file) is False:
output_file = os.path.relpath(output_file, self.app.get_log_folder())
if "partition" in kwargs:
- partition = self.partition_table[kwargs["partition"]]
+ partition = self.app.partition_table[kwargs["partition"]]
_address = partition["offset"]
_size = partition["size"]
elif "address" in kwargs and "size" in kwargs:
_size = kwargs["size"]
else:
raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
- subprocess.check_output(
- ["python", self.app.esptool, "--port", self.port, "--baud", "921600",
- "--before", "default_reset", "--after", "hard_reset", "read_flash",
- _address, _size, output_file]
- )
+
+ content = esp.read_flash(_address, _size)
+ with open(output_file, "wb") as f:
+ f.write(content)
@classmethod
def list_available_ports(cls):