forked from espressif/esp-idf
fatfsparse.py: limit the file content size to correct number
This commit is contained in:
@@ -34,8 +34,9 @@ class Entry:
|
|||||||
# one entry can hold 13 characters with size 2 bytes distributed in three regions of the 32 bytes entry
|
# one entry can hold 13 characters with size 2 bytes distributed in three regions of the 32 bytes entry
|
||||||
CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE
|
CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE
|
||||||
|
|
||||||
|
# the last 16 bytes record in the LFN entry has first byte masked with the following value
|
||||||
|
LAST_RECORD_LFN_ENTRY: int = 0x40
|
||||||
SHORT_ENTRY: int = -1
|
SHORT_ENTRY: int = -1
|
||||||
|
|
||||||
# this value is used for short-like entry but with accepted lower case
|
# this value is used for short-like entry but with accepted lower case
|
||||||
SHORT_ENTRY_LN: int = 0
|
SHORT_ENTRY_LN: int = 0
|
||||||
|
|
||||||
@@ -103,7 +104,7 @@ class Entry:
|
|||||||
00002040: 54 48 49 53 49 53 7E 31 54 58 54 20 00 00 00 00 THISIS~1TXT.....
|
00002040: 54 48 49 53 49 53 7E 31 54 58 54 20 00 00 00 00 THISIS~1TXT.....
|
||||||
00002050: 21 00 00 00 00 00 00 00 21 00 02 00 15 00 00 00 !.......!.......
|
00002050: 21 00 00 00 00 00 00 00 21 00 02 00 15 00 00 00 !.......!.......
|
||||||
"""
|
"""
|
||||||
order |= (0x40 if is_last else 0x00)
|
order |= (Entry.LAST_RECORD_LFN_ENTRY if is_last else 0x00)
|
||||||
long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40)
|
long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40)
|
||||||
names[0] + # first 5 characters (10 bytes) of the name part
|
names[0] + # first 5 characters (10 bytes) of the name part
|
||||||
Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME
|
Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME
|
||||||
@@ -124,7 +125,13 @@ class Entry:
|
|||||||
return {}
|
return {}
|
||||||
names1 = entry_bytes_[14:26]
|
names1 = entry_bytes_[14:26]
|
||||||
names2 = entry_bytes_[28:32]
|
names2 = entry_bytes_[28:32]
|
||||||
return {'order': order_, 'name1': names0, 'name2': names1, 'name3': names2, 'is_last': bool(order_ & 0x40 == 0x40)}
|
return {
|
||||||
|
'order': order_,
|
||||||
|
'name1': names0,
|
||||||
|
'name2': names1,
|
||||||
|
'name3': names2,
|
||||||
|
'is_last': bool(order_ & Entry.LAST_RECORD_LFN_ENTRY == Entry.LAST_RECORD_LFN_ENTRY)
|
||||||
|
}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def entry_bytes(self) -> bytes:
|
def entry_bytes(self) -> bytes:
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||||
# SPDX-License-Identifier: Apache-2.0
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
from typing import List
|
from typing import List, Optional
|
||||||
|
|
||||||
from .cluster import Cluster
|
from .cluster import Cluster
|
||||||
from .exceptions import NoFreeClusterException
|
from .exceptions import NoFreeClusterException
|
||||||
@@ -39,22 +39,25 @@ class FAT:
|
|||||||
is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1
|
is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1
|
||||||
return is_cluster_last_
|
return is_cluster_last_
|
||||||
|
|
||||||
def chain_content(self, cluster_id_: int) -> bytearray:
|
def get_chained_content(self, cluster_id_: int, size: Optional[int] = None) -> bytearray:
|
||||||
bin_im: bytearray = self.boot_sector_state.binary_image
|
"""
|
||||||
if self.is_cluster_last(cluster_id_):
|
The purpose of the method is retrieving the content from chain of clusters when the FAT FS partition
|
||||||
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
|
is analyzed. The file entry provides the reference to the first cluster, this method
|
||||||
content_: bytearray = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
|
traverses linked list of clusters and append partial results to the content.
|
||||||
return content_
|
"""
|
||||||
fat_value_: int = self.get_cluster_value(cluster_id_)
|
binary_image: bytearray = self.boot_sector_state.binary_image
|
||||||
|
|
||||||
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
|
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
|
||||||
content_ = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
|
content_ = binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size]
|
||||||
|
|
||||||
while not self.is_cluster_last(cluster_id_):
|
while not self.is_cluster_last(cluster_id_):
|
||||||
cluster_id_ = fat_value_
|
cluster_id_ = self.get_cluster_value(cluster_id_)
|
||||||
fat_value_ = self.get_cluster_value(cluster_id_)
|
|
||||||
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
|
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
|
||||||
content_ += bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
|
content_ += binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size]
|
||||||
return content_
|
# the size is None if the object is directory
|
||||||
|
if size is None:
|
||||||
|
return content_
|
||||||
|
return content_[:size]
|
||||||
|
|
||||||
def find_free_cluster(self) -> Cluster:
|
def find_free_cluster(self) -> Cluster:
|
||||||
# finds first empty cluster and allocates it
|
# finds first empty cluster and allocates it
|
||||||
|
@@ -5,8 +5,9 @@ from textwrap import dedent
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from .exceptions import InconsistentFATAttributes
|
from .exceptions import InconsistentFATAttributes
|
||||||
from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS, FATDefaults,
|
from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS,
|
||||||
get_fat_sectors_count, get_fatfs_type, get_non_data_sectors_cnt, number_of_clusters)
|
RESERVED_CLUSTERS_COUNT, FATDefaults, get_fat_sectors_count, get_fatfs_type,
|
||||||
|
get_non_data_sectors_cnt, number_of_clusters)
|
||||||
|
|
||||||
|
|
||||||
class FATFSState:
|
class FATFSState:
|
||||||
@@ -133,7 +134,13 @@ class BootSectorState:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def clusters(self) -> int:
|
def clusters(self) -> int:
|
||||||
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster)
|
"""
|
||||||
|
The actual number of clusters is calculated by `number_of_clusters`,
|
||||||
|
however, the initial two blocks of FAT are reserved (device type and root directory),
|
||||||
|
despite they don't refer to the data region.
|
||||||
|
Since that, two clusters are added to use the full potential of the FAT file system partition.
|
||||||
|
"""
|
||||||
|
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster) + RESERVED_CLUSTERS_COUNT
|
||||||
return clusters_cnt_
|
return clusters_cnt_
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@@ -12,6 +12,7 @@ from construct import BitsInteger, BitStruct, Int16ul
|
|||||||
|
|
||||||
FAT12_MAX_CLUSTERS: int = 4085
|
FAT12_MAX_CLUSTERS: int = 4085
|
||||||
FAT16_MAX_CLUSTERS: int = 65525
|
FAT16_MAX_CLUSTERS: int = 65525
|
||||||
|
RESERVED_CLUSTERS_COUNT: int = 2
|
||||||
PAD_CHAR: int = 0x20
|
PAD_CHAR: int = 0x20
|
||||||
FAT12: int = 12
|
FAT12: int = 12
|
||||||
FAT16: int = 16
|
FAT16: int = 16
|
||||||
|
@@ -67,14 +67,15 @@ def traverse_folder_tree(directory_bytes_: bytes,
|
|||||||
entry_position_=i,
|
entry_position_=i,
|
||||||
lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext']))
|
lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext']))
|
||||||
if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE:
|
if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE:
|
||||||
content_ = fat_.chain_content(cluster_id_=Entry.get_cluster_id(obj_)).rstrip(chr(0x00).encode())
|
content_ = fat_.get_chained_content(cluster_id_=Entry.get_cluster_id(obj_),
|
||||||
|
size=obj_['DIR_FileSize'])
|
||||||
with open(os.path.join(name, obj_name_), 'wb') as new_file:
|
with open(os.path.join(name, obj_name_), 'wb') as new_file:
|
||||||
new_file.write(content_)
|
new_file.write(content_)
|
||||||
elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY:
|
elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY:
|
||||||
# avoid creating symlinks to itself and parent folder
|
# avoid creating symlinks to itself and parent folder
|
||||||
if obj_name_ in ('.', '..'):
|
if obj_name_ in ('.', '..'):
|
||||||
continue
|
continue
|
||||||
child_directory_bytes_ = fat_.chain_content(cluster_id_=obj_['DIR_FstClusLO'])
|
child_directory_bytes_ = fat_.get_chained_content(cluster_id_=obj_['DIR_FstClusLO'])
|
||||||
traverse_folder_tree(directory_bytes_=child_directory_bytes_,
|
traverse_folder_tree(directory_bytes_=child_directory_bytes_,
|
||||||
name=os.path.join(name, obj_name_),
|
name=os.path.join(name, obj_name_),
|
||||||
state_=state_,
|
state_=state_,
|
||||||
|
Reference in New Issue
Block a user