]> granicus.if.org Git - esp-idf/commitdiff
otatool, parttool: Fix permission denied error on Windows
authorRenz Christian Bagaporo <renz@espressif.com>
Tue, 8 Jan 2019 07:47:44 +0000 (15:47 +0800)
committerRenz Christian Bagaporo <renz@espressif.com>
Sun, 13 Jan 2019 22:05:39 +0000 (06:05 +0800)
components/app_update/otatool.py
components/partition_table/parttool.py

index f7216b0dc1766301347181cd7059ad601ead0155..aead479d921259bff25b910c27749332ad1c1f6c 100755 (executable)
@@ -83,10 +83,18 @@ def _get_otadata_contents(args, check=True):
         if not output:
             raise RuntimeError("No ota_data partition found")
 
-    with tempfile.NamedTemporaryFile() as otadata_file:
-        invoke_args = ["read_partition", "--output", otadata_file.name]
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+        f_name = f.name
+
+    try:
+        invoke_args = ["read_partition", "--output", f_name]
         _invoke_parttool(invoke_args, args)
-        return otadata_file.read()
+        with open(f_name, "rb") as f:
+            contents = f.read()
+    finally:
+        os.unlink(f_name)
+
+    return contents
 
 
 def _get_otadata_status(otadata_contents):
@@ -130,112 +138,116 @@ def switch_otadata(args):
     sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
     import gen_esp32part as gen
 
-    def is_otadata_status_valid(status):
-        seq = status.seq % (1 << 32)
-        crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
-        return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+        f_name = f.name
 
-    status("Looking for ota app partitions...")
+    try:
+        def is_otadata_status_valid(status):
+            seq = status.seq % (1 << 32)
+            crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
+            return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
 
-    # In order to get the number of ota app partitions, we need the partition table
-    partition_table = None
+        status("Looking for ota app partitions...")
 
-    with tempfile.NamedTemporaryFile() as partition_table_file:
-        invoke_args = ["get_partition_info", "--table", partition_table_file.name]
+        # In order to get the number of ota app partitions, we need the partition table
+        partition_table = None
+        invoke_args = ["get_partition_info", "--table", f_name]
 
         _invoke_parttool(invoke_args, args)
 
-        partition_table = partition_table_file.read()
+        partition_table = open(f_name, "rb").read()
         partition_table = gen.PartitionTable.from_binary(partition_table)
 
-    ota_partitions = list()
-
-    for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
-        ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
-
-        try:
-            ota_partitions.append(list(ota_partition)[0])
-        except IndexError:
-            break
-
-    ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
+        ota_partitions = list()
 
-    if not ota_partitions:
-        raise RuntimeError("No ota app partitions found")
+        for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
+            ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
 
-    status("Verifying partition to switch to exists...")
+            try:
+                ota_partitions.append(list(ota_partition)[0])
+            except IndexError:
+                break
 
-    # Look for the app partition to switch to
-    ota_partition_next = None
+        ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
 
-    try:
-        if args.name:
-            ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
-        else:
-            ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA  == args.slot, ota_partitions)
+        if not ota_partitions:
+            raise RuntimeError("No ota app partitions found")
 
-        ota_partition_next = list(ota_partition_next)[0]
-    except IndexError:
-        raise RuntimeError("Partition to switch to not found")
+        status("Verifying partition to switch to exists...")
 
-    otadata_contents = _get_otadata_contents(args)
-    otadata_status = _get_otadata_status(otadata_contents)
+        # Look for the app partition to switch to
+        ota_partition_next = None
 
-    # Find the copy to base the computation for ota sequence number on
-    otadata_compute_base = -1
+        try:
+            if args.name:
+                ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
+            else:
+                ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA  == args.slot, ota_partitions)
 
-    # Both are valid, take the max as computation base
-    if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
-        if otadata_status[0].seq >= otadata_status[1].seq:
+            ota_partition_next = list(ota_partition_next)[0]
+        except IndexError:
+            raise RuntimeError("Partition to switch to not found")
+
+        otadata_contents = _get_otadata_contents(args)
+        otadata_status = _get_otadata_status(otadata_contents)
+
+        # Find the copy to base the computation for ota sequence number on
+        otadata_compute_base = -1
+
+        # Both are valid, take the max as computation base
+        if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
+            if otadata_status[0].seq >= otadata_status[1].seq:
+                otadata_compute_base = 0
+            else:
+                otadata_compute_base = 1
+        # Only one copy is valid, use that
+        elif is_otadata_status_valid(otadata_status[0]):
             otadata_compute_base = 0
-        else:
+        elif is_otadata_status_valid(otadata_status[1]):
             otadata_compute_base = 1
-    # Only one copy is valid, use that
-    elif is_otadata_status_valid(otadata_status[0]):
-        otadata_compute_base = 0
-    elif is_otadata_status_valid(otadata_status[1]):
-        otadata_compute_base = 1
-    # Both are invalid (could be initial state - all 0xFF's)
-    else:
-        pass
+        # Both are invalid (could be initial state - all 0xFF's)
+        else:
+            pass
 
-    ota_seq_next = 0
-    ota_partitions_num = len(ota_partitions)
+        ota_seq_next = 0
+        ota_partitions_num = len(ota_partitions)
 
-    target_seq = (ota_partition_next.subtype & 0x0F) + 1
+        target_seq = (ota_partition_next.subtype & 0x0F) + 1
 
-    # Find the next ota sequence number
-    if otadata_compute_base == 0 or otadata_compute_base == 1:
-        base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
+        # Find the next ota sequence number
+        if otadata_compute_base == 0 or otadata_compute_base == 1:
+            base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
 
-        i = 0
-        while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
-            i += 1
+            i = 0
+            while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
+                i += 1
 
-        ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
-    else:
-        ota_seq_next = target_seq
+            ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
+        else:
+            ota_seq_next = target_seq
 
-    # Create binary data from computed values
-    ota_seq_next = struct.pack("I", ota_seq_next)
-    ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
-    ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
+        # Create binary data from computed values
+        ota_seq_next = struct.pack("I", ota_seq_next)
+        ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
+        ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
 
-    with tempfile.NamedTemporaryFile() as otadata_next_file:
-        start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
+        with open(f_name, "wb") as otadata_next_file:
+            start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
 
-        otadata_next_file.write(otadata_contents)
+            otadata_next_file.write(otadata_contents)
 
-        otadata_next_file.seek(start)
-        otadata_next_file.write(ota_seq_next)
+            otadata_next_file.seek(start)
+            otadata_next_file.write(ota_seq_next)
 
-        otadata_next_file.seek(start + 28)
-        otadata_next_file.write(ota_seq_crc_next)
+            otadata_next_file.seek(start + 28)
+            otadata_next_file.write(ota_seq_crc_next)
 
-        otadata_next_file.flush()
+            otadata_next_file.flush()
 
-        _invoke_parttool(["write_partition", "--input", otadata_next_file.name], args)
+        _invoke_parttool(["write_partition", "--input", f_name], args)
         status("Updated ota_data partition")
+    finally:
+        os.unlink(f_name)
 
 
 def _get_partition_specifier(args):
index 1155e136f1dd3377f566ed3a48f2ba2a8653f4e2..14fcdf4fe75413b7f1bfe08f1e1095f7f72971ca 100755 (executable)
@@ -74,11 +74,19 @@ def _get_partition_table(args):
     else:
         port_info = (" on port " + args.port if args.port else "")
         status("Reading partition table from device{}...".format(port_info))
-        with tempfile.NamedTemporaryFile() as partition_table_file:
-            invoke_args = ["read_flash", str(gen.offset_part_table), str(gen.MAX_PARTITION_LENGTH), partition_table_file.name]
+
+        f_name = None
+        with tempfile.NamedTemporaryFile(delete=False) as f:
+            f_name = f.name
+
+        try:
+            invoke_args = ["read_flash", str(gen.offset_part_table), str(gen.MAX_PARTITION_LENGTH), f_name]
             _invoke_esptool(invoke_args, args)
-            partition_table = gen.PartitionTable.from_binary(partition_table_file.read())
-        status("Partition table read from device" + port_info)
+            with open(f_name, "rb") as f:
+                partition_table = gen.PartitionTable.from_binary(f.read())
+                status("Partition table read from device" + port_info)
+        finally:
+            os.unlink(f_name)
 
     return partition_table