3 # esp-idf NVS partition generation tool. Tool helps in generating NVS-compatible
4 # partition binary, with key-value pair entries provided via a CSV file.
6 # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
21 from __future__ import division, print_function #, unicode_literals
22 from future.utils import raise_
23 from builtins import int, range
35 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
36 from cryptography.hazmat.backends import default_backend
38 """ Class for standard NVS page structure """
42 "max_old_blob_size": 1984,
43 "max_new_blob_size": 4000,
61 BITMAPARRAY_OFFSET = 32
62 BITMAPARRAY_SIZE_IN_BYTES = 32
63 FIRST_ENTRY_OFFSET = 64
64 SINGLE_ENTRY_SIZE = 32
71 def __init__(self, page_num, is_rsrv_page=False):
73 self.is_encrypt = False
75 self.bitmap_array = array.array('B')
76 self.version = Page.VERSION2
77 self.page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"]
79 self.bitmap_array = self.create_bitmap_array()
80 self.set_header(page_num)
82 def set_header(self, page_num):
85 # set page state to active
86 page_header= bytearray(b'\xff') *32
87 page_state_active_seq = Page.ACTIVE
88 struct.pack_into('<I', page_header, 0, page_state_active_seq)
89 # set page sequence number
90 struct.pack_into('<I', page_header, 4, page_num)
92 if version == Page.VERSION2:
93 page_header[8] = Page.VERSION2
94 elif version == Page.VERSION1:
95 page_header[8] = Page.VERSION1
97 crc_data = bytes(page_header[4:28])
98 crc = zlib.crc32(crc_data, 0xFFFFFFFF)
99 struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
100 self.page_buf[0:len(page_header)] = page_header
103 def create_bitmap_array(self):
104 bitarray = array.array('B')
105 charsize = 32 # bitmaparray has 256 bits, hence 32 bytes
106 fill = 255 # Fill all 8 bits with 1's
107 bitarray.extend((fill,) * charsize)
111 def write_bitmaparray(self):
112 bitnum = self.entry_num * 2
113 byte_idx = bitnum // 8 # Find byte index in the array
114 bit_offset = bitnum & 7 # Find bit offset in given byte index
115 mask = ~(1 << bit_offset)
116 self.bitmap_array[byte_idx] &= mask
117 start_idx = Page.BITMAPARRAY_OFFSET
118 end_idx = Page.BITMAPARRAY_OFFSET + Page.BITMAPARRAY_SIZE_IN_BYTES
119 self.page_buf[start_idx:end_idx] = self.bitmap_array
122 def encrypt_entry(self, data_arr, tweak_arr, encr_key):
123 # Encrypt 32 bytes of data using AES-XTS encryption
124 backend = default_backend()
125 plain_text = codecs.decode(data_arr, 'hex')
126 tweak = codecs.decode(tweak_arr, 'hex')
128 cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
129 encryptor = cipher.encryptor()
130 encrypted_data = encryptor.update(plain_text)
132 return encrypted_data
135 def reverse_hexbytes(self, addr_tmp):
138 for i in range(0, len(addr_tmp), 2):
139 addr.append(addr_tmp[i:i+2])
140 reversed_bytes = "".join(reversed(addr))
142 return reversed_bytes
145 def encrypt_data(self, data_input, no_of_entries, nvs_obj):
146 # Set values needed for encryption and encrypt data byte wise
147 encr_data_to_write = bytearray()
148 data_len_needed = 64 #in hex
149 tweak_len_needed = 32 #in hex
153 encr_key_input = None
156 # Extract encryption key and tweak key from given key input
157 encr_key_input = codecs.decode(self.encr_key, 'hex')
159 rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET
161 if not isinstance(data_input, bytearray):
162 byte_arr = bytearray(b'\xff') * 32
163 byte_arr[0:len(data_input)] = data_input
164 data_input = byte_arr
166 data_input = binascii.hexlify(data_input)
168 entry_no = self.entry_num
170 end_idx = start_idx + 64
172 for _ in range(0, no_of_entries):
174 offset = entry_no * Page.SINGLE_ENTRY_SIZE
175 addr = hex(rel_addr + offset)[2:]
180 tweak_tmp = self.reverse_hexbytes(addr_tmp)
181 tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp))))
183 addr_tmp = init_tweak_val + addr
184 tweak_tmp = self.reverse_hexbytes(addr_tmp)
185 tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp))))
187 tweak_val = addr + (init_tweak_val * (tweak_len_needed - len(addr)))
190 data_bytes = data_input[start_idx:end_idx]
191 if type(data_bytes) == bytes:
192 data_bytes = data_bytes.decode()
194 data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
195 encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
196 encr_data_to_write = encr_data_to_write + encr_data_ret
197 # Update values for encrypting next set of data bytes
199 end_idx = start_idx + 64
202 return encr_data_to_write
205 def write_entry_to_buf(self, data, entrycount,nvs_obj):
206 encr_data = bytearray()
209 encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj)
210 if sys.version_info[0] < 3:
211 encr_data[0:len(encr_data_ret)] = encr_data_ret
213 encr_data[0:len(encr_data_ret)] = encr_data_ret
217 data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
218 start_idx = data_offset
219 end_idx = data_offset + len(data)
220 if not sys.version_info[0] < 3:
221 if type(data) == str:
222 self.page_buf[start_idx:end_idx] = data
224 self.page_buf[start_idx:end_idx] = data
226 self.page_buf[start_idx:end_idx] = data
229 # Set bitmap array for entries in current page
230 for i in range(0, entrycount):
231 self.write_bitmaparray()
235 def set_crc_header(self, entry_struct):
236 crc_data = bytearray(b'28')
237 crc_data[0:4] = entry_struct[0:4]
238 crc_data[4:28] = entry_struct[8:32]
239 crc_data = bytes(crc_data)
240 crc = zlib.crc32(crc_data, 0xFFFFFFFF)
241 struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
245 def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count,nvs_obj):
248 chunk_index = Page.CHUNK_ANY
250 remaining_size = data_size
256 # Get the size available in current page
257 tailroom = (Page.PAGE_PARAMS["max_entries"] - self.entry_num - 1) * Page.SINGLE_ENTRY_SIZE
258 assert tailroom >=0, "Page overflow!!"
260 # Split the binary data into two and store a chunk of available size onto curr page
261 if tailroom < remaining_size:
262 chunk_size = tailroom
264 chunk_size = remaining_size
266 remaining_size = remaining_size - chunk_size
268 # Change type of data to BLOB_DATA
269 entry_struct[1] = Page.BLOB_DATA
271 # Calculate no. of entries data chunk will require
272 datachunk_rounded_size = (chunk_size + 31) & ~31
273 datachunk_entry_count = datachunk_rounded_size // 32
274 datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header
277 entry_struct[2] = datachunk_total_entry_count
279 # Update the chunkIndex
280 chunk_index = chunk_start + chunk_count
281 entry_struct[3] = chunk_index
284 data_chunk = data[offset:offset + chunk_size]
286 # Compute CRC of data chunk
287 struct.pack_into('<H', entry_struct, 24, chunk_size)
288 data_chunk = bytes(data_chunk)
289 crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
290 struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
292 # compute crc of entry header
293 entry_struct = self.set_crc_header(entry_struct)
296 self.write_entry_to_buf(entry_struct, 1,nvs_obj)
298 self.write_entry_to_buf(data_chunk, datachunk_entry_count,nvs_obj)
300 chunk_count = chunk_count + 1
302 if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE:
303 if page_header[0:4] != Page.FULL:
304 page_state_full_seq = Page.FULL
305 struct.pack_into('<I', page_header, 0, page_state_full_seq)
306 nvs_obj.create_new_page()
307 self = nvs_obj.cur_page
309 offset = offset + chunk_size
312 # All chunks are stored, now store the index
313 if not remaining_size:
314 # Initialise data field to 0xff
315 data_array = bytearray(b'\xff')*8
316 entry_struct[24:32] = data_array
318 # change type of data to BLOB_IDX
319 entry_struct[1] = Page.BLOB_IDX
324 # Update the chunkIndex
325 chunk_index = Page.CHUNK_ANY
326 entry_struct[3] = chunk_index
328 struct.pack_into('<I', entry_struct, 24, data_size)
329 entry_struct[28] = chunk_count
330 entry_struct[29] = chunk_start
332 # compute crc of entry header
333 entry_struct = self.set_crc_header(entry_struct)
336 self.write_entry_to_buf(entry_struct, 1,nvs_obj)
342 def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
343 # compute CRC of data
344 struct.pack_into('<H', entry_struct, 24, datalen)
345 if not type(data) == bytes:
347 crc = zlib.crc32(data, 0xFFFFFFFF)
348 struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
350 # compute crc of entry header
351 entry_struct = self.set_crc_header(entry_struct)
354 self.write_entry_to_buf(entry_struct, 1, nvs_obj)
356 self.write_entry_to_buf(data, data_entry_count, nvs_obj)
360 Low-level function to write variable length data into page buffer. Data should be formatted
361 according to encoding specified.
363 def write_varlen_data(self, key, data, encoding, ns_index,nvs_obj):
367 if version == Page.VERSION1:
368 if datalen > Page.PAGE_PARAMS["max_old_blob_size"]:
369 raise InputError("%s: Size exceeds max allowed length." % key)
371 if version == Page.VERSION2:
372 if encoding == "string":
373 if datalen > Page.PAGE_PARAMS["max_new_blob_size"]:
374 raise InputError("%s: Size exceeds max allowed length." % key)
376 # Calculate no. of entries data will require
377 rounded_size = (datalen + 31) & ~31
378 data_entry_count = rounded_size // 32
379 total_entry_count = data_entry_count + 1 # +1 for the entry header
381 # Check if page is already full and new page is needed to be created right away
382 if encoding == "string":
383 if (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]:
384 raise PageFullError()
387 entry_struct = bytearray(b'\xff')*32
388 # Set Namespace Index
389 entry_struct[0] = ns_index
391 if version == Page.VERSION2:
392 if encoding == "string":
393 entry_struct[2] = data_entry_count + 1
395 chunk_index = Page.CHUNK_ANY
396 entry_struct[3] = chunk_index
398 entry_struct[2] = data_entry_count + 1
401 key_array = b'\x00' * 16
402 entry_struct[8:24] = key_array
403 entry_struct[8:8 + len(key)] = key.encode()
406 if encoding == "string":
407 entry_struct[1] = Page.SZ
408 elif encoding in ["hex2bin", "binary", "base64"]:
409 entry_struct[1] = Page.BLOB
411 if version == Page.VERSION2 and (encoding in ["hex2bin", "binary", "base64"]):
412 entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data,\
413 datalen,total_entry_count, nvs_obj)
415 self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj)
419 """ Low-level function to write data of primitive type into page buffer. """
420 def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj):
421 # Check if entry exceeds max number of entries allowed per page
422 if self.entry_num >= Page.PAGE_PARAMS["max_entries"]:
423 raise PageFullError()
425 entry_struct = bytearray(b'\xff')*32
426 entry_struct[0] = ns_index # namespace index
427 entry_struct[2] = 0x01 # Span
428 chunk_index = Page.CHUNK_ANY
429 entry_struct[3] = chunk_index
432 key_array = b'\x00' *16
433 entry_struct[8:24] = key_array
434 entry_struct[8:8 + len(key)] = key.encode()
437 entry_struct[1] = Page.U8
438 struct.pack_into('<B', entry_struct, 24, data)
439 elif encoding == "i8":
440 entry_struct[1] = Page.I8
441 struct.pack_into('<b', entry_struct, 24, data)
442 elif encoding == "u16":
443 entry_struct[1] = Page.U16
444 struct.pack_into('<H', entry_struct, 24, data)
445 elif encoding == "u32":
446 entry_struct[1] = Page.U32
447 struct.pack_into('<I', entry_struct, 24, data)
448 elif encoding == "i32":
449 entry_struct[1] = Page.I32
450 struct.pack_into('<i', entry_struct, 24, data)
453 crc_data = bytearray(b'28')
454 crc_data[0:4] = entry_struct[0:4]
455 crc_data[4:28] = entry_struct[8:32]
456 crc_data = bytes(crc_data)
457 crc = zlib.crc32(crc_data, 0xFFFFFFFF)
458 struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
461 self.write_entry_to_buf(entry_struct, 1,nvs_obj)
463 """ Get page buffer data of a given page """
468 NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs. Binary can later be flashed onto device via a flashing utility.
471 def __init__(self, fout, input_size):
472 self.size = input_size
473 self.namespace_idx = 0
476 self.cur_page = self.create_new_page()
482 def __exit__(self, exc_type, exc_value, traceback):
483 if exc_type == None and exc_value == None:
484 # Create pages for remaining available size
487 new_page = self.create_new_page()
488 except InsufficientSizeError:
490 # Creating the last reserved page
491 self.create_new_page(is_rsrv_page=True)
494 result = self.get_binary_data()
495 self.fout.write(result)
497 def create_new_page(self, is_rsrv_page=False):
498 # Update available size as each page is created
500 raise InsufficientSizeError("Size parameter is is less than the size of data in csv.Please increase size.")
502 self.size = self.size - Page.PAGE_PARAMS["max_size"]
504 new_page = Page(self.page_num, is_rsrv_page)
505 new_page.version = version
506 new_page.is_encrypt = is_encrypt_data
507 if new_page.is_encrypt:
508 new_page.encr_key = key_input
509 self.pages.append(new_page)
510 self.cur_page = new_page
514 Write namespace entry and subsequently increase namespace count so that all upcoming entries
515 will be mapped to a new namespace.
517 def write_namespace(self, key):
518 self.namespace_idx += 1
520 self.cur_page.write_primitive_data(key, self.namespace_idx, "u8", 0,self)
521 except PageFullError:
522 new_page = self.create_new_page()
523 new_page.write_primitive_data(key, self.namespace_idx, "u8", 0,self)
526 Write key-value pair. Function accepts value in the form of ascii character and converts
527 it into appropriate format before calling Page class's functions to write entry into NVS format.
528 Function handles PageFullError and creates a new page and re-invokes the function on a new page.
529 We don't have to guard re-invocation with try-except since no entry can span multiple pages.
531 def write_entry(self, key, value, encoding):
532 if encoding == "hex2bin":
533 if len(value) % 2 != 0:
534 raise InputError("%s: Invalid data length. Should be multiple of 2." % key)
535 value = binascii.a2b_hex(value)
537 if encoding == "base64":
538 value = binascii.a2b_base64(value)
540 if encoding == "string":
541 if type(value) == bytes:
542 value = value.decode()
545 encoding = encoding.lower()
546 varlen_encodings = ["string", "binary", "hex2bin", "base64"]
547 primitive_encodings = ["u8", "i8", "u16", "u32", "i32"]
548 if encoding in varlen_encodings:
550 self.cur_page.write_varlen_data(key, value, encoding, self.namespace_idx,self)
551 except PageFullError:
552 new_page = self.create_new_page()
553 new_page.write_varlen_data(key, value, encoding, self.namespace_idx,self)
554 elif encoding in primitive_encodings:
556 self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
557 except PageFullError:
558 new_page = self.create_new_page()
559 new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
561 raise InputError("%s: Unsupported encoding" % encoding)
563 """ Return accumulated data of all pages """
564 def get_binary_data(self):
566 for page in self.pages:
567 data += page.get_data()
570 class PageFullError(RuntimeError):
572 Represents error when current page doesn't have sufficient entries left
573 to accommodate current request
576 super(PageFullError, self).__init__()
578 class InputError(RuntimeError):
580 Represents error on the input
582 def __init__(self, e):
583 super(InputError, self).__init__(e)
585 class InsufficientSizeError(RuntimeError):
587 Represents error when NVS Partition size given is insufficient
588 to accomodate the data in the given csv file
590 def __init__(self, e):
591 super(InsufficientSizeError, self).__init__(e)
593 def nvs_open(result_obj, input_size):
594 """ Wrapper to create and NVS class object. This object can later be used to set key-value pairs
596 :param result_obj: File/Stream object to dump resultant binary. If data is to be dumped into memory, one way is to use BytesIO object
597 :param input_size: Size of Partition
598 :return: NVS class instance
600 return NVS(result_obj, input_size)
602 def write_entry(nvs_instance, key, datatype, encoding, value):
603 """ Wrapper to set key-value pair in NVS format
605 :param nvs_instance: Instance of an NVS class returned by nvs_open()
606 :param key: Key of the data
607 :param datatype: Data type. Valid values are "file", "data" and "namespace"
608 :param encoding: Data encoding. Valid values are "u8", "i8", "u16", "u32", "i32", "string", "binary", "hex2bin" and "base64"
609 :param value: Data value in ascii encoded string format for "data" datatype and filepath for "file" datatype
613 if datatype == "file":
614 abs_file_path = value
615 if os.path.isabs(value) == False:
616 script_dir = os.path.dirname(__file__)
617 abs_file_path = os.path.join(script_dir, value)
619 with open(abs_file_path, 'rb') as f:
622 if datatype == "namespace":
623 nvs_instance.write_namespace(key)
625 nvs_instance.write_entry(key, value, encoding)
627 def nvs_close(nvs_instance):
628 """ Wrapper to finish writing to NVS and write data to file/stream object provided to nvs_open method
630 :param nvs_instance: Instance of NVS class returned by nvs_open()
633 nvs_instance.__exit__(None, None, None)
635 def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key_gen=None, encrypt_mode=None, key_file=None, version_no=None):
636 """ Wrapper to generate nvs partition binary
638 :param input_filename: Name of input file containing data
639 :param output_filename: Name of output file to store generated binary
640 :param input_size: Size of partition in bytes (must be multiple of 4096)
641 :param key_gen: Enable encryption key generation in encryption mode
642 :param encrypt_mode: Enable/Disable encryption mode
643 :param key_file: Input file having encryption keys in encryption mode
646 global version, is_encrypt_data, key_input
649 is_encrypt_data = encrypt_mode
652 input_size = int(input_size, 0)
654 if input_size % 4096 !=0:
655 sys.exit("Size of partition (must be multiple of 4096)")
658 version = Page.VERSION1
659 elif version == 'v2':
660 version = Page.VERSION2
662 # Update size as a page needs to be reserved of size 4KB
663 input_size = input_size - Page.PAGE_PARAMS["max_size"]
666 sys.exit("Size parameter is insufficient.")
668 if is_encrypt_data == 'True':
669 is_encrypt_data = True
670 elif is_encrypt_data == 'False':
671 is_encrypt_data = False
673 if key_gen == 'True':
675 elif key_gen == 'False':
678 if is_encrypt_data and not key_gen and not key_file:
679 sys.exit("Missing parameter. Enter --keyfile or --keygen.")
681 if is_encrypt_data and key_gen and key_file:
682 sys.exit("Only one input allowed. Enter --keyfile or --keygen.")
684 if not is_encrypt_data and key_gen:
685 sys.exit("Invalid. Cannot give --key_gen as --encrypt is set to False.")
687 if not is_encrypt_data and key_file:
688 sys.exit("Invalid. Cannot give --key_file as --encrypt is set to False.")
691 key_input = ''.join(random.choice('0123456789abcdef') for _ in range(128)).strip()
693 with open(key_file, 'rt', encoding='utf8') as key_f:
694 key_input = key_f.readline()
695 key_input = key_input.strip()
697 input_file = open(input_filename, 'rt', encoding='utf8')
698 output_file = open(output_filename, 'wb')
700 with nvs_open(output_file, input_size) as nvs_obj:
701 reader = csv.DictReader(input_file, delimiter=',')
704 write_entry(nvs_obj, row["key"], row["type"], row["encoding"], row["value"])
705 except (InputError) as e:
715 keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"]
716 key_bytes = bytearray()
718 key_bytes = codecs.decode(key_input, 'hex')
719 key_len = len(key_bytes)
720 keys_page_buf[0:key_len] = key_bytes
722 crc_data = keys_page_buf[0:key_len]
723 crc_data = bytes(crc_data)
724 crc = zlib.crc32(crc_data, 0xFFFFFFFF)
726 struct.pack_into('<I', keys_page_buf, key_len, crc & 0xFFFFFFFF)
728 with open("encryption_keys.bin",'wb') as output_keys_file:
729 output_keys_file.write(keys_page_buf)
734 parser = argparse.ArgumentParser(description="ESP32 NVS partition generation utility")
737 help="Path to CSV file to parse. Will use stdin if omitted",
742 help='Path to output converted binary file. Will use stdout if omitted',
747 help='Size of NVS Partition in hex (must be multiple of 4096). Eg. 0x1000')
751 help='Set version. Default: v2',
757 help='Generate keys for encryption. Default: False (Applicable only if encryption mode is true)',
758 choices=['True','False'],
763 help='Set encryption mode. Default: False',
764 choices=['True','False'],
769 help='File having key for encryption (Applicable only if encryption mode is true)',
772 args = parser.parse_args()
773 input_filename = args.input
774 output_filename = args.output
775 input_size = args.size
776 version_no = args.version
778 key_gen = args.keygen
779 is_encrypt_data = args.encrypt
780 key_file = args.keyfile
782 nvs_part_gen(input_filename, output_filename, input_size, key_gen, is_encrypt_data, key_file, version_no)
786 if __name__ == "__main__":