Merge branch 'bugfix/dfu_split_large_bins_v4.2' into 'release/v4.2'

tools: Split up large binaries into smaller chunks in the DFU binary (v4.2)

See merge request espressif/esp-idf!13794
This commit is contained in:
Ivan Grokhotkov
2021-06-07 06:05:46 +00:00
6 changed files with 175 additions and 107 deletions

View File

@@ -6,9 +6,9 @@ def action_extensions(base_actions, project_path):
SUPPORTED_TARGETS = ['esp32s2'] SUPPORTED_TARGETS = ['esp32s2']
def dfu_target(target_name, ctx, args): def dfu_target(target_name, ctx, args, part_size):
ensure_build_directory(args, ctx.info_name) ensure_build_directory(args, ctx.info_name)
run_target(target_name, args) run_target(target_name, args, {'ESP_DFU_PART_SIZE': part_size} if part_size else {})
def dfu_flash_target(target_name, ctx, args): def dfu_flash_target(target_name, ctx, args):
ensure_build_directory(args, ctx.info_name) ensure_build_directory(args, ctx.info_name)
@@ -22,16 +22,24 @@ def action_extensions(base_actions, project_path):
raise raise
dfu_actions = { dfu_actions = {
"actions": { 'actions': {
"dfu": { 'dfu': {
"callback": dfu_target, 'callback': dfu_target,
"short_help": "Build the DFU binary", 'short_help': 'Build the DFU binary',
"dependencies": ["all"], 'dependencies': ['all'],
'options': [
{
'names': ['--part-size'],
'help': 'Large files are split up into smaller partitions in order to avoid timeouts during '
'flash erasing. This option allows overriding the default partition size of '
'mkdfu.py.'
}
],
}, },
"dfu-flash": { 'dfu-flash': {
"callback": dfu_flash_target, 'callback': dfu_flash_target,
"short_help": "Flash the DFU binary", 'short_help': 'Flash the DFU binary',
"order_dependencies": ["dfu"], 'order_dependencies': ['dfu'],
}, },
} }
} }

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# #
# Copyright 2020 Espressif Systems (Shanghai) PTE LTD # Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -21,14 +21,18 @@
# This file must be the first one in the archive. It contains binary structures describing each # This file must be the first one in the archive. It contains binary structures describing each
# subsequent file (for example, where the file needs to be flashed/loaded). # subsequent file (for example, where the file needs to be flashed/loaded).
from collections import namedtuple from __future__ import print_function, unicode_literals
from future.utils import iteritems
import argparse import argparse
import hashlib import hashlib
import json import json
import os import os
import struct import struct
import zlib import zlib
from collections import namedtuple
from functools import partial
from future.utils import iteritems
try: try:
import typing import typing
@@ -43,28 +47,28 @@ except ImportError:
pass pass
# CPIO ("new ASCII") format related things # CPIO ("new ASCII") format related things
CPIO_MAGIC = b"070701" CPIO_MAGIC = b'070701'
CPIO_STRUCT = b"=6s" + b"8s" * 13 CPIO_STRUCT = b'=6s' + b'8s' * 13
CPIOHeader = namedtuple( CPIOHeader = namedtuple(
"CPIOHeader", 'CPIOHeader',
[ [
"magic", 'magic',
"ino", 'ino',
"mode", 'mode',
"uid", 'uid',
"gid", 'gid',
"nlink", 'nlink',
"mtime", 'mtime',
"filesize", 'filesize',
"devmajor", 'devmajor',
"devminor", 'devminor',
"rdevmajor", 'rdevmajor',
"rdevminor", 'rdevminor',
"namesize", 'namesize',
"check", 'check',
], ],
) )
CPIO_TRAILER = "TRAILER!!!" CPIO_TRAILER = 'TRAILER!!!'
def make_cpio_header( def make_cpio_header(
@@ -73,7 +77,7 @@ def make_cpio_header(
""" Returns CPIOHeader for the given file name and file size """ """ Returns CPIOHeader for the given file name and file size """
def as_hex(val): # type: (int) -> bytes def as_hex(val): # type: (int) -> bytes
return "{:08x}".format(val).encode("ascii") return '{:08x}'.format(val).encode('ascii')
hex_0 = as_hex(0) hex_0 = as_hex(0)
mode = hex_0 if is_trailer else as_hex(0o0100644) mode = hex_0 if is_trailer else as_hex(0o0100644)
@@ -98,19 +102,17 @@ def make_cpio_header(
# DFU format related things # DFU format related things
# Structure of one entry in dfuinfo0.dat # Structure of one entry in dfuinfo0.dat
DFUINFO_STRUCT = b"<I I 64s 16s" DFUINFO_STRUCT = b'<I I 64s 16s'
DFUInfo = namedtuple("DFUInfo", ["address", "flags", "name", "md5"]) DFUInfo = namedtuple('DFUInfo', ['address', 'flags', 'name', 'md5'])
DFUINFO_FILE = "dfuinfo0.dat" DFUINFO_FILE = 'dfuinfo0.dat'
# Structure which gets added at the end of the entire DFU file # Structure which gets added at the end of the entire DFU file
DFUSUFFIX_STRUCT = b"<H H H H 3s B" DFUSUFFIX_STRUCT = b'<H H H H 3s B'
DFUSuffix = namedtuple( DFUSuffix = namedtuple(
"DFUSuffix", ["bcd_device", "pid", "vid", "bcd_dfu", "sig", "len"] 'DFUSuffix', ['bcd_device', 'pid', 'vid', 'bcd_dfu', 'sig', 'len']
) )
ESPRESSIF_VID = 12346 ESPRESSIF_VID = 12346
# TODO: set PID based on the chip type (add a command line argument)
DFUSUFFIX_DEFAULT = DFUSuffix(0xFFFF, 0xFFFF, ESPRESSIF_VID, 0x0100, b"UFD", 16)
# This CRC32 gets added after DFUSUFFIX_STRUCT # This CRC32 gets added after DFUSUFFIX_STRUCT
DFUCRC_STRUCT = b"<I" DFUCRC_STRUCT = b'<I'
def dfu_crc(data, crc=0): # type: (bytes, int) -> int def dfu_crc(data, crc=0): # type: (bytes, int) -> int
@@ -119,39 +121,56 @@ def dfu_crc(data, crc=0): # type: (bytes, int) -> int
return uint32_max - (zlib.crc32(data, crc) & uint32_max) return uint32_max - (zlib.crc32(data, crc) & uint32_max)
def pad_bytes(b, multiple, padding=b"\x00"): # type: (bytes, int, bytes) -> bytes def pad_bytes(b, multiple, padding=b'\x00'): # type: (bytes, int, bytes) -> bytes
""" Pad 'b' to a length divisible by 'multiple' """ """ Pad 'b' to a length divisible by 'multiple' """
padded_len = (len(b) + multiple - 1) // multiple * multiple padded_len = (len(b) + multiple - 1) // multiple * multiple
return b + padding * (padded_len - len(b)) return b + padding * (padded_len - len(b))
class EspDfuWriter(object): class EspDfuWriter(object):
def __init__(self, dest_file): # type: (typing.BinaryIO) -> None def __init__(self, dest_file, pid, part_size): # type: (typing.BinaryIO, int, int) -> None
self.dest = dest_file self.dest = dest_file
self.pid = pid
self.part_size = part_size
self.entries = [] # type: typing.List[bytes] self.entries = [] # type: typing.List[bytes]
self.index = [] # type: typing.List[DFUInfo] self.index = [] # type: typing.List[DFUInfo]
def add_file(self, flash_addr, path): # type: (int, str) -> None def add_file(self, flash_addr, path): # type: (int, str) -> None
""" Add file to be written into flash at given address """ """
with open(path, "rb") as f: Add file to be written into flash at given address
self._add_cpio_flash_entry(os.path.basename(path), flash_addr, f.read())
Files are split up into chunks in order to avoid timing out during erasing large regions. Instead of adding
"app.bin" at flash_addr it will add:
1. app.bin at flash_addr # sizeof(app.bin) == self.part_size
2. app.bin.1 at flash_addr + self.part_size
3. app.bin.2 at flash_addr + 2 * self.part_size
...
"""
f_name = os.path.basename(path)
with open(path, 'rb') as f:
for i, chunk in enumerate(iter(partial(f.read, self.part_size), b'')):
n = f_name if i == 0 else '.'.join([f_name, str(i)])
self._add_cpio_flash_entry(n, flash_addr, chunk)
flash_addr += len(chunk)
def finish(self): # type: () -> None def finish(self): # type: () -> None
""" Write DFU file """ """ Write DFU file """
# Prepare and add dfuinfo0.dat file # Prepare and add dfuinfo0.dat file
dfuinfo = b"".join([struct.pack(DFUINFO_STRUCT, *item) for item in self.index]) dfuinfo = b''.join([struct.pack(DFUINFO_STRUCT, *item) for item in self.index])
self._add_cpio_entry(DFUINFO_FILE, dfuinfo, first=True) self._add_cpio_entry(DFUINFO_FILE, dfuinfo, first=True)
# Add CPIO archive trailer # Add CPIO archive trailer
self._add_cpio_entry(CPIO_TRAILER, b"", trailer=True) self._add_cpio_entry(CPIO_TRAILER, b'', trailer=True)
# Combine all the entries and pad the file # Combine all the entries and pad the file
out_data = b"".join(self.entries) out_data = b''.join(self.entries)
cpio_block_size = 10240 cpio_block_size = 10240
out_data = pad_bytes(out_data, cpio_block_size) out_data = pad_bytes(out_data, cpio_block_size)
# Add DFU suffix and CRC # Add DFU suffix and CRC
out_data += struct.pack(DFUSUFFIX_STRUCT, *DFUSUFFIX_DEFAULT) dfu_suffix = DFUSuffix(0xFFFF, self.pid, ESPRESSIF_VID, 0x0100, b'UFD', 16)
out_data += struct.pack(DFUSUFFIX_STRUCT, *dfu_suffix)
out_data += struct.pack(DFUCRC_STRUCT, dfu_crc(out_data)) out_data += struct.pack(DFUCRC_STRUCT, dfu_crc(out_data))
# Finally write the entire binary # Finally write the entire binary
@@ -166,7 +185,7 @@ class EspDfuWriter(object):
DFUInfo( DFUInfo(
address=flash_addr, address=flash_addr,
flags=0, flags=0,
name=filename.encode("utf-8"), name=filename.encode('utf-8'),
md5=md5.digest(), md5=md5.digest(),
) )
) )
@@ -175,7 +194,7 @@ class EspDfuWriter(object):
def _add_cpio_entry( def _add_cpio_entry(
self, filename, data, first=False, trailer=False self, filename, data, first=False, trailer=False
): # type: (str, bytes, bool, bool) -> None ): # type: (str, bytes, bool, bool) -> None
filename_b = filename.encode("utf-8") + b"\x00" filename_b = filename.encode('utf-8') + b'\x00'
cpio_header = make_cpio_header(len(filename_b), len(data), is_trailer=trailer) cpio_header = make_cpio_header(len(filename_b), len(data), is_trailer=trailer)
entry = pad_bytes( entry = pad_bytes(
struct.pack(CPIO_STRUCT, *cpio_header) + filename_b, 4 struct.pack(CPIO_STRUCT, *cpio_header) + filename_b, 4
@@ -186,30 +205,41 @@ class EspDfuWriter(object):
self.entries.insert(0, entry) self.entries.insert(0, entry)
def action_write(args): def action_write(args): # type: (typing.Mapping[str, typing.Any]) -> None
writer = EspDfuWriter(args['output_file']) writer = EspDfuWriter(args['output_file'], args['pid'], args['part_size'])
for addr, f in args['files']: for addr, f in args['files']:
print('Adding {} at {:#x}'.format(f, addr)) print('Adding {} at {:#x}'.format(f, addr))
writer.add_file(addr, f) writer.add_file(addr, f)
writer.finish() writer.finish()
print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name)) print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name))
if args['part_size'] % (4 * 1024) != 0:
print('WARNING: Partition size of DFU is not multiple of 4k (4096). You might get unexpected behavior.')
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
# Provision to add "info" command # Provision to add "info" command
subparsers = parser.add_subparsers(dest="command") subparsers = parser.add_subparsers(dest='command')
write_parser = subparsers.add_parser("write") write_parser = subparsers.add_parser('write')
write_parser.add_argument("-o", "--output-file", write_parser.add_argument('-o', '--output-file',
help='Filename for storing the output DFU image', help='Filename for storing the output DFU image',
required=True, required=True,
type=argparse.FileType("wb")) type=argparse.FileType('wb'))
write_parser.add_argument("--json", write_parser.add_argument('--pid',
required=False, # This ESP-IDF release supports one compatible target only
default=2, # ESP32-S2
type=lambda h: int(h, 16),
help='Hexadecimal product identifier')
write_parser.add_argument('--json',
help='Optional file for loading "flash_files" dictionary with <address> <file> items') help='Optional file for loading "flash_files" dictionary with <address> <file> items')
write_parser.add_argument("files", write_parser.add_argument('--part-size',
metavar="<address> <file>", help='Add <file> at <address>', default=os.environ.get('ESP_DFU_PART_SIZE', 512 * 1024),
nargs="*") type=lambda x: int(x, 0),
help='Larger files are split-up into smaller partitions of this size')
write_parser.add_argument('files',
metavar='<address> <file>', help='Add <file> at <address>',
nargs='*')
args = parser.parse_args() args = parser.parse_args()
@@ -236,16 +266,18 @@ def main():
files += [(int(addr, 0), files += [(int(addr, 0),
process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])] process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])]
files = sorted([(addr, f_name) for addr, f_name in iteritems(dict(files))], files = sorted([(addr, f_name.decode('utf-8') if isinstance(f_name, type(b'')) else f_name) for addr, f_name in iteritems(dict(files))],
key=lambda x: x[0]) # remove possible duplicates and sort based on the address key=lambda x: x[0]) # remove possible duplicates and sort based on the address
cmd_args = {'output_file': args.output_file, cmd_args = {'output_file': args.output_file,
'files': files, 'files': files,
'pid': args.pid,
'part_size': args.part_size,
} }
{'write': action_write {'write': action_write
}[args.command](cmd_args) }[args.command](cmd_args)
if __name__ == "__main__": if __name__ == '__main__':
main() main()

Binary file not shown.

View File

@@ -1,7 +0,0 @@
{
"flash_files" : {
"0x8000" : "2.bin",
"0x1000" : "1.bin",
"0x10000" : "3.bin"
}
}

BIN
tools/test_mkdfu/2/dfu.bin Normal file

Binary file not shown.

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# Copyright 2020 Espressif Systems (Shanghai) CO LTD # Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -16,33 +16,49 @@
# limitations under the License. # limitations under the License.
from __future__ import unicode_literals from __future__ import unicode_literals
import collections
import filecmp import filecmp
import json
import os import os
import pexpect
import shutil import shutil
import sys import sys
import tempfile import tempfile
import time import time
import unittest import unittest
import pexpect
current_dir = os.path.dirname(os.path.realpath(__file__)) current_dir = os.path.dirname(os.path.realpath(__file__))
mkdfu_path = os.path.join(current_dir, '..', 'mkdfu.py') mkdfu_path = os.path.join(current_dir, '..', 'mkdfu.py')
class TestHelloWorldExample(unittest.TestCase): class TestMkDFU(unittest.TestCase):
def common_test(self, add_args): def common_test(self, json_input=None, file_args=[], output_to_compare=None, part_size=None):
with tempfile.NamedTemporaryFile(delete=False) as f: '''
self.addCleanup(os.unlink, f.name) - json_input - input JSON file compatible with mkdfu.py - used when not None
cmd = ' '.join([sys.executable, mkdfu_path, 'write', - file_args - list of (address, path_to_file) tuples
'-o', f.name, - output_to_compare - path to the file containing the expected output - tested when not None
add_args]) - part_size - partition size - used when not None
p = pexpect.spawn(cmd, timeout=10) '''
with tempfile.NamedTemporaryFile(delete=False) as f_out:
self.addCleanup(os.unlink, f_out.name)
args = [mkdfu_path, 'write',
'-o', f_out.name,
'--pid', '2']
if part_size:
args += ['--part-size', str(part_size)]
if json_input:
args += ['--json', json_input]
for addr, f_path in file_args:
args += [str(addr), f_path]
p = pexpect.spawn(sys.executable, args, timeout=10, encoding='utf-8')
self.addCleanup(p.terminate, force=True) self.addCleanup(p.terminate, force=True)
p.expect_exact(['Adding 1/bootloader.bin at 0x1000', for addr, f_path in sorted(file_args, key=lambda e: e[0]):
'Adding 1/partition-table.bin at 0x8000', p.expect_exact('Adding {} at {}'.format(f_path, hex(addr)))
'Adding 1/hello-world.bin at 0x10000',
'"{}" has been written. You may proceed with DFU flashing.'.format(f.name)]) p.expect_exact('"{}" has been written. You may proceed with DFU flashing.'.format(f_out.name))
# Need to wait for the process to end because the output file is closed when mkdfu exits. # Need to wait for the process to end because the output file is closed when mkdfu exits.
# Do non-blocking wait instead of the blocking p.wait(): # Do non-blocking wait instead of the blocking p.wait():
@@ -53,25 +69,34 @@ class TestHelloWorldExample(unittest.TestCase):
else: else:
p.terminate() p.terminate()
self.assertTrue(filecmp.cmp(f.name, os.path.join(current_dir, '1','dfu.bin')), 'Output files are different') if output_to_compare:
self.assertTrue(filecmp.cmp(f_out.name, os.path.join(current_dir, output_to_compare)), 'Output files are different')
class TestHelloWorldExample(TestMkDFU):
'''
tests with images prepared in the "1" subdirectory
'''
def test_with_json(self): def test_with_json(self):
self.common_test(' '.join(['--json', os.path.join(current_dir, '1', 'flasher_args.json')])) with tempfile.NamedTemporaryFile(mode='w', dir=os.path.join(current_dir, '1'), delete=False) as f:
self.addCleanup(os.unlink, f.name)
bins = [('0x1000', '1.bin'), ('0x8000', '2.bin'), ('0x10000', '3.bin')]
json.dump({'flash_files': collections.OrderedDict(bins)}, f)
self.common_test(json_input=f.name, output_to_compare='1/dfu.bin')
def test_without_json(self): def test_without_json(self):
self.common_test(' '.join(['0x1000', os.path.join(current_dir, '1', '1.bin'), self.common_test(file_args=[(0x1000, '1/1.bin'),
'0x8000', os.path.join(current_dir, '1', '2.bin'), (0x8000, '1/2.bin'),
'0x10000', os.path.join(current_dir, '1', '3.bin') (0x10000, '1/3.bin')],
])) output_to_compare='1/dfu.bin')
def test_filenames(self): def test_filenames(self):
temp_dir = tempfile.mkdtemp(prefix='very_long_directory_name' * 8) temp_dir = tempfile.mkdtemp(prefix='very_long_directory_name' * 8)
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True) self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as f:
output = f.name
with tempfile.NamedTemporaryFile(prefix='ľščťžýáíéěř\u0420\u043e\u0441\u0441\u0438\u044f', with tempfile.NamedTemporaryFile(prefix='ľščťžýáíéěř\u0420\u043e\u0441\u0441\u0438\u044f',
dir=temp_dir, dir=temp_dir,
delete=False) as f: delete=False) as f:
@@ -79,20 +104,30 @@ class TestHelloWorldExample(unittest.TestCase):
shutil.copyfile(os.path.join(current_dir, '1', '1.bin'), bootloader) shutil.copyfile(os.path.join(current_dir, '1', '1.bin'), bootloader)
cmd = ' '.join([sys.executable, mkdfu_path, 'write', self.common_test(file_args=[(0x1000, bootloader),
'-o', output, (0x8000, os.path.join(current_dir, '1', '2.bin')),
' '.join(['0x1000', bootloader, (0x10000, os.path.join(current_dir, '1', '3.bin'))])
'0x8000', os.path.join(current_dir, '1', '2.bin'),
'0x10000', os.path.join(current_dir, '1', '3.bin')
])
])
p = pexpect.spawn(cmd, timeout=10, encoding='utf-8')
self.addCleanup(p.terminate, force=True)
p.expect_exact(['Adding {} at 0x1000'.format(bootloader),
'Adding 1/2.bin at 0x8000', class TestSplit(TestMkDFU):
'Adding 1/3.bin at 0x10000', '''
'"{}" has been written. You may proceed with DFU flashing.'.format(output)]) tests with images prepared in the "2" subdirectory
"2/dfu.bin" was prepared with:
mkdfu.py write --part-size 5 --pid 2 -o 2/dfu.bin 0 bin
where the content of "bin" is b"\xce" * 10
'''
def test_split(self):
temp_dir = tempfile.mkdtemp(dir=current_dir)
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
with open(os.path.join(temp_dir, 'bin'), 'wb') as f:
self.addCleanup(os.unlink, f.name)
f.write(b'\xce' * 10)
self.common_test(file_args=[(0, f.name)],
part_size=5,
output_to_compare='2/dfu.bin')
if __name__ == '__main__': if __name__ == '__main__':