import os
import sys
import binascii
-import subprocess
import tempfile
import collections
import struct
-__version__ = '1.0'
+try:
+ from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET
+except ImportError:
+ COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
+ PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, "partition_table")
-IDF_COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
+ sys.path.append(PARTTOOL_DIR)
+ from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET
-PARTTOOL_PY = os.path.join(IDF_COMPONENTS_PATH, "partition_table", "parttool.py")
+__version__ = '2.0'
SPI_FLASH_SEC_SIZE = 0x2000
print(msg)
-def _invoke_parttool(parttool_args, args, output=False, partition=None):
- invoke_args = []
+class OtatoolTarget():
- if partition:
- invoke_args += [sys.executable, PARTTOOL_PY] + partition
- else:
- invoke_args += [sys.executable, PARTTOOL_PY, "--partition-type", "data", "--partition-subtype", "ota"]
-
- if quiet:
- invoke_args += ["-q"]
-
- if args.port != "":
- invoke_args += ["--port", args.port]
-
- if args.partition_table_file:
- invoke_args += ["--partition-table-file", args.partition_table_file]
-
- if args.partition_table_offset:
- invoke_args += ["--partition-table-offset", args.partition_table_offset]
+ OTADATA_PARTITION = PartitionType("data", "ota")
- invoke_args += parttool_args
-
- if output:
- return subprocess.check_output(invoke_args)
- else:
- return subprocess.check_call(invoke_args)
-
-
-def _get_otadata_contents(args, check=True):
- global quiet
-
- if check:
- check_args = ["get_partition_info", "--info", "offset", "size"]
-
- quiet = True
- output = _invoke_parttool(check_args, args, True).split(b" ")
- quiet = args.quiet
-
- if not output:
- raise RuntimeError("No ota_data partition found")
-
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f_name = f.name
-
- try:
- invoke_args = ["read_partition", "--output", f_name]
- _invoke_parttool(invoke_args, args)
- with open(f_name, "rb") as f:
- contents = f.read()
- finally:
- os.unlink(f_name)
+ def __init__(self, port=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None, spi_flash_sec_size=SPI_FLASH_SEC_SIZE):
+ self.target = ParttoolTarget(port, partition_table_offset, partition_table_file)
+ self.spi_flash_sec_size = spi_flash_sec_size
- return contents
-
-
-def _get_otadata_status(otadata_contents):
- status = []
-
- otadata_status = collections.namedtuple("otadata_status", "seq crc")
-
- for i in range(2):
- start = i * (SPI_FLASH_SEC_SIZE >> 1)
-
- seq = bytearray(otadata_contents[start:start + 4])
- crc = bytearray(otadata_contents[start + 28:start + 32])
+ temp_file = tempfile.NamedTemporaryFile(delete=False)
+ temp_file.close()
+ try:
+ self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
+ with open(temp_file.name, "rb") as f:
+ self.otadata = f.read()
+ except Exception:
+ self.otadata = None
+ finally:
+ os.unlink(temp_file.name)
- seq = struct.unpack('>I', seq)
- crc = struct.unpack('>I', crc)
+ def _check_otadata_partition(self):
+ if not self.otadata:
+ raise Exception("No otadata partition found")
- status.append(otadata_status(seq[0], crc[0]))
+ def erase_otadata(self):
+ self._check_otadata_partition()
+ self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
- return status
+ def _get_otadata_info(self):
+ info = []
+ otadata_info = collections.namedtuple("otadata_info", "seq crc")
-def read_otadata(args):
- status("Reading ota_data partition contents...")
- otadata_info = _get_otadata_contents(args)
- otadata_info = _get_otadata_status(otadata_info)
+ for i in range(2):
+ start = i * (self.spi_flash_sec_size >> 1)
- print(otadata_info)
+ seq = bytearray(self.otadata[start:start + 4])
+ crc = bytearray(self.otadata[start + 28:start + 32])
- print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
- print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
- otadata_info[1].seq, otadata_info[1].crc))
+ seq = struct.unpack('>I', seq)
+ crc = struct.unpack('>I', crc)
+ info.append(otadata_info(seq[0], crc[0]))
-def erase_otadata(args):
- status("Erasing ota_data partition contents...")
- _invoke_parttool(["erase_partition"], args)
- status("Erased ota_data partition contents")
+ return info
+ def _get_partition_id_from_ota_id(self, ota_id):
+ if isinstance(ota_id, int):
+ return PartitionType("app", "ota_" + str(ota_id))
+ else:
+ return PartitionName(ota_id)
-def switch_otadata(args):
- sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
- import gen_esp32part as gen
+ def switch_ota_partition(self, ota_id):
+ self._check_otadata_partition()
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f_name = f.name
+ sys.path.append(PARTTOOL_DIR)
+ import gen_esp32part as gen
- try:
- def is_otadata_status_valid(status):
+ def is_otadata_info_valid(status):
seq = status.seq % (1 << 32)
crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
- status("Looking for ota app partitions...")
-
- # In order to get the number of ota app partitions, we need the partition table
- partition_table = None
- invoke_args = ["get_partition_info", "--table", f_name]
-
- _invoke_parttool(invoke_args, args)
-
- partition_table = open(f_name, "rb").read()
- partition_table = gen.PartitionTable.from_binary(partition_table)
+ partition_table = self.target.partition_table
ota_partitions = list()
ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
if not ota_partitions:
- raise RuntimeError("No ota app partitions found")
-
- status("Verifying partition to switch to exists...")
+ raise Exception("No ota app partitions found")
# Look for the app partition to switch to
ota_partition_next = None
try:
- if args.name:
- ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
+ if isinstance(ota_id, int):
+ ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions)
else:
- ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
+ ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions)
ota_partition_next = list(ota_partition_next)[0]
except IndexError:
- raise RuntimeError("Partition to switch to not found")
+ raise Exception("Partition to switch to not found")
- otadata_contents = _get_otadata_contents(args)
- otadata_status = _get_otadata_status(otadata_contents)
+ otadata_info = self._get_otadata_info()
# Find the copy to base the computation for ota sequence number on
otadata_compute_base = -1
# Both are valid, take the max as computation base
- if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
- if otadata_status[0].seq >= otadata_status[1].seq:
+ if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]):
+ if otadata_info[0].seq >= otadata_info[1].seq:
otadata_compute_base = 0
else:
otadata_compute_base = 1
# Only one copy is valid, use that
- elif is_otadata_status_valid(otadata_status[0]):
+ elif is_otadata_info_valid(otadata_info[0]):
otadata_compute_base = 0
- elif is_otadata_status_valid(otadata_status[1]):
+ elif is_otadata_info_valid(otadata_info[1]):
otadata_compute_base = 1
# Both are invalid (could be initial state - all 0xFF's)
else:
# Find the next ota sequence number
if otadata_compute_base == 0 or otadata_compute_base == 1:
- base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
+ base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
i = 0
while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
- with open(f_name, "wb") as otadata_next_file:
- start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
+ temp_file = tempfile.NamedTemporaryFile(delete=False)
+ temp_file.close()
- otadata_next_file.write(otadata_contents)
+ try:
+ with open(temp_file.name, "wb") as otadata_next_file:
+ start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
- otadata_next_file.seek(start)
- otadata_next_file.write(ota_seq_next)
+ otadata_next_file.write(self.otadata)
- otadata_next_file.seek(start + 28)
- otadata_next_file.write(ota_seq_crc_next)
+ otadata_next_file.seek(start)
+ otadata_next_file.write(ota_seq_next)
- otadata_next_file.flush()
+ otadata_next_file.seek(start + 28)
+ otadata_next_file.write(ota_seq_crc_next)
- _invoke_parttool(["write_partition", "--input", f_name], args)
- status("Updated ota_data partition")
- finally:
- os.unlink(f_name)
+ otadata_next_file.flush()
+ self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
+ finally:
+ os.unlink(temp_file.name)
+
+ def read_ota_partition(self, ota_id, output):
+ self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)
+
+ def write_ota_partition(self, ota_id, input):
+ self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)
+
+ def erase_ota_partition(self, ota_id):
+ self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
+
+
+def _read_otadata(target):
+ target._check_otadata_partition()
+
+ otadata_info = target._get_otadata_info(target.otadata)
+
+ print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
+ print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
+ otadata_info[1].seq, otadata_info[1].crc))
+
+
+def _erase_otadata(target):
+ target.erase_otadata()
+ status("Erased ota_data partition contents")
-def _get_partition_specifier(args):
- if args.name:
- return ["--partition-name", args.name]
- else:
- return ["--partition-type", "app", "--partition-subtype", "ota_" + str(args.slot)]
+def _switch_ota_partition(target, ota_id):
+ target.switch_ota_partition(ota_id)
-def read_ota_partition(args):
- invoke_args = ["read_partition", "--output", args.output]
- _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
- status("Read ota partition contents to file {}".format(args.output))
+def _read_ota_partition(target, ota_id, output):
+ target.read_ota_partition(ota_id, output)
+ status("Read ota partition contents to file {}".format(output))
-def write_ota_partition(args):
- invoke_args = ["write_partition", "--input", args.input]
- _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
- status("Written contents of file {} to ota partition".format(args.input))
+def _write_ota_partition(target, ota_id, input):
+ target.write_ota_partition(ota_id, input)
+ status("Written contents of file {} to ota partition".format(input))
-def erase_ota_partition(args):
- invoke_args = ["erase_partition"]
- _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
+
+def _erase_ota_partition(target, ota_id):
+ target.erase_ota_partition(ota_id)
status("Erased contents of ota partition")
# There are two possible sources for the partition table: a device attached to the host
# or a partition table CSV/binary file. These sources are mutually exclusive.
- partition_table_info_source_args = parser.add_mutually_exclusive_group()
+ parser.add_argument("--port", "-p", help="port where the device to read the partition table from is attached")
- partition_table_info_source_args.add_argument("--port", "-p", help="port where the device to read the partition table from is attached", default="")
- partition_table_info_source_args.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from", default="")
+ parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", type=str)
- parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", default="0x8000")
+ parser.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from; \
+ overrides device attached to specified port as the partition table source when defined")
subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help")
+ spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
+ spi_flash_sec_size.add_argument("--spi-flash-sec-size", help="value of SPI_FLASH_SEC_SIZE macro", type=str)
+
# Specify the supported operations
- subparsers.add_parser("read_otadata", help="read otadata partition")
+ subparsers.add_parser("read_otadata", help="read otadata partition", parents=[spi_flash_sec_size])
subparsers.add_parser("erase_otadata", help="erase otadata partition")
slot_or_name_parser = argparse.ArgumentParser(add_help=False)
slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int)
slot_or_name_parser_args.add_argument("--name", help="name of the ota partition")
- subparsers.add_parser("switch_otadata", help="switch otadata partition", parents=[slot_or_name_parser])
+ subparsers.add_parser("switch_ota_partition", help="switch otadata partition", parents=[slot_or_name_parser, spi_flash_sec_size])
read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser])
read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to")
parser.print_help()
sys.exit(1)
- # Else execute the operation
- operation_func = globals()[args.operation]
+ target_args = {}
+
+ if args.port:
+ target_args["port"] = args.port
+
+ if args.partition_table_file:
+ target_args["partition_table_file"] = args.partition_table_file
+
+ if args.partition_table_offset:
+ target_args["partition_table_offset"] = int(args.partition_table_offset, 0)
+
+ try:
+ if args.spi_flash_sec_size:
+ target_args["spi_flash_sec_size"] = int(args.spi_flash_sec_size, 0)
+ except AttributeError:
+ pass
+
+ target = OtatoolTarget(**target_args)
+
+ # Create the operation table and execute the operation
+ common_args = {'target':target}
+
+ ota_id = []
+
+ try:
+ if args.name is not None:
+ ota_id = ["name"]
+ else:
+ if args.slot is not None:
+ ota_id = ["slot"]
+ except AttributeError:
+ pass
+
+ otatool_ops = {
+ 'read_otadata':(_read_otadata, []),
+ 'erase_otadata':(_erase_otadata, []),
+ 'switch_ota_partition':(_switch_ota_partition, ota_id),
+ 'read_ota_partition':(_read_ota_partition, ["output"] + ota_id),
+ 'write_ota_partition':(_write_ota_partition, ["input"] + ota_id),
+ 'erase_ota_partition':(_erase_ota_partition, ota_id)
+ }
+
+ (op, op_args) = otatool_ops[args.operation]
+
+ for op_arg in op_args:
+ common_args.update({op_arg:vars(args)[op_arg]})
+
+ try:
+ common_args['ota_id'] = common_args.pop('name')
+ except KeyError:
+ try:
+ common_args['ota_id'] = common_args.pop('slot')
+ except KeyError:
+ pass
if quiet:
# If exceptions occur, suppress and exit quietly
try:
- operation_func(args)
+ op(**common_args)
except Exception:
sys.exit(2)
else:
- operation_func(args)
+ op(**common_args)
if __name__ == '__main__':