]> granicus.if.org Git - esp-idf/commitdiff
ota: implement otatool functionality
authorRenz Christian Bagaporo <renz@espressif.com>
Thu, 15 Nov 2018 21:00:27 +0000 (05:00 +0800)
committerRenz Christian Bagaporo <renz@espressif.com>
Wed, 28 Nov 2018 07:57:20 +0000 (15:57 +0800)
components/app_update/dump_otadata.py [deleted file]
components/app_update/gen_empty_partition.py [deleted file]
components/app_update/otatool.py [new file with mode: 0755]
examples/system/ota/README.md
tools/ci/executable-list.txt

diff --git a/components/app_update/dump_otadata.py b/components/app_update/dump_otadata.py
deleted file mode 100755 (executable)
index 040bd2b..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-#!/usr/bin/env python
-#
-# gen_otadata prints info about the otadata partition.
-#
-# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http:#www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from __future__ import print_function, division
-import argparse
-import os
-import re
-import struct
-import sys
-import hashlib
-import binascii
-
-__version__ = '1.0'
-
-quiet = False
-
-def status(msg):
-    """ Print status message to stderr """
-    if not quiet:
-        critical(msg)
-
-def critical(msg):
-    """ Print critical message to stderr """
-    if not quiet:
-        sys.stderr.write(msg)
-        sys.stderr.write('\n')
-
-def little_endian(buff, offset):
-    data = buff[offset:offset+4]
-    data.reverse()
-    data = ''.join(data)
-    return data
-               
-def main():
-    global quiet
-    parser = argparse.ArgumentParser(description='Prints otadata partition in human readable form.')
-
-    parser.add_argument('--quiet', '-q', help="Don't print status messages to stderr", action='store_true')
-
-    search_type = parser.add_mutually_exclusive_group()
-
-    parser.add_argument('input', help='Path to binary file containing otadata partition to parse.',
-                        type=argparse.FileType('rb'))
-
-    args = parser.parse_args()
-
-    quiet = args.quiet
-
-    input = args.input.read()
-    
-    hex_input_0 = binascii.hexlify(input)
-    hex_input_0 = map(''.join, zip(*[iter(hex_input_0)]*2))
-    hex_input_1 = binascii.hexlify(input[4096:])
-    hex_input_1 = map(''.join, zip(*[iter(hex_input_1)]*2))
-    
-    print("\t%11s\t%8s    |\t%8s\t%8s" %("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
-    print("Firmware:  0x%s  \t 0x%s |\t0x%s \t 0x%s" % (little_endian(hex_input_0, 0), little_endian(hex_input_0, 28), \
-                                                        little_endian(hex_input_1, 0), little_endian(hex_input_1, 28)))
-class InputError(RuntimeError):
-    def __init__(self, e):
-        super(InputError, self).__init__(e)
-
-class ValidationError(InputError):
-    def __init__(self, partition, message):
-        super(ValidationError, self).__init__(
-            "Partition %s invalid: %s" % (partition.name, message))
-
-if __name__ == '__main__':
-    try:
-        r = main()
-        sys.exit(r)
-    except InputError as e:
-        print(e, file=sys.stderr)
-        sys.exit(2)
diff --git a/components/app_update/gen_empty_partition.py b/components/app_update/gen_empty_partition.py
deleted file mode 100755 (executable)
index 365b2ad..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/usr/bin/env python
-#
-# generates an empty binary file
-# 
-# This tool generates an empty binary file of the required size.
-#
-# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http:#www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from __future__ import print_function, division
-from __future__ import unicode_literals
-import argparse
-import os
-import re
-import struct
-import sys
-import hashlib
-import binascii
-
-__version__ = '1.0'
-
-quiet = False
-
-def status(msg):
-    """ Print status message to stderr """
-    if not quiet:
-        critical(msg)
-
-def critical(msg):
-    """ Print critical message to stderr """
-    if not quiet:
-        sys.stderr.write(msg)
-        sys.stderr.write('\n')
-
-def generate_blanked_file(size, output_path):
-    output = b"\xFF" * size
-    try:
-        stdout_binary = sys.stdout.buffer  # Python 3
-    except AttributeError:
-        stdout_binary = sys.stdout
-    with stdout_binary if output_path == '-' else open(output_path, 'wb') as f:
-        f.write(output)
-
-def main():
-    global quiet
-    parser = argparse.ArgumentParser(description='Generates an empty binary file of the required size.')
-
-    parser.add_argument('--quiet', '-q', help="Don't print status messages to stderr", action='store_true')
-
-    parser.add_argument('--size', help='Size of generated the file',  type=str, required=True)
-    
-    parser.add_argument('output', help='Path for binary file.', nargs='?', default='-')
-    args = parser.parse_args()
-
-    quiet = args.quiet
-
-    size = int(args.size, 0)
-    if size > 0 :
-        generate_blanked_file(size, args.output)
-    return 0
-    
-class InputError(RuntimeError):
-    def __init__(self, e):
-        super(InputError, self).__init__(e)
-
-if __name__ == '__main__':
-    try:
-        r = main()
-        sys.exit(r)
-    except InputError as e:
-        print(e, file=sys.stderr)
-        sys.exit(2)
diff --git a/components/app_update/otatool.py b/components/app_update/otatool.py
new file mode 100755 (executable)
index 0000000..f7216b0
--- /dev/null
@@ -0,0 +1,327 @@
+#!/usr/bin/env python
+#
+# otatool is used to perform ota-level operations - flashing ota partition
+# erasing ota partition and switching ota partition
+#
+# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function, division
+import argparse
+import os
+import sys
+import binascii
+import subprocess
+import tempfile
+import collections
+import struct
+
+__version__ = '1.0'
+
+IDF_COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
+
+PARTTOOL_PY = os.path.join(IDF_COMPONENTS_PATH, "partition_table", "parttool.py")
+
+SPI_FLASH_SEC_SIZE = 0x2000
+
+quiet = False
+
+
+def status(msg):
+    if not quiet:
+        print(msg)
+
+
+def _invoke_parttool(parttool_args, args, output=False, partition=None):
+    invoke_args = []
+
+    if partition:
+        invoke_args += [sys.executable, PARTTOOL_PY] + partition
+    else:
+        invoke_args += [sys.executable, PARTTOOL_PY, "--partition-type", "data", "--partition-subtype", "ota"]
+
+    if quiet:
+        invoke_args += ["-q"]
+
+    if args.port != "":
+        invoke_args += ["--port", args.port]
+
+    if args.partition_table_file:
+        invoke_args += ["--partition-table-file", args.partition_table_file]
+
+    if args.partition_table_offset:
+        invoke_args += ["--partition-table-offset", args.partition_table_offset]
+
+    invoke_args += parttool_args
+
+    if output:
+        return subprocess.check_output(invoke_args)
+    else:
+        return subprocess.check_call(invoke_args)
+
+
+def _get_otadata_contents(args, check=True):
+    global quiet
+
+    if check:
+        check_args = ["get_partition_info", "--info", "offset", "size"]
+
+        quiet = True
+        output = _invoke_parttool(check_args, args, True).split(b" ")
+        quiet = args.quiet
+
+        if not output:
+            raise RuntimeError("No ota_data partition found")
+
+    with tempfile.NamedTemporaryFile() as otadata_file:
+        invoke_args = ["read_partition", "--output", otadata_file.name]
+        _invoke_parttool(invoke_args, args)
+        return otadata_file.read()
+
+
+def _get_otadata_status(otadata_contents):
+    status = []
+
+    otadata_status = collections.namedtuple("otadata_status", "seq crc")
+
+    for i in range(2):
+        start = i * (SPI_FLASH_SEC_SIZE >> 1)
+
+        seq = bytearray(otadata_contents[start:start + 4])
+        crc = bytearray(otadata_contents[start + 28:start + 32])
+
+        seq = struct.unpack('>I', seq)
+        crc = struct.unpack('>I', crc)
+
+        status.append(otadata_status(seq[0], crc[0]))
+
+    return status
+
+
+def read_otadata(args):
+    status("Reading ota_data partition contents...")
+    otadata_info = _get_otadata_contents(args)
+    otadata_info = _get_otadata_status(otadata_info)
+
+    print(otadata_info)
+
+    print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
+    print("Firmware:  0x{:8x}  \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
+                                                                        otadata_info[1].seq, otadata_info[1].crc))
+
+
+def erase_otadata(args):
+    status("Erasing ota_data partition contents...")
+    _invoke_parttool(["erase_partition"], args)
+    status("Erased ota_data partition contents")
+
+
+def switch_otadata(args):
+    sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
+    import gen_esp32part as gen
+
+    def is_otadata_status_valid(status):
+        seq = status.seq % (1 << 32)
+        crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
+        return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
+
+    status("Looking for ota app partitions...")
+
+    # In order to get the number of ota app partitions, we need the partition table
+    partition_table = None
+
+    with tempfile.NamedTemporaryFile() as partition_table_file:
+        invoke_args = ["get_partition_info", "--table", partition_table_file.name]
+
+        _invoke_parttool(invoke_args, args)
+
+        partition_table = partition_table_file.read()
+        partition_table = gen.PartitionTable.from_binary(partition_table)
+
+    ota_partitions = list()
+
+    for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
+        ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
+
+        try:
+            ota_partitions.append(list(ota_partition)[0])
+        except IndexError:
+            break
+
+    ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
+
+    if not ota_partitions:
+        raise RuntimeError("No ota app partitions found")
+
+    status("Verifying partition to switch to exists...")
+
+    # Look for the app partition to switch to
+    ota_partition_next = None
+
+    try:
+        if args.name:
+            ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
+        else:
+            ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA  == args.slot, ota_partitions)
+
+        ota_partition_next = list(ota_partition_next)[0]
+    except IndexError:
+        raise RuntimeError("Partition to switch to not found")
+
+    otadata_contents = _get_otadata_contents(args)
+    otadata_status = _get_otadata_status(otadata_contents)
+
+    # Find the copy to base the computation for ota sequence number on
+    otadata_compute_base = -1
+
+    # Both are valid, take the max as computation base
+    if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
+        if otadata_status[0].seq >= otadata_status[1].seq:
+            otadata_compute_base = 0
+        else:
+            otadata_compute_base = 1
+    # Only one copy is valid, use that
+    elif is_otadata_status_valid(otadata_status[0]):
+        otadata_compute_base = 0
+    elif is_otadata_status_valid(otadata_status[1]):
+        otadata_compute_base = 1
+    # Both are invalid (could be initial state - all 0xFF's)
+    else:
+        pass
+
+    ota_seq_next = 0
+    ota_partitions_num = len(ota_partitions)
+
+    target_seq = (ota_partition_next.subtype & 0x0F) + 1
+
+    # Find the next ota sequence number
+    if otadata_compute_base == 0 or otadata_compute_base == 1:
+        base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
+
+        i = 0
+        while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
+            i += 1
+
+        ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
+    else:
+        ota_seq_next = target_seq
+
+    # Create binary data from computed values
+    ota_seq_next = struct.pack("I", ota_seq_next)
+    ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
+    ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
+
+    with tempfile.NamedTemporaryFile() as otadata_next_file:
+        start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
+
+        otadata_next_file.write(otadata_contents)
+
+        otadata_next_file.seek(start)
+        otadata_next_file.write(ota_seq_next)
+
+        otadata_next_file.seek(start + 28)
+        otadata_next_file.write(ota_seq_crc_next)
+
+        otadata_next_file.flush()
+
+        _invoke_parttool(["write_partition", "--input", otadata_next_file.name], args)
+        status("Updated ota_data partition")
+
+
+def _get_partition_specifier(args):
+    if args.name:
+        return ["--partition-name", args.name]
+    else:
+        return ["--partition-type", "app", "--partition-subtype", "ota_" + str(args.slot)]
+
+
+def read_ota_partition(args):
+    invoke_args = ["read_partition", "--output", args.output]
+    _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
+    status("Read ota partition contents to file {}".format(args.output))
+
+
+def write_ota_partition(args):
+    invoke_args = ["write_partition", "--input", args.input]
+    _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
+    status("Written contents of file {} to ota partition".format(args.input))
+
+
+def erase_ota_partition(args):
+    invoke_args = ["erase_partition"]
+    _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
+    status("Erased contents of ota partition")
+
+
+def main():
+    global quiet
+
+    parser = argparse.ArgumentParser("ESP-IDF OTA Partitions Tool")
+
+    parser.add_argument("--quiet", "-q", help="suppress stderr messages", action="store_true")
+
+    # There are two possible sources for the partition table: a device attached to the host
+    # or a partition table CSV/binary file. These sources are mutually exclusive.
+    partition_table_info_source_args = parser.add_mutually_exclusive_group()
+
+    partition_table_info_source_args.add_argument("--port", "-p", help="port where the device to read the partition table from is attached", default="")
+    partition_table_info_source_args.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from", default="")
+
+    parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", default="0x8000")
+
+    subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help")
+
+    # Specify the supported operations
+    subparsers.add_parser("read_otadata", help="read otadata partition")
+    subparsers.add_parser("erase_otadata", help="erase otadata partition")
+
+    slot_or_name_parser = argparse.ArgumentParser(add_help=False)
+    slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
+    slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int)
+    slot_or_name_parser_args.add_argument("--name", help="name of the ota partition")
+
+    subparsers.add_parser("switch_otadata", help="switch otadata partition", parents=[slot_or_name_parser])
+
+    read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser])
+    read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to")
+
+    write_ota_partition_subparser = subparsers.add_parser("write_ota_partition", help="write contents to an ota partition", parents=[slot_or_name_parser])
+    write_ota_partition_subparser.add_argument("--input", help="file whose contents to write to the ota partition")
+
+    subparsers.add_parser("erase_ota_partition", help="erase contents of an ota partition", parents=[slot_or_name_parser])
+
+    args = parser.parse_args()
+
+    quiet = args.quiet
+
+    # No operation specified, display help and exit
+    if args.operation is None:
+        if not quiet:
+            parser.print_help()
+        sys.exit(1)
+
+    # Else execute the operation
+    operation_func = globals()[args.operation]
+
+    if quiet:
+        # If exceptions occur, suppress and exit quietly
+        try:
+            operation_func(args)
+        except Exception:
+            sys.exit(2)
+    else:
+        operation_func(args)
+
+
+if __name__ == '__main__':
+    main()
index 2641419380f67a777c36a78cbe95782c984bc668..a5b77ea03d28c699c3dd3239e1bbdbddb2dbf57b 100644 (file)
@@ -97,8 +97,8 @@ It allows to run the newly loaded app from a factory partition.
 make flash
 ```
 
-After first update, if you want to return back to factory app (or the first OTA partition, if factory partition is not present) then use the command `make erase_ota`. 
-It erases ota_data partition to initial.
+After first update, if you want to return back to factory app (or the first OTA partition, if factory partition is not present) then use the command `make erase_otadata`. 
+It erases the ota_data partition to initial state. **Take note that this assumes that the partition table of this project is the one that is on the device**.
 
 ### Step 5: Run the OTA Example
 
index 6c3756fe6f0afe4d7b9dc13aead334597465aa2b..deb554c90a1c223aa17662c28d00972423d18a40 100644 (file)
@@ -8,7 +8,7 @@ components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py
 components/partition_table/gen_esp32part.py
 components/partition_table/parttool.py
 components/app_update/gen_empty_partition.py
-components/app_update/dump_otadata.py
+components/app_update/otatool.py
 components/partition_table/test_gen_esp32part_host/gen_esp32part_tests.py
 components/ulp/esp32ulp_mapgen.py
 docs/check_doc_warnings.sh