if not output:
raise RuntimeError("No ota_data partition found")
- with tempfile.NamedTemporaryFile() as otadata_file:
- invoke_args = ["read_partition", "--output", otadata_file.name]
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f_name = f.name
+
+ try:
+ invoke_args = ["read_partition", "--output", f_name]
_invoke_parttool(invoke_args, args)
- return otadata_file.read()
+ with open(f_name, "rb") as f:
+ contents = f.read()
+ finally:
+ os.unlink(f_name)
+
+ return contents
def _get_otadata_status(otadata_contents):
sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
import gen_esp32part as gen
- def is_otadata_status_valid(status):
- seq = status.seq % (1 << 32)
- crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
- return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f_name = f.name
- status("Looking for ota app partitions...")
+ try:
+ def is_otadata_status_valid(status):
+ seq = status.seq % (1 << 32)
+ crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
+ return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
- # In order to get the number of ota app partitions, we need the partition table
- partition_table = None
+ status("Looking for ota app partitions...")
- with tempfile.NamedTemporaryFile() as partition_table_file:
- invoke_args = ["get_partition_info", "--table", partition_table_file.name]
+ # In order to get the number of ota app partitions, we need the partition table
+ partition_table = None
+ invoke_args = ["get_partition_info", "--table", f_name]
_invoke_parttool(invoke_args, args)
- partition_table = partition_table_file.read()
+ partition_table = open(f_name, "rb").read()
partition_table = gen.PartitionTable.from_binary(partition_table)
- ota_partitions = list()
-
- for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
- ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
-
- try:
- ota_partitions.append(list(ota_partition)[0])
- except IndexError:
- break
-
- ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
+ ota_partitions = list()
- if not ota_partitions:
- raise RuntimeError("No ota app partitions found")
+ for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
+ ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
- status("Verifying partition to switch to exists...")
+ try:
+ ota_partitions.append(list(ota_partition)[0])
+ except IndexError:
+ break
- # Look for the app partition to switch to
- ota_partition_next = None
+ ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
- try:
- if args.name:
- ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
- else:
- ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
+ if not ota_partitions:
+ raise RuntimeError("No ota app partitions found")
- ota_partition_next = list(ota_partition_next)[0]
- except IndexError:
- raise RuntimeError("Partition to switch to not found")
+ status("Verifying partition to switch to exists...")
- otadata_contents = _get_otadata_contents(args)
- otadata_status = _get_otadata_status(otadata_contents)
+ # Look for the app partition to switch to
+ ota_partition_next = None
- # Find the copy to base the computation for ota sequence number on
- otadata_compute_base = -1
+ try:
+ if args.name:
+ ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
+ else:
+ ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
- # Both are valid, take the max as computation base
- if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
- if otadata_status[0].seq >= otadata_status[1].seq:
+ ota_partition_next = list(ota_partition_next)[0]
+ except IndexError:
+ raise RuntimeError("Partition to switch to not found")
+
+ otadata_contents = _get_otadata_contents(args)
+ otadata_status = _get_otadata_status(otadata_contents)
+
+ # Find the copy to base the computation for ota sequence number on
+ otadata_compute_base = -1
+
+ # Both are valid, take the max as computation base
+ if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
+ if otadata_status[0].seq >= otadata_status[1].seq:
+ otadata_compute_base = 0
+ else:
+ otadata_compute_base = 1
+ # Only one copy is valid, use that
+ elif is_otadata_status_valid(otadata_status[0]):
otadata_compute_base = 0
- else:
+ elif is_otadata_status_valid(otadata_status[1]):
otadata_compute_base = 1
- # Only one copy is valid, use that
- elif is_otadata_status_valid(otadata_status[0]):
- otadata_compute_base = 0
- elif is_otadata_status_valid(otadata_status[1]):
- otadata_compute_base = 1
- # Both are invalid (could be initial state - all 0xFF's)
- else:
- pass
+ # Both are invalid (could be initial state - all 0xFF's)
+ else:
+ pass
- ota_seq_next = 0
- ota_partitions_num = len(ota_partitions)
+ ota_seq_next = 0
+ ota_partitions_num = len(ota_partitions)
- target_seq = (ota_partition_next.subtype & 0x0F) + 1
+ target_seq = (ota_partition_next.subtype & 0x0F) + 1
- # Find the next ota sequence number
- if otadata_compute_base == 0 or otadata_compute_base == 1:
- base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
+ # Find the next ota sequence number
+ if otadata_compute_base == 0 or otadata_compute_base == 1:
+ base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
- i = 0
- while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
- i += 1
+ i = 0
+ while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
+ i += 1
- ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
- else:
- ota_seq_next = target_seq
+ ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
+ else:
+ ota_seq_next = target_seq
- # Create binary data from computed values
- ota_seq_next = struct.pack("I", ota_seq_next)
- ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
- ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
+ # Create binary data from computed values
+ ota_seq_next = struct.pack("I", ota_seq_next)
+ ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
+ ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
- with tempfile.NamedTemporaryFile() as otadata_next_file:
- start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
+ with open(f_name, "wb") as otadata_next_file:
+ start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
- otadata_next_file.write(otadata_contents)
+ otadata_next_file.write(otadata_contents)
- otadata_next_file.seek(start)
- otadata_next_file.write(ota_seq_next)
+ otadata_next_file.seek(start)
+ otadata_next_file.write(ota_seq_next)
- otadata_next_file.seek(start + 28)
- otadata_next_file.write(ota_seq_crc_next)
+ otadata_next_file.seek(start + 28)
+ otadata_next_file.write(ota_seq_crc_next)
- otadata_next_file.flush()
+ otadata_next_file.flush()
- _invoke_parttool(["write_partition", "--input", otadata_next_file.name], args)
+ _invoke_parttool(["write_partition", "--input", f_name], args)
status("Updated ota_data partition")
+ finally:
+ os.unlink(f_name)
def _get_partition_specifier(args):