| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  | #!/usr/bin/env python | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD | 
					
						
							|  |  |  | # SPDX-License-Identifier: Apache-2.0 | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  | import argparse | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | import os | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  | import construct | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | from fatfs_utils.boot_sector import BootSector | 
					
						
							|  |  |  | from fatfs_utils.entry import Entry | 
					
						
							|  |  |  | from fatfs_utils.fat import FAT | 
					
						
							|  |  |  | from fatfs_utils.fatfs_state import BootSectorState | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  | from fatfs_utils.utils import FULL_BYTE, LONG_NAMES_ENCODING, PAD_CHAR, FATDefaults, lfn_checksum, read_filesystem | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  | from wl_fatfsgen import remove_wl | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def build_file_name(name1: bytes, name2: bytes, name3: bytes) -> str: | 
					
						
							|  |  |  |     full_name_ = name1 + name2 + name3 | 
					
						
							|  |  |  |     # need to strip empty bytes and null-terminating char ('\x00') | 
					
						
							|  |  |  |     return full_name_.rstrip(FULL_BYTE).decode(LONG_NAMES_ENCODING).rstrip('\x00') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_obj_name(obj_: dict, directory_bytes_: bytes, entry_position_: int, lfn_checksum_: int) -> str: | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  |     obj_ext_ = obj_['DIR_Name_ext'].rstrip(chr(PAD_CHAR)) | 
					
						
							|  |  |  |     ext_ = f'.{obj_ext_}' if len(obj_ext_) > 0 else '' | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |     obj_name_: str = obj_['DIR_Name'].rstrip(chr(PAD_CHAR)) + ext_  # short entry name | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-14 08:58:27 +02:00
										 |  |  |     # if LFN was detected, the record is considered as single SFN record only if DIR_NTRes == 0x18 (LDIR_DIR_NTRES) | 
					
						
							|  |  |  |     # if LFN was not detected, the record cannot be part of the LFN, no matter the value of DIR_NTRes | 
					
						
							|  |  |  |     if not args.long_name_support or obj_['DIR_NTRes'] == Entry.LDIR_DIR_NTRES: | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |         return obj_name_ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     full_name = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for pos in range(entry_position_ - 1, -1, -1):  # loop from the current entry back to the start | 
					
						
							|  |  |  |         obj_address_: int = FATDefaults.ENTRY_SIZE * pos | 
					
						
							|  |  |  |         entry_bytes_: bytes = directory_bytes_[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE] | 
					
						
							|  |  |  |         struct_ = Entry.parse_entry_long(entry_bytes_, lfn_checksum_) | 
					
						
							|  |  |  |         if len(struct_.items()) > 0: | 
					
						
							|  |  |  |             full_name[struct_['order']] = build_file_name(struct_['name1'], struct_['name2'], struct_['name3']) | 
					
						
							|  |  |  |             if struct_['is_last']: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  |     return ''.join(map(lambda x: x[1], sorted(full_name.items()))) or obj_name_ | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | def traverse_folder_tree(directory_bytes_: bytes, | 
					
						
							|  |  |  |                          name: str, | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |                          state_: BootSectorState, | 
					
						
							|  |  |  |                          fat_: FAT, | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  |                          binary_array_: bytes) -> None: | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |     os.makedirs(name) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert len(directory_bytes_) % FATDefaults.ENTRY_SIZE == 0 | 
					
						
							|  |  |  |     entries_count_: int = len(directory_bytes_) // FATDefaults.ENTRY_SIZE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for i in range(entries_count_): | 
					
						
							|  |  |  |         obj_address_: int = FATDefaults.ENTRY_SIZE * i | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             obj_: dict = Entry.ENTRY_FORMAT_SHORT_NAME.parse( | 
					
						
							|  |  |  |                 directory_bytes_[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE]) | 
					
						
							| 
									
										
										
										
											2022-09-06 16:08:18 +02:00
										 |  |  |         except (construct.core.ConstError, UnicodeDecodeError): | 
					
						
							|  |  |  |             args.long_name_support = True | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |             continue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if obj_['DIR_Attr'] == 0:  # empty entry | 
					
						
							|  |  |  |             continue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj_name_: str = get_obj_name(obj_, | 
					
						
							|  |  |  |                                       directory_bytes_, | 
					
						
							|  |  |  |                                       entry_position_=i, | 
					
						
							|  |  |  |                                       lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext'])) | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  |         if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE: | 
					
						
							| 
									
										
										
										
											2022-09-29 14:54:23 +02:00
										 |  |  |             content_ = b'' | 
					
						
							|  |  |  |             if obj_['DIR_FileSize'] > 0: | 
					
						
							|  |  |  |                 content_ = fat_.get_chained_content(cluster_id_=Entry.get_cluster_id(obj_), | 
					
						
							|  |  |  |                                                     size=obj_['DIR_FileSize']) | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  |             with open(os.path.join(name, obj_name_), 'wb') as new_file: | 
					
						
							|  |  |  |                 new_file.write(content_) | 
					
						
							|  |  |  |         elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY: | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |             # avoid creating symlinks to itself and parent folder | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  |             if obj_name_ in ('.', '..'): | 
					
						
							|  |  |  |                 continue | 
					
						
							| 
									
										
										
										
											2022-09-08 14:28:12 +02:00
										 |  |  |             child_directory_bytes_ = fat_.get_chained_content(cluster_id_=obj_['DIR_FstClusLO']) | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  |             traverse_folder_tree(directory_bytes_=child_directory_bytes_, | 
					
						
							|  |  |  |                                  name=os.path.join(name, obj_name_), | 
					
						
							|  |  |  |                                  state_=state_, | 
					
						
							|  |  |  |                                  fat_=fat_, | 
					
						
							|  |  |  |                                  binary_array_=binary_array_) | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-09 11:32:21 +02:00
										 |  |  | def remove_wear_levelling_if_exists(fs_: bytes) -> bytes: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Detection of the wear levelling layer is performed in two steps: | 
					
						
							|  |  |  |     1) check if the first sector is a valid boot sector | 
					
						
							|  |  |  |     2) check if the size defined in the boot sector is the same as the partition size: | 
					
						
							|  |  |  |         - if it is, there is no wear levelling layer | 
					
						
							|  |  |  |         - otherwise, we need to remove wl for further processing | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         boot_sector__ = BootSector() | 
					
						
							|  |  |  |         boot_sector__.parse_boot_sector(fs_) | 
					
						
							|  |  |  |         if boot_sector__.boot_sector_state.size == len(fs_): | 
					
						
							|  |  |  |             return fs_ | 
					
						
							|  |  |  |     except construct.core.ConstError: | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  |     plain_fs: bytes = remove_wl(fs_) | 
					
						
							|  |  |  |     return plain_fs | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  | if __name__ == '__main__': | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |     desc = 'Tool for parsing fatfs image and extracting directory structure on host.' | 
					
						
							|  |  |  |     argument_parser: argparse.ArgumentParser = argparse.ArgumentParser(description=desc) | 
					
						
							|  |  |  |     argument_parser.add_argument('input_image', | 
					
						
							|  |  |  |                                  help='Path to the image that will be parsed and extracted.') | 
					
						
							|  |  |  |     argument_parser.add_argument('--long-name-support', | 
					
						
							|  |  |  |                                  action='store_true', | 
					
						
							| 
									
										
										
										
											2022-09-06 16:08:18 +02:00
										 |  |  |                                  help=argparse.SUPPRESS) | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-09 11:32:21 +02:00
										 |  |  |     # ensures backward compatibility | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  |     argument_parser.add_argument('--wear-leveling', | 
					
						
							|  |  |  |                                  action='store_true', | 
					
						
							| 
									
										
										
										
											2022-09-09 11:32:21 +02:00
										 |  |  |                                  help=argparse.SUPPRESS) | 
					
						
							|  |  |  |     argument_parser.add_argument('--wl-layer', | 
					
						
							|  |  |  |                                  choices=['detect', 'enabled', 'disabled'], | 
					
						
							|  |  |  |                                  default=None, | 
					
						
							|  |  |  |                                  help="If detection doesn't work correctly, " | 
					
						
							|  |  |  |                                       'you can force analyzer to or not to assume WL.') | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |     args = argument_parser.parse_args() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-09 11:32:21 +02:00
										 |  |  |     # if wear levelling is detected or user explicitly sets the parameter `--wl_layer enabled` | 
					
						
							|  |  |  |     # the partition with wear levelling is transformed to partition without WL for convenient parsing | 
					
						
							|  |  |  |     # in some cases the partitions with and without wear levelling can be 100% equivalent | 
					
						
							|  |  |  |     # and only user can break this tie by explicitly setting | 
					
						
							|  |  |  |     # the parameter --wl-layer to enabled, respectively disabled | 
					
						
							|  |  |  |     if args.wear_leveling and args.wl_layer: | 
					
						
							|  |  |  |         raise NotImplementedError('Argument --wear-leveling cannot be combined with --wl-layer!') | 
					
						
							|  |  |  |     if args.wear_leveling: | 
					
						
							|  |  |  |         args.wl_layer = 'enabled' | 
					
						
							|  |  |  |     args.wl_layer = args.wl_layer or 'detect' | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |     fs = read_filesystem(args.input_image) | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # An algorithm for removing wear levelling: | 
					
						
							|  |  |  |     # 1. find an remove dummy sector: | 
					
						
							|  |  |  |     #    a) dummy sector is at the position defined by the number of records in the state sector | 
					
						
							|  |  |  |     #    b) dummy may not be placed in state nor cfg sectors | 
					
						
							|  |  |  |     #    c) first (boot) sector position (boot_s_pos) is calculated using value of move count | 
					
						
							|  |  |  |     #    boot_s_pos = - mc | 
					
						
							|  |  |  |     # 2. remove state sectors (trivial) | 
					
						
							|  |  |  |     # 3. remove cfg sector (trivial) | 
					
						
							|  |  |  |     # 4. valid fs is then old_fs[-mc:] + old_fs[:-mc] | 
					
						
							| 
									
										
										
										
											2022-09-09 11:32:21 +02:00
										 |  |  |     if args.wl_layer == 'enabled': | 
					
						
							| 
									
										
										
										
											2022-06-29 15:37:06 +02:00
										 |  |  |         fs = remove_wl(fs) | 
					
						
							| 
									
										
										
										
											2022-09-09 11:32:21 +02:00
										 |  |  |     elif args.wl_layer != 'disabled': | 
					
						
							|  |  |  |         # wear levelling is removed to enable parsing using common algorithm | 
					
						
							|  |  |  |         fs = remove_wear_levelling_if_exists(fs) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |     boot_sector_ = BootSector() | 
					
						
							|  |  |  |     boot_sector_.parse_boot_sector(fs) | 
					
						
							|  |  |  |     fat = FAT(boot_sector_.boot_sector_state, init_=False) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     boot_dir_start_ = boot_sector_.boot_sector_state.root_directory_start | 
					
						
							|  |  |  |     boot_dir_sectors = boot_sector_.boot_sector_state.root_dir_sectors_cnt | 
					
						
							|  |  |  |     full_ = fs[boot_dir_start_: boot_dir_start_ + boot_dir_sectors * boot_sector_.boot_sector_state.sector_size] | 
					
						
							| 
									
										
										
										
											2022-04-04 15:33:00 +02:00
										 |  |  |     traverse_folder_tree(full_, | 
					
						
							| 
									
										
										
										
											2022-05-23 15:10:46 +02:00
										 |  |  |                          boot_sector_.boot_sector_state.volume_label.rstrip(chr(PAD_CHAR)), | 
					
						
							|  |  |  |                          boot_sector_.boot_sector_state, fat, fs) |