import array
import csv
import zlib
+import codecs
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# set page state to active
page_header= bytearray(b'\xff')*32
page_state_active_seq = Page.ACTIVE
- page_header[0:4] = struct.pack('<I', page_state_active_seq)
+ struct.pack_into('<I', page_header, 0, page_state_active_seq)
# set page sequence number
- page_header[4:8] = struct.pack('<I', page_num)
+ struct.pack_into('<I', page_header, 4, page_num)
# set version
if version == Page.VERSION2:
page_header[8] = Page.VERSION2
page_header[8] = Page.VERSION1
# set header's CRC
crc_data = page_header[4:28]
- crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
- page_header[28:32] = struct.pack('<I', crc & 0xFFFFFFFF)
+ if sys.version_info[0] < 3:
+ crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
+ else:
+ crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
+ struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
self.page_buf[0:len(page_header)] = page_header
def encrypt_entry(self, data_arr, tweak_arr, encr_key):
# Encrypt 32 bytes of data using AES-XTS encryption
backend = default_backend()
- plain_text = data_arr.decode('hex')
- tweak = tweak_arr.decode('hex')
+ if sys.version_info[0] < 3:
+ plain_text = data_arr.decode('hex')
+ tweak = tweak_arr.decode('hex')
+ else:
+ plain_text = codecs.decode(data_arr, 'hex')
+ tweak = codecs.decode(tweak_arr, 'hex')
+
cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plain_text)
def encrypt_data(self, data_input, no_of_entries, nvs_obj):
# Set values needed for encryption and encrypt data byte wise
- encr_data_to_write = ''
+ encr_data_to_write = bytearray()
data_len_needed = 64 #in hex
tweak_len_needed = 32 #in hex
init_tweak_val = '0'
tweak_tmp = ''
encr_key_input = None
+
# Extract encryption key and tweak key from given key input
- encr_key_input = self.encr_key.decode('hex')
+ if sys.version_info[0] < 3:
+ encr_key_input = self.encr_key.decode('hex')
+ else:
+ encr_key_input = codecs.decode(self.encr_key, 'hex')
+
rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET
+
if type(data_input) != bytearray:
- byte_arr = bytearray('\xff') * 32
- byte_arr[0:len(data_input)] = data_input
+ byte_arr = bytearray(b'\xff') * 32
+ if sys.version_info[0] < 3:
+ byte_arr[0:len(data_input)] = data_input
+ else:
+ if type(data_input) == str:
+ byte_arr[0:len(data_input)] = data_input
+ else:
+ byte_arr[0:len(data_input)] = data_input
+
data_input = byte_arr
- data_input = binascii.hexlify(bytearray(data_input))
+
+ if sys.version_info[0] < 3:
+ data_input = binascii.hexlify(bytearray(data_input))
+ else:
+ data_input = data_input.hex()
entry_no = self.entry_num
start_idx = 0
# Encrypt data
data_bytes = data_input[start_idx:end_idx]
- data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
+ if sys.version_info[0] < 3:
+ data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
+ else:
+ data_val = str(data_bytes) + (init_data_val * (data_len_needed - len(data_bytes)))
+
encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
+ #print("\n<<<\n")
encr_data_to_write = encr_data_to_write + encr_data_ret
# Update values for encrypting next set of data bytes
start_idx = end_idx
encr_data = bytearray()
if self.is_encrypt:
encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj)
- encr_data[0:len(encr_data_ret)] = encr_data_ret
+ if sys.version_info[0] < 3:
+ encr_data[0:len(encr_data_ret)] = encr_data_ret
+ else:
+ encr_data[0:len(encr_data_ret)] = encr_data_ret
+
data = encr_data
data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
start_idx = data_offset
end_idx = data_offset + len(data)
- self.page_buf[start_idx:end_idx] = data
+ if not sys.version_info[0] < 3:
+ if type(data) == str:
+ self.page_buf[start_idx:end_idx] = data
+ else:
+ self.page_buf[start_idx:end_idx] = data
+ else:
+ self.page_buf[start_idx:end_idx] = data
+
# Set bitmap array for entries in current page
for i in range(0, entrycount):
def set_crc_header(self, entry_struct):
- crc_data = bytearray(28)
+ crc_data = bytearray(b'28')
crc_data[0:4] = entry_struct[0:4]
crc_data[4:28] = entry_struct[8:32]
- crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
- entry_struct[4:8] = struct.pack('<I', crc & 0xFFFFFFFF)
+ if sys.version_info[0] < 3:
+ crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
+ else:
+ crc = zlib.crc32(crc_data, 0xFFFFFFFF)
+ struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
return entry_struct
def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count,nvs_obj):
data_chunk = data[offset:offset + chunk_size]
# Compute CRC of data chunk
- entry_struct[24:26] = struct.pack('<H', chunk_size)
+ struct.pack_into('<H', entry_struct, 24, chunk_size)
+ if not sys.version_info[0] < 3:
+ if type(data_chunk) == str:
+ data_chunk = codecs.encode(data_chunk, 'utf8')
+
crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
- entry_struct[28:32] = struct.pack('<I', crc & 0xFFFFFFFF)
+ struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
# compute crc of entry header
entry_struct = self.set_crc_header(entry_struct)
if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE:
if page_header[0:4] != Page.FULL:
page_state_full_seq = Page.FULL
- page_header[0:4] = struct.pack('<I', page_state_full_seq)
+ struct.pack_into('<I', page_header, 0, page_state_full_seq)
nvs_obj.create_new_page()
self = nvs_obj.cur_page
# All chunks are stored, now store the index
if not remaining_size:
# Initialise data field to 0xff
- data_array = bytearray('\xff')*8
+ data_array = bytearray(b'\xff')*8
entry_struct[24:32] = data_array
# change type of data to BLOB_IDX
chunk_index = Page.CHUNK_ANY
entry_struct[3] = chunk_index
- entry_struct[24:28] = struct.pack('<I', data_size)
+ struct.pack_into('<I', entry_struct, 24, data_size)
entry_struct[28] = chunk_count
entry_struct[29] = chunk_start
def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
# compute CRC of data
- entry_struct[24:26] = struct.pack('<H', datalen)
- crc = zlib.crc32(data, 0xFFFFFFFF)
- entry_struct[28:32] = struct.pack('<I', crc & 0xFFFFFFFF)
+ struct.pack_into('<H', entry_struct, 24, datalen)
+ if sys.version_info[0] < 3:
+ crc = zlib.crc32(data, 0xFFFFFFFF)
+ else:
+ if (type(data)) == str:
+ data = data.encode('utf8')
+ crc = zlib.crc32(data, 0xFFFFFFFF)
+ else:
+ crc = zlib.crc32(data, 0xFFFFFFFF)
+
+ struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
# compute crc of entry header
entry_struct = self.set_crc_header(entry_struct)
raise PageFullError()
# Entry header
- entry_struct = bytearray('\xff')*32
+ entry_struct = bytearray(b'\xff')*32
# Set Namespace Index
entry_struct[0] = ns_index
# Set Span
entry_struct[2] = data_entry_count + 1
# set key
- key_array = bytearray('\x00')*16
+ key_array = bytearray(b'\x00')*16
entry_struct[8:24] = key_array
- entry_struct[8:8 + len(key)] = key
+ if sys.version_info[0] < 3:
+ entry_struct[8:8 + len(key)] = key
+ else:
+ entry_struct[8:8 + len(key)] = key.encode('utf8')
# set Type
if encoding == "string":
if self.entry_num >= Page.PAGE_PARAMS["max_entries"]:
raise PageFullError()
- entry_struct = bytearray('\xff')*32
+ entry_struct = bytearray(b'\xff')*32
entry_struct[0] = ns_index # namespace index
entry_struct[2] = 0x01 # Span
chunk_index = Page.CHUNK_ANY
entry_struct[3] = chunk_index
# write key
- key_array = bytearray('\x00')*16
+ key_array = bytearray(b'\x00')*16
entry_struct[8:24] = key_array
- entry_struct[8:8 + len(key)] = key
+
+ if sys.version_info[0] < 3:
+ entry_struct[8:8 + len(key)] = key
+ else:
+ entry_struct[8:8 + len(key)] = key.encode('utf8')
+
if encoding == "u8":
entry_struct[1] = Page.U8
- entry_struct[24] = struct.pack('<B', data)
+ struct.pack_into('<B', entry_struct, 24, data)
elif encoding == "i8":
entry_struct[1] = Page.I8
- entry_struct[24] = struct.pack('<b', data)
+ struct.pack_into('<b', entry_struct, 24, data)
elif encoding == "u16":
entry_struct[1] = Page.U16
- entry_struct[24:26] = struct.pack('<H', data)
+ struct.pack_into('<H', entry_struct, 24, data)
elif encoding == "u32":
entry_struct[1] = Page.U32
- entry_struct[24:28] = struct.pack('<I', data)
+ struct.pack_into('<I', entry_struct, 24, data)
elif encoding == "i32":
entry_struct[1] = Page.I32
- entry_struct[24:28] = struct.pack('<i', data)
+ struct.pack_into('<i', entry_struct, 24, data)
# Compute CRC
- crc_data = bytearray(28)
+ crc_data = bytearray(b'28')
crc_data[0:4] = entry_struct[0:4]
crc_data[4:28] = entry_struct[8:32]
- crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
- entry_struct[4:8] = struct.pack('<I', crc & 0xFFFFFFFF)
+ if sys.version_info[0] < 3:
+ crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
+ else:
+ crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
+ struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
# write to file
self.write_entry_to_buf(entry_struct, 1,nvs_obj)
new_page = Page(self.page_num, is_rsrv_page)
new_page.version = version
new_page.is_encrypt = is_encrypt_data
- new_page.encr_key = key_input
+ if new_page.is_encrypt:
+ new_page.encr_key = key_input
self.pages.append(new_page)
self.cur_page = new_page
return new_page
if os.path.isabs(value) == False:
script_dir = os.path.dirname(__file__)
abs_file_path = os.path.join(script_dir, value)
- with open(abs_file_path, 'rb') as f:
- value = f.read()
+ if sys.version_info[0] < 3:
+ with open(abs_file_path, 'rb') as f:
+ value = f.read()
+ else:
+ with open(abs_file_path, 'r', newline='') as f:
+ value = f.read()
if datatype == "namespace":
nvs_instance.write_namespace(key)
:param input_filename: Name of input file containing data
:param output_filename: Name of output file to store generated binary
- :param input_size: Size of partition
+ :param input_size: Size of partition (must be multiple of 4096)
:param key_gen: Enable encryption key generation in encryption mode
- :param encryption_mode: Enable/Disable encryption mode
+ :param encrypt_mode: Enable/Disable encryption mode
:param key_file: Input file having encryption keys in encryption mode
:return: None
"""
is_encrypt_data = encrypt_mode
# Set size
- input_size = int(input_size.split('KB')[0]) * 1024
+ input_size = int(input_size, 16)
if input_size % 4096 !=0:
- sys.exit("Size parameter should be a multiple of 4KB.")
+ sys.exit("Size of partition (must be multiple of 4096)")
if version == 'v1':
version = Page.VERSION1
elif version == 'v2':
version = Page.VERSION2
-
+
# Update size as a page needs to be reserved of size 4KB
input_size = input_size - Page.PAGE_PARAMS["max_size"]
sys.exit("Invalid. Cannot give --key_file as --encrypt is set to False.")
if key_gen:
- key_input = ''.join(random.choice('0123456789abcdef') for _ in xrange(128))
+ if sys.version_info[0] < 3:
+ key_input = ''.join(random.choice('0123456789abcdef') for _ in xrange(128))
+ else:
+ key_input = ''.join(random.choice('0123456789abcdef') for _ in range(128)).strip()
elif key_file:
with open(key_file) as key_f:
key_input = key_f.readline()
key_input = key_input.strip()
- input_file = open(input_filename, 'rb')
+ if sys.version_info[0] < 3:
+ input_file = open(input_filename, 'rb')
+ else:
+ input_file = open(input_filename, 'r', newline='')
+
output_file = open(output_filename, 'wb')
with nvs_open(output_file, input_size) as nvs_obj:
if is_encrypt_data:
output_keys_file = open("encryption_keys.bin",'wb')
keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"]
- key_bytes = key_input.decode('hex')
+
+ if sys.version_info[0] < 3:
+ key_bytes = key_input.decode('hex')
+ else:
+ key_bytes = bytearray()
+ key_bytes = codecs.decode(key_input, 'hex')
+
+
key_len = len(key_bytes)
keys_page_buf[0:key_len] = key_bytes
+
crc_data = keys_page_buf[0:key_len]
- crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
- keys_page_buf[64:68] = struct.pack('<I', crc & 0xFFFFFFFF)
+
+ if sys.version_info[0] < 3:
+ crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
+ else:
+ crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
+
+ struct.pack_into('<I', keys_page_buf, key_len, crc & 0xFFFFFFFF)
output_keys_file.write(keys_page_buf)
parser.add_argument(
"size",
- help='Size of NVS Partition in KB. Eg. 12KB')
+ help='Size of NVS Partition in hex (must be multiple of 4096). Eg. 0x1000')
parser.add_argument(
"--version",