Merge branch 'bugfix/fix-corrpted-content-full-partition' into 'master'

fatfsparse.py: Limit the file content size to correct number

Closes IDF-5947

See merge request espressif/esp-idf!20048
This commit is contained in:
Martin Gano
2022-09-13 05:46:26 +08:00
5 changed files with 40 additions and 21 deletions

View File

@@ -34,8 +34,9 @@ class Entry:
# one entry can hold 13 characters with size 2 bytes distributed in three regions of the 32 bytes entry # one entry can hold 13 characters with size 2 bytes distributed in three regions of the 32 bytes entry
CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE
# the last 16 bytes record in the LFN entry has first byte masked with the following value
LAST_RECORD_LFN_ENTRY: int = 0x40
SHORT_ENTRY: int = -1 SHORT_ENTRY: int = -1
# this value is used for short-like entry but with accepted lower case # this value is used for short-like entry but with accepted lower case
SHORT_ENTRY_LN: int = 0 SHORT_ENTRY_LN: int = 0
@@ -103,7 +104,7 @@ class Entry:
00002040: 54 48 49 53 49 53 7E 31 54 58 54 20 00 00 00 00 THISIS~1TXT..... 00002040: 54 48 49 53 49 53 7E 31 54 58 54 20 00 00 00 00 THISIS~1TXT.....
00002050: 21 00 00 00 00 00 00 00 21 00 02 00 15 00 00 00 !.......!....... 00002050: 21 00 00 00 00 00 00 00 21 00 02 00 15 00 00 00 !.......!.......
""" """
order |= (0x40 if is_last else 0x00) order |= (Entry.LAST_RECORD_LFN_ENTRY if is_last else 0x00)
long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40) long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40)
names[0] + # first 5 characters (10 bytes) of the name part names[0] + # first 5 characters (10 bytes) of the name part
Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME
@@ -124,7 +125,13 @@ class Entry:
return {} return {}
names1 = entry_bytes_[14:26] names1 = entry_bytes_[14:26]
names2 = entry_bytes_[28:32] names2 = entry_bytes_[28:32]
return {'order': order_, 'name1': names0, 'name2': names1, 'name3': names2, 'is_last': bool(order_ & 0x40 == 0x40)} return {
'order': order_,
'name1': names0,
'name2': names1,
'name3': names2,
'is_last': bool(order_ & Entry.LAST_RECORD_LFN_ENTRY == Entry.LAST_RECORD_LFN_ENTRY)
}
@property @property
def entry_bytes(self) -> bytes: def entry_bytes(self) -> bytes:

View File

@@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import List from typing import List, Optional
from .cluster import Cluster from .cluster import Cluster
from .exceptions import NoFreeClusterException from .exceptions import NoFreeClusterException
@@ -39,22 +39,25 @@ class FAT:
is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1 is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1
return is_cluster_last_ return is_cluster_last_
def chain_content(self, cluster_id_: int) -> bytearray: def get_chained_content(self, cluster_id_: int, size: Optional[int] = None) -> bytearray:
bin_im: bytearray = self.boot_sector_state.binary_image """
if self.is_cluster_last(cluster_id_): The purpose of the method is retrieving the content from chain of clusters when the FAT FS partition
is analyzed. The file entry provides the reference to the first cluster, this method
traverses linked list of clusters and append partial results to the content.
"""
binary_image: bytearray = self.boot_sector_state.binary_image
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
content_: bytearray = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size] content_ = binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size]
return content_
fat_value_: int = self.get_cluster_value(cluster_id_)
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
content_ = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
while not self.is_cluster_last(cluster_id_): while not self.is_cluster_last(cluster_id_):
cluster_id_ = fat_value_ cluster_id_ = self.get_cluster_value(cluster_id_)
fat_value_ = self.get_cluster_value(cluster_id_)
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
content_ += bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size] content_ += binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size]
# the size is None if the object is directory
if size is None:
return content_ return content_
return content_[:size]
def find_free_cluster(self) -> Cluster: def find_free_cluster(self) -> Cluster:
# finds first empty cluster and allocates it # finds first empty cluster and allocates it

View File

@@ -5,8 +5,9 @@ from textwrap import dedent
from typing import Optional from typing import Optional
from .exceptions import InconsistentFATAttributes from .exceptions import InconsistentFATAttributes
from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS, FATDefaults, from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS,
get_fat_sectors_count, get_fatfs_type, get_non_data_sectors_cnt, number_of_clusters) RESERVED_CLUSTERS_COUNT, FATDefaults, get_fat_sectors_count, get_fatfs_type,
get_non_data_sectors_cnt, number_of_clusters)
class FATFSState: class FATFSState:
@@ -133,7 +134,13 @@ class BootSectorState:
@property @property
def clusters(self) -> int: def clusters(self) -> int:
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster) """
The actual number of clusters is calculated by `number_of_clusters`,
however, the initial two blocks of FAT are reserved (device type and root directory),
despite they don't refer to the data region.
Since that, two clusters are added to use the full potential of the FAT file system partition.
"""
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster) + RESERVED_CLUSTERS_COUNT
return clusters_cnt_ return clusters_cnt_
@property @property

View File

@@ -12,6 +12,7 @@ from construct import BitsInteger, BitStruct, Int16ul
FAT12_MAX_CLUSTERS: int = 4085 FAT12_MAX_CLUSTERS: int = 4085
FAT16_MAX_CLUSTERS: int = 65525 FAT16_MAX_CLUSTERS: int = 65525
RESERVED_CLUSTERS_COUNT: int = 2
PAD_CHAR: int = 0x20 PAD_CHAR: int = 0x20
FAT12: int = 12 FAT12: int = 12
FAT16: int = 16 FAT16: int = 16

View File

@@ -67,14 +67,15 @@ def traverse_folder_tree(directory_bytes_: bytes,
entry_position_=i, entry_position_=i,
lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext'])) lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext']))
if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE: if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE:
content_ = fat_.chain_content(cluster_id_=Entry.get_cluster_id(obj_)).rstrip(chr(0x00).encode()) content_ = fat_.get_chained_content(cluster_id_=Entry.get_cluster_id(obj_),
size=obj_['DIR_FileSize'])
with open(os.path.join(name, obj_name_), 'wb') as new_file: with open(os.path.join(name, obj_name_), 'wb') as new_file:
new_file.write(content_) new_file.write(content_)
elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY: elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY:
# avoid creating symlinks to itself and parent folder # avoid creating symlinks to itself and parent folder
if obj_name_ in ('.', '..'): if obj_name_ in ('.', '..'):
continue continue
child_directory_bytes_ = fat_.chain_content(cluster_id_=obj_['DIR_FstClusLO']) child_directory_bytes_ = fat_.get_chained_content(cluster_id_=obj_['DIR_FstClusLO'])
traverse_folder_tree(directory_bytes_=child_directory_bytes_, traverse_folder_tree(directory_bytes_=child_directory_bytes_,
name=os.path.join(name, obj_name_), name=os.path.join(name, obj_name_),
state_=state_, state_=state_,