mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-05 13:44:32 +02:00
Merge branch 'feature/add-python-implementation-fatfsgen' into 'master'
Script for generating FATFS on a host Closes IDF-4093 See merge request espressif/esp-idf!15290
This commit is contained in:
@@ -103,6 +103,7 @@
|
|||||||
/components/esptool_py/ @esp-idf-codeowners/tools
|
/components/esptool_py/ @esp-idf-codeowners/tools
|
||||||
/components/expat/ @esp-idf-codeowners/app-utilities
|
/components/expat/ @esp-idf-codeowners/app-utilities
|
||||||
/components/fatfs/ @esp-idf-codeowners/storage
|
/components/fatfs/ @esp-idf-codeowners/storage
|
||||||
|
/components/fatfs/**/*.py @esp-idf-codeowners/tools
|
||||||
/components/freemodbus/ @esp-idf-codeowners/peripherals
|
/components/freemodbus/ @esp-idf-codeowners/peripherals
|
||||||
/components/freertos/ @esp-idf-codeowners/system
|
/components/freertos/ @esp-idf-codeowners/system
|
||||||
/components/hal/ @esp-idf-codeowners/peripherals
|
/components/hal/ @esp-idf-codeowners/peripherals
|
||||||
|
@@ -115,6 +115,12 @@ test_spiffs_on_host:
|
|||||||
- cd ../test_spiffsgen
|
- cd ../test_spiffsgen
|
||||||
- ./test_spiffsgen.py
|
- ./test_spiffsgen.py
|
||||||
|
|
||||||
|
test_fatfsgen_on_host:
|
||||||
|
extends: .host_test_template
|
||||||
|
script:
|
||||||
|
- cd components/fatfs/test_fatfsgen/
|
||||||
|
- ./test_fatfsgen.py
|
||||||
|
|
||||||
test_multi_heap_on_host:
|
test_multi_heap_on_host:
|
||||||
extends: .host_test_template
|
extends: .host_test_template
|
||||||
script:
|
script:
|
||||||
|
216
components/fatfs/fatfsgen.py
Executable file
216
components/fatfs/fatfsgen.py
Executable file
@@ -0,0 +1,216 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import uuid
|
||||||
|
from typing import Any, List, Optional
|
||||||
|
|
||||||
|
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
|
||||||
|
from fatfsgen_utils.fat import FAT
|
||||||
|
from fatfsgen_utils.fatfs_state import FATFSState
|
||||||
|
from fatfsgen_utils.fs_object import Directory
|
||||||
|
from fatfsgen_utils.utils import pad_string
|
||||||
|
|
||||||
|
|
||||||
|
class FATFS:
|
||||||
|
"""
|
||||||
|
The class FATFS provides API for generating FAT file system.
|
||||||
|
It contains reference to the FAT table and to the root directory.
|
||||||
|
"""
|
||||||
|
MAX_VOL_LAB_SIZE = 11
|
||||||
|
MAX_OEM_NAME_SIZE = 8
|
||||||
|
MAX_FS_TYPE_SIZE = 8
|
||||||
|
BOOT_HEADER_SIZE = 512
|
||||||
|
|
||||||
|
BOOT_SECTOR_HEADER = Struct(
|
||||||
|
'BS_jmpBoot' / Const(b'\xeb\xfe\x90'),
|
||||||
|
'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, 'utf-8'),
|
||||||
|
'BPB_BytsPerSec' / Int16ul,
|
||||||
|
'BPB_SecPerClus' / Int8ul,
|
||||||
|
'BPB_RsvdSecCnt' / Int16ul,
|
||||||
|
'BPB_NumFATs' / Int8ul,
|
||||||
|
'BPB_RootEntCnt' / Int16ul,
|
||||||
|
'BPB_TotSec16' / Int16ul,
|
||||||
|
'BPB_Media' / Int8ul,
|
||||||
|
'BPB_FATSz16' / Int16ul,
|
||||||
|
'BPB_SecPerTrk' / Int16ul,
|
||||||
|
'BPB_NumHeads' / Int16ul,
|
||||||
|
'BPB_HiddSec' / Int32ul,
|
||||||
|
'BPB_TotSec32' / Int32ul,
|
||||||
|
'BS_DrvNum' / Const(b'\x80'),
|
||||||
|
'BS_Reserved1' / Const(b'\x00'),
|
||||||
|
'BS_BootSig' / Const(b'\x29'),
|
||||||
|
'BS_VolID' / Int32ul,
|
||||||
|
'BS_VolLab' / PaddedString(MAX_VOL_LAB_SIZE, 'utf-8'),
|
||||||
|
'BS_FilSysType' / PaddedString(MAX_FS_TYPE_SIZE, 'utf-8'),
|
||||||
|
'BS_EMPTY' / Const(448 * b'\x00'),
|
||||||
|
'Signature_word' / Const(b'\x55\xAA')
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
binary_image_path: Optional[str] = None,
|
||||||
|
size: int = 1024 * 1024,
|
||||||
|
reserved_sectors_cnt: int = 1,
|
||||||
|
fat_tables_cnt: int = 1,
|
||||||
|
sectors_per_cluster: int = 1,
|
||||||
|
sector_size: int = 0x1000,
|
||||||
|
sectors_per_fat: int = 1,
|
||||||
|
root_dir_sectors_cnt: int = 4,
|
||||||
|
hidden_sectors: int = 0,
|
||||||
|
long_names_enabled: bool = False,
|
||||||
|
entry_size: int = 32,
|
||||||
|
wl_sectors: int = 0,
|
||||||
|
num_heads: int = 0xff,
|
||||||
|
oem_name: str = 'MSDOS5.0',
|
||||||
|
sec_per_track: int = 0x3f,
|
||||||
|
volume_label: str = 'Espressif',
|
||||||
|
file_sys_type: str = 'FAT',
|
||||||
|
media_type: int = 0xf8) -> None:
|
||||||
|
|
||||||
|
self.state = FATFSState(entry_size=entry_size,
|
||||||
|
sector_size=sector_size,
|
||||||
|
reserved_sectors_cnt=reserved_sectors_cnt,
|
||||||
|
root_dir_sectors_cnt=root_dir_sectors_cnt,
|
||||||
|
size=size,
|
||||||
|
file_sys_type=file_sys_type,
|
||||||
|
num_heads=num_heads,
|
||||||
|
fat_tables_cnt=fat_tables_cnt,
|
||||||
|
sectors_per_fat=sectors_per_fat,
|
||||||
|
sectors_per_cluster=sectors_per_cluster,
|
||||||
|
media_type=media_type,
|
||||||
|
hidden_sectors=hidden_sectors,
|
||||||
|
sec_per_track=sec_per_track,
|
||||||
|
long_names_enabled=long_names_enabled,
|
||||||
|
volume_label=volume_label,
|
||||||
|
wl_sectors=wl_sectors,
|
||||||
|
oem_name=oem_name)
|
||||||
|
binary_image = bytearray(
|
||||||
|
self.read_filesystem(binary_image_path) if binary_image_path else self.create_empty_fatfs())
|
||||||
|
self.state.binary_image = binary_image
|
||||||
|
|
||||||
|
self.fat = FAT(fatfs_state=self.state,
|
||||||
|
reserved_sectors_cnt=self.state.reserved_sectors_cnt)
|
||||||
|
|
||||||
|
self.root_directory = Directory(name='A', # the name is not important
|
||||||
|
size=self.state.root_dir_sectors_cnt * self.state.sector_size,
|
||||||
|
fat=self.fat,
|
||||||
|
cluster=self.fat.clusters[1],
|
||||||
|
fatfs_state=self.state)
|
||||||
|
self.root_directory.init_directory()
|
||||||
|
|
||||||
|
def create_file(self, name: str, extension: str = '', path_from_root: Optional[List[str]] = None) -> None:
|
||||||
|
# when path_from_root is None the dir is root
|
||||||
|
self.root_directory.new_file(name=name, extension=extension, path_from_root=path_from_root)
|
||||||
|
|
||||||
|
def create_directory(self, name: str, path_from_root: Optional[List[str]] = None) -> None:
|
||||||
|
# when path_from_root is None the dir is root
|
||||||
|
parent_dir = self.root_directory
|
||||||
|
if path_from_root:
|
||||||
|
parent_dir = self.root_directory.recursive_search(path_from_root, self.root_directory)
|
||||||
|
self.root_directory.new_directory(name=name, parent=parent_dir, path_from_root=path_from_root)
|
||||||
|
|
||||||
|
def write_content(self, path_from_root: List[str], content: str) -> None:
|
||||||
|
"""
|
||||||
|
fat fs invokes root directory to recursively find the required file and writes the content
|
||||||
|
"""
|
||||||
|
self.root_directory.write_to_file(path_from_root, content)
|
||||||
|
|
||||||
|
def create_empty_fatfs(self) -> Any:
|
||||||
|
sectors_count = self.state.size // self.state.sector_size
|
||||||
|
volume_uuid = uuid.uuid4().int & 0xFFFFFFFF
|
||||||
|
return (
|
||||||
|
FATFS.BOOT_SECTOR_HEADER.build(
|
||||||
|
dict(BS_OEMName=pad_string(self.state.oem_name, size=FATFS.MAX_OEM_NAME_SIZE),
|
||||||
|
BPB_BytsPerSec=self.state.sectors_per_cluster * self.state.sector_size,
|
||||||
|
BPB_SecPerClus=self.state.sectors_per_cluster,
|
||||||
|
BPB_RsvdSecCnt=self.state.reserved_sectors_cnt,
|
||||||
|
BPB_NumFATs=self.state.fat_tables_cnt,
|
||||||
|
BPB_RootEntCnt=self.state.entries_root_count,
|
||||||
|
BPB_TotSec16=0x00 if self.state.fatfs_type == FATFSState.FAT32 else sectors_count,
|
||||||
|
BPB_Media=self.state.media_type,
|
||||||
|
BPB_FATSz16=self.state.sectors_per_fat_cnt,
|
||||||
|
BPB_SecPerTrk=self.state.sec_per_track,
|
||||||
|
BPB_NumHeads=self.state.num_heads,
|
||||||
|
BPB_HiddSec=self.state.hidden_sectors,
|
||||||
|
BPB_TotSec32=sectors_count if self.state.fatfs_type == FATFSState.FAT32 else 0x00,
|
||||||
|
BS_VolID=volume_uuid,
|
||||||
|
BS_VolLab=pad_string(self.state.volume_label, size=FATFS.MAX_VOL_LAB_SIZE),
|
||||||
|
BS_FilSysType=pad_string(self.state.file_sys_type, size=FATFS.MAX_FS_TYPE_SIZE)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
+ (self.state.sector_size - FATFS.BOOT_HEADER_SIZE) * b'\x00'
|
||||||
|
+ self.state.sectors_per_fat_cnt * self.state.fat_tables_cnt * self.state.sector_size * b'\x00'
|
||||||
|
+ self.state.root_dir_sectors_cnt * self.state.sector_size * b'\x00'
|
||||||
|
+ self.state.data_sectors * self.state.sector_size * b'\xff'
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def read_filesystem(path: str) -> bytearray:
|
||||||
|
with open(path, 'rb') as fs_file:
|
||||||
|
return bytearray(fs_file.read())
|
||||||
|
|
||||||
|
def write_filesystem(self, output_path: str) -> None:
|
||||||
|
with open(output_path, 'wb') as output:
|
||||||
|
output.write(bytearray(self.state.binary_image))
|
||||||
|
|
||||||
|
def _generate_partition_from_folder(self,
|
||||||
|
folder_relative_path: str,
|
||||||
|
folder_path: str = '',
|
||||||
|
is_dir: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Given path to folder and folder name recursively encodes folder into binary image.
|
||||||
|
Used by method generate
|
||||||
|
"""
|
||||||
|
real_path = os.path.join(folder_path, folder_relative_path)
|
||||||
|
smaller_path = folder_relative_path
|
||||||
|
|
||||||
|
folder_relative_path = folder_relative_path.upper()
|
||||||
|
|
||||||
|
normal_path = os.path.normpath(folder_relative_path)
|
||||||
|
split_path = normal_path.split(os.sep)
|
||||||
|
if os.path.isfile(real_path):
|
||||||
|
with open(real_path) as file:
|
||||||
|
content = file.read()
|
||||||
|
file_name, extension = os.path.splitext(split_path[-1])
|
||||||
|
extension = extension[1:] # remove the dot from the extension
|
||||||
|
self.create_file(name=file_name, extension=extension, path_from_root=split_path[1:-1] or None)
|
||||||
|
self.write_content(split_path[1:], content)
|
||||||
|
elif os.path.isdir(real_path):
|
||||||
|
if not is_dir:
|
||||||
|
self.create_directory(split_path[-1], split_path[1:-1])
|
||||||
|
|
||||||
|
# sorting files for better testability
|
||||||
|
dir_content = list(sorted(os.listdir(real_path)))
|
||||||
|
for path in dir_content:
|
||||||
|
self._generate_partition_from_folder(os.path.join(smaller_path, path), folder_path=folder_path)
|
||||||
|
|
||||||
|
def generate(self, input_directory: str) -> None:
|
||||||
|
"""
|
||||||
|
Normalize path to folder and recursively encode folder to binary image
|
||||||
|
"""
|
||||||
|
path_to_folder, folder_name = os.path.split(input_directory)
|
||||||
|
self._generate_partition_from_folder(folder_name, folder_path=path_to_folder, is_dir=True)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser(description='Create a FAT filesystem and populate it with directory content')
|
||||||
|
parser.add_argument('input_directory',
|
||||||
|
help='Path to the directory that will be encoded into fatfs image')
|
||||||
|
parser.add_argument('--output_file',
|
||||||
|
default='fatfs_image.img',
|
||||||
|
help='Filename of the generated fatfs image')
|
||||||
|
parser.add_argument('--partition_size',
|
||||||
|
default=1024 * 1024,
|
||||||
|
help='Size of the partition in bytes')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
input_dir = args.input_directory
|
||||||
|
try:
|
||||||
|
partition_size = eval(args.partition_size)
|
||||||
|
except ValueError:
|
||||||
|
partition_size = args.partition_size
|
||||||
|
fatfs = FATFS(size=partition_size)
|
||||||
|
fatfs.generate(input_dir)
|
||||||
|
fatfs.write_filesystem(args.output_file)
|
0
components/fatfs/fatfsgen_utils/__init__.py
Normal file
0
components/fatfs/fatfsgen_utils/__init__.py
Normal file
112
components/fatfs/fatfsgen_utils/cluster.py
Normal file
112
components/fatfs/fatfsgen_utils/cluster.py
Normal file
@@ -0,0 +1,112 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from .fatfs_state import FATFSState
|
||||||
|
from .utils import build_byte, clean_first_half_byte, clean_second_half_byte, split_by_half_byte_12_bit_little_endian
|
||||||
|
|
||||||
|
|
||||||
|
class Cluster:
|
||||||
|
"""
|
||||||
|
class Cluster handles values in FAT table and allocates sectors in data region.
|
||||||
|
"""
|
||||||
|
RESERVED_BLOCK_ID = 0
|
||||||
|
ROOT_BLOCK_ID = 1
|
||||||
|
ALLOCATED_BLOCK_VALUE = 0xFFF # for fat 12
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
cluster_id: int,
|
||||||
|
fatfs_state: FATFSState,
|
||||||
|
is_empty: bool = True) -> None:
|
||||||
|
|
||||||
|
self.id = cluster_id
|
||||||
|
self.fatfs_state = fatfs_state
|
||||||
|
|
||||||
|
self._next_cluster = None # type: Optional[Cluster]
|
||||||
|
if self.id == Cluster.RESERVED_BLOCK_ID:
|
||||||
|
self.is_empty = False
|
||||||
|
self.set_in_fat(0xff8)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.cluster_data_address = self._compute_cluster_data_address()
|
||||||
|
self.is_empty = is_empty
|
||||||
|
|
||||||
|
assert self.cluster_data_address or self.is_empty
|
||||||
|
|
||||||
|
@property
|
||||||
|
def next_cluster(self): # type: () -> Optional[Cluster]
|
||||||
|
return self._next_cluster
|
||||||
|
|
||||||
|
@next_cluster.setter
|
||||||
|
def next_cluster(self, value): # type: (Optional[Cluster]) -> None
|
||||||
|
self._next_cluster = value
|
||||||
|
|
||||||
|
def _cluster_id_to_logical_position_in_bits(self, _id: int) -> int:
|
||||||
|
# computes address of the cluster in fat table
|
||||||
|
return self.fatfs_state.fatfs_type * _id # type: ignore
|
||||||
|
|
||||||
|
def _compute_cluster_data_address(self) -> int:
|
||||||
|
if self.id == Cluster.ROOT_BLOCK_ID:
|
||||||
|
return self.fatfs_state.root_directory_start # type: ignore
|
||||||
|
# the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root)
|
||||||
|
return self.fatfs_state.sector_size * (self.id - 2) + self.fatfs_state.data_region_start # type: ignore
|
||||||
|
|
||||||
|
def _set_first_half_byte(self, address: int, value: int) -> None:
|
||||||
|
clean_second_half_byte(self.fatfs_state.binary_image, address)
|
||||||
|
self.fatfs_state.binary_image[address] |= value << 4
|
||||||
|
|
||||||
|
def _set_second_half_byte(self, address: int, value: int) -> None:
|
||||||
|
clean_first_half_byte(self.fatfs_state.binary_image, address)
|
||||||
|
self.fatfs_state.binary_image[address] |= value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def fat_cluster_address(self) -> int:
|
||||||
|
"""Determines how many bits precede the first bit of the cluster in FAT"""
|
||||||
|
return self._cluster_id_to_logical_position_in_bits(self.id)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def real_cluster_address(self) -> int:
|
||||||
|
return self.fatfs_state.start_address + self.fat_cluster_address // 8 # type: ignore
|
||||||
|
|
||||||
|
def set_in_fat(self, value: int) -> None:
|
||||||
|
"""
|
||||||
|
Sets cluster in FAT to certain value.
|
||||||
|
Firstly, we split the target value into 3 half bytes (max value is 0xfff).
|
||||||
|
Then we could encounter two situations:
|
||||||
|
1. if the cluster index (indexed from zero) is even, we set the full byte computed by
|
||||||
|
self.cluster_id_to_logical_position_in_bits and the second half of the consequent byte.
|
||||||
|
Order of half bytes is 2, 1, 3.
|
||||||
|
|
||||||
|
2. if the cluster index is odd, we set the first half of the computed byte and the full consequent byte.
|
||||||
|
Order of half bytes is 1, 3, 2.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# value must fit into number of bits of the fat (12, 16 or 32)
|
||||||
|
assert value <= (1 << self.fatfs_state.fatfs_type) - 1
|
||||||
|
half_bytes = split_by_half_byte_12_bit_little_endian(value)
|
||||||
|
|
||||||
|
# hardcoded for fat 12
|
||||||
|
# IDF-4046 will extend it for fat 16
|
||||||
|
if self.fat_cluster_address % 8 == 0:
|
||||||
|
self.fatfs_state.binary_image[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0])
|
||||||
|
self._set_second_half_byte(self.real_cluster_address + 1, half_bytes[2])
|
||||||
|
elif self.fat_cluster_address % 8 != 0:
|
||||||
|
self._set_first_half_byte(self.real_cluster_address, half_bytes[0])
|
||||||
|
self.fatfs_state.binary_image[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_root(self) -> bool:
|
||||||
|
return self.id == Cluster.ROOT_BLOCK_ID
|
||||||
|
|
||||||
|
def allocate_cluster(self) -> None:
|
||||||
|
"""
|
||||||
|
This method sets bits in FAT table to `allocated` and clean the corresponding sector(s)
|
||||||
|
"""
|
||||||
|
self.is_empty = False
|
||||||
|
self.set_in_fat(Cluster.ALLOCATED_BLOCK_VALUE)
|
||||||
|
|
||||||
|
cluster_start = self.cluster_data_address
|
||||||
|
dir_size = self.fatfs_state.get_dir_size(self.is_root)
|
||||||
|
cluster_end = cluster_start + dir_size
|
||||||
|
self.fatfs_state.binary_image[cluster_start:cluster_end] = dir_size * b'\x00'
|
127
components/fatfs/fatfsgen_utils/entry.py
Normal file
127
components/fatfs/fatfsgen_utils/entry.py
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
|
||||||
|
|
||||||
|
from .exceptions import LowerCaseException, TooLongNameException
|
||||||
|
from .fatfs_state import FATFSState
|
||||||
|
from .utils import is_valid_fatfs_name, pad_string
|
||||||
|
|
||||||
|
|
||||||
|
class Entry:
|
||||||
|
"""
|
||||||
|
The class Entry represents entry of the directory.
|
||||||
|
"""
|
||||||
|
ATTR_READ_ONLY = 0x01
|
||||||
|
ATTR_HIDDEN = 0x02
|
||||||
|
ATTR_SYSTEM = 0x04
|
||||||
|
ATTR_VOLUME_ID = 0x08
|
||||||
|
ATTR_DIRECTORY = 0x10
|
||||||
|
ATTR_ARCHIVE = 0x20
|
||||||
|
MAX_NAME_SIZE_S = 8
|
||||||
|
MAX_EXT_SIZE_S = 3
|
||||||
|
|
||||||
|
ENTRY_FORMAT_SHORT_NAME = Struct(
|
||||||
|
'DIR_Name' / PaddedString(MAX_NAME_SIZE_S, 'utf-8'),
|
||||||
|
'DIR_Name_ext' / PaddedString(MAX_EXT_SIZE_S, 'utf-8'),
|
||||||
|
'DIR_Attr' / Int8ul,
|
||||||
|
'DIR_NTRes' / Const(b'\x00'),
|
||||||
|
'DIR_CrtTimeTenth' / Const(b'\x00'),
|
||||||
|
'DIR_CrtTime' / Const(b'\x01\x00'),
|
||||||
|
'DIR_CrtDate' / Const(b'\x21\x00'),
|
||||||
|
'DIR_LstAccDate' / Const(b'\x00\x00'),
|
||||||
|
'DIR_FstClusHI' / Const(b'\x00\x00'),
|
||||||
|
'DIR_WrtTime' / Const(b'\x01\x00'),
|
||||||
|
'DIR_WrtDate' / Const(b'\x01\x00'),
|
||||||
|
'DIR_FstClusLO' / Int16ul,
|
||||||
|
'DIR_FileSize' / Int32ul,
|
||||||
|
)
|
||||||
|
|
||||||
|
# IDF-4044
|
||||||
|
ENTRY_FORMAT_LONG_NAME = Struct()
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
entry_id: int,
|
||||||
|
parent_dir_entries_address: int,
|
||||||
|
fatfs_state: FATFSState) -> None:
|
||||||
|
self.fatfs_state = fatfs_state
|
||||||
|
self.id = entry_id
|
||||||
|
self.entry_address = parent_dir_entries_address + self.id * self.fatfs_state.entry_size
|
||||||
|
self._is_alias = False
|
||||||
|
self._is_empty = True
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_empty(self) -> bool:
|
||||||
|
return self._is_empty
|
||||||
|
|
||||||
|
def _parse_entry(self, entry_bytearray: Optional[bytearray]) -> dict:
|
||||||
|
if self.fatfs_state.long_names_enabled:
|
||||||
|
return Entry.ENTRY_FORMAT_LONG_NAME.parse(entry_bytearray) # type: ignore
|
||||||
|
return Entry.ENTRY_FORMAT_SHORT_NAME.parse(entry_bytearray) # type: ignore
|
||||||
|
|
||||||
|
def _build_entry(self, **kwargs) -> Any: # type: ignore
|
||||||
|
if self.fatfs_state.long_names_enabled:
|
||||||
|
return Entry.ENTRY_FORMAT_LONG_NAME.build(dict(**kwargs))
|
||||||
|
return Entry.ENTRY_FORMAT_SHORT_NAME.build(dict(**kwargs))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def entry_bytes(self) -> Any:
|
||||||
|
return self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size]
|
||||||
|
|
||||||
|
@entry_bytes.setter
|
||||||
|
def entry_bytes(self, value: int) -> None:
|
||||||
|
self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size] = value
|
||||||
|
|
||||||
|
def _clean_entry(self) -> None:
|
||||||
|
self.entry_bytes = self.fatfs_state.entry_size * b'\x00'
|
||||||
|
|
||||||
|
def allocate_entry(self,
|
||||||
|
first_cluster_id: int,
|
||||||
|
entity_name: str,
|
||||||
|
entity_type: int,
|
||||||
|
entity_extension: str = '',
|
||||||
|
size: int = 0) -> None:
|
||||||
|
"""
|
||||||
|
:param first_cluster_id: id of the first data cluster for given entry
|
||||||
|
:param entity_name: name recorded in the entry
|
||||||
|
:param entity_extension: extension recorded in the entry
|
||||||
|
:param size: size of the content of the file
|
||||||
|
:param entity_type: type of the entity (file [0x20] or directory [0x10])
|
||||||
|
:returns: None
|
||||||
|
|
||||||
|
:raises LowerCaseException: In case when long_names_enabled is set to False and filename exceeds 8 chars
|
||||||
|
for name or 3 chars for extension the exception is raised
|
||||||
|
"""
|
||||||
|
if not ((is_valid_fatfs_name(entity_name) and
|
||||||
|
is_valid_fatfs_name(entity_extension)) or
|
||||||
|
self.fatfs_state.long_names_enabled):
|
||||||
|
raise LowerCaseException('Lower case is not supported because long name support is not enabled!')
|
||||||
|
|
||||||
|
# clean entry before allocation
|
||||||
|
self._clean_entry()
|
||||||
|
self._is_empty = False
|
||||||
|
object_name = entity_name.upper()
|
||||||
|
object_extension = entity_extension.upper()
|
||||||
|
|
||||||
|
# implementation of long names support will be part of IDF-4044
|
||||||
|
exceeds_short_name = len(object_name) > Entry.MAX_NAME_SIZE_S or len(object_extension) > Entry.MAX_EXT_SIZE_S
|
||||||
|
if not self.fatfs_state.long_names_enabled and exceeds_short_name:
|
||||||
|
raise TooLongNameException(
|
||||||
|
'Maximal length of the object name is 8 characters and 3 characters for extension!')
|
||||||
|
|
||||||
|
start_address = self.entry_address
|
||||||
|
end_address = start_address + self.fatfs_state.entry_size
|
||||||
|
self.fatfs_state.binary_image[start_address: end_address] = self._build_entry(
|
||||||
|
DIR_Name=pad_string(object_name, size=Entry.MAX_NAME_SIZE_S),
|
||||||
|
DIR_Name_ext=pad_string(object_extension, size=Entry.MAX_EXT_SIZE_S),
|
||||||
|
DIR_Attr=entity_type,
|
||||||
|
DIR_FstClusLO=first_cluster_id,
|
||||||
|
DIR_FileSize=size
|
||||||
|
)
|
||||||
|
|
||||||
|
def update_content_size(self, content_size: int) -> None:
|
||||||
|
parsed_entry = self._parse_entry(self.entry_bytes)
|
||||||
|
parsed_entry.DIR_FileSize = content_size # type: ignore
|
||||||
|
self.entry_bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(parsed_entry)
|
33
components/fatfs/fatfsgen_utils/exceptions.py
Normal file
33
components/fatfs/fatfsgen_utils/exceptions.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
class WriteDirectoryException(Exception):
|
||||||
|
"""
|
||||||
|
Exception is raised when the user tries to write the content into the directory instead of file
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NoFreeClusterException(Exception):
|
||||||
|
"""
|
||||||
|
Exception is raised when the user tries allocate cluster but no free one is available
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class LowerCaseException(Exception):
|
||||||
|
"""
|
||||||
|
Exception is raised when the user tries to write file or directory with lower case
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TooLongNameException(Exception):
|
||||||
|
"""
|
||||||
|
Exception is raised when long name support is not enabled and user tries to write file longer then allowed
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FatalError(Exception):
|
||||||
|
pass
|
42
components/fatfs/fatfsgen_utils/fat.py
Normal file
42
components/fatfs/fatfsgen_utils/fat.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
from .cluster import Cluster
|
||||||
|
from .exceptions import NoFreeClusterException
|
||||||
|
from .fatfs_state import FATFSState
|
||||||
|
|
||||||
|
|
||||||
|
class FAT:
|
||||||
|
"""
|
||||||
|
The FAT represents the FAT region in file system. It is responsible for storing clusters
|
||||||
|
and chaining them in case we need to extend file or directory to more clusters.
|
||||||
|
"""
|
||||||
|
def __init__(self,
|
||||||
|
fatfs_state: FATFSState,
|
||||||
|
reserved_sectors_cnt: int) -> None:
|
||||||
|
self.fatfs_state = fatfs_state
|
||||||
|
self.reserved_sectors_cnt = reserved_sectors_cnt
|
||||||
|
|
||||||
|
self.clusters = [Cluster(cluster_id=i, fatfs_state=self.fatfs_state) for i in
|
||||||
|
range(1, self.fatfs_state.max_clusters)]
|
||||||
|
|
||||||
|
# update root directory record
|
||||||
|
self.clusters[0].allocate_cluster()
|
||||||
|
# add first reserved cluster
|
||||||
|
self.clusters = [Cluster(cluster_id=Cluster.RESERVED_BLOCK_ID, fatfs_state=self.fatfs_state)] + self.clusters
|
||||||
|
|
||||||
|
def find_free_cluster(self) -> Cluster:
|
||||||
|
# finds first empty cluster and allocates it
|
||||||
|
for cluster in self.clusters:
|
||||||
|
if cluster.is_empty:
|
||||||
|
cluster.allocate_cluster()
|
||||||
|
return cluster
|
||||||
|
raise NoFreeClusterException('No free cluster available!')
|
||||||
|
|
||||||
|
def allocate_chain(self, first_cluster: Cluster, size: int) -> None:
|
||||||
|
current = first_cluster
|
||||||
|
for _ in range(size - 1):
|
||||||
|
free_cluster = self.find_free_cluster()
|
||||||
|
current.next_cluster = free_cluster
|
||||||
|
current.set_in_fat(free_cluster.id)
|
||||||
|
current = free_cluster
|
98
components/fatfs/fatfsgen_utils/fatfs_state.py
Normal file
98
components/fatfs/fatfsgen_utils/fatfs_state.py
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
|
||||||
|
class FATFSState:
|
||||||
|
"""
|
||||||
|
The class represents the state and the configuration of the FATFS.
|
||||||
|
"""
|
||||||
|
FAT12_MAX_CLUSTERS = 4085
|
||||||
|
FAT16_MAX_CLUSTERS = 65525
|
||||||
|
FAT12 = 12
|
||||||
|
FAT16 = 16
|
||||||
|
FAT32 = 32
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
entry_size: int,
|
||||||
|
sector_size: int,
|
||||||
|
reserved_sectors_cnt: int,
|
||||||
|
root_dir_sectors_cnt: int,
|
||||||
|
size: int,
|
||||||
|
media_type: int,
|
||||||
|
sectors_per_fat: int,
|
||||||
|
sectors_per_cluster: int,
|
||||||
|
volume_label: str,
|
||||||
|
oem_name: str,
|
||||||
|
fat_tables_cnt: int,
|
||||||
|
sec_per_track: int,
|
||||||
|
num_heads: int,
|
||||||
|
hidden_sectors: int,
|
||||||
|
file_sys_type: str,
|
||||||
|
wl_sectors: int,
|
||||||
|
long_names_enabled: bool = False):
|
||||||
|
self._binary_image: bytearray = bytearray(b'')
|
||||||
|
self.fat_tables_cnt: int = fat_tables_cnt
|
||||||
|
self.oem_name: str = oem_name
|
||||||
|
self.wl_sectors_cnt: int = wl_sectors
|
||||||
|
self.file_sys_type: str = file_sys_type
|
||||||
|
self.sec_per_track: int = sec_per_track
|
||||||
|
self.hidden_sectors: int = hidden_sectors
|
||||||
|
self.volume_label: str = volume_label
|
||||||
|
self.media_type: int = media_type
|
||||||
|
self.long_names_enabled: bool = long_names_enabled
|
||||||
|
self.entry_size: int = entry_size
|
||||||
|
self.num_heads: int = num_heads
|
||||||
|
self.sector_size: int = sector_size
|
||||||
|
self.root_dir_sectors_cnt: int = root_dir_sectors_cnt
|
||||||
|
self.reserved_sectors_cnt: int = reserved_sectors_cnt
|
||||||
|
self.size: int = size
|
||||||
|
self.sectors_per_fat_cnt: int = sectors_per_fat
|
||||||
|
self.sectors_per_cluster: int = sectors_per_cluster
|
||||||
|
|
||||||
|
@property
|
||||||
|
def binary_image(self) -> bytearray:
|
||||||
|
return self._binary_image
|
||||||
|
|
||||||
|
@binary_image.setter
|
||||||
|
def binary_image(self, value: bytearray) -> None:
|
||||||
|
self._binary_image = value
|
||||||
|
|
||||||
|
def get_dir_size(self, is_root: bool) -> int:
|
||||||
|
return self.root_dir_sectors_cnt * self.sector_size if is_root else self.sector_size
|
||||||
|
|
||||||
|
@property
|
||||||
|
def start_address(self) -> int:
|
||||||
|
return self.sector_size * self.reserved_sectors_cnt
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data_sectors(self) -> int:
|
||||||
|
return (self.size // self.sector_size) - self.non_data_sectors
|
||||||
|
|
||||||
|
@property
|
||||||
|
def non_data_sectors(self) -> int:
|
||||||
|
return self.reserved_sectors_cnt + self.sectors_per_fat_cnt + self.root_dir_sectors_cnt + self.wl_sectors_cnt
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data_region_start(self) -> int:
|
||||||
|
return self.non_data_sectors * self.sector_size
|
||||||
|
|
||||||
|
@property
|
||||||
|
def max_clusters(self) -> int:
|
||||||
|
return self.data_sectors // self.sectors_per_cluster
|
||||||
|
|
||||||
|
@property
|
||||||
|
def root_directory_start(self) -> int:
|
||||||
|
return (self.reserved_sectors_cnt + self.sectors_per_fat_cnt) * self.sector_size
|
||||||
|
|
||||||
|
@property
|
||||||
|
def fatfs_type(self) -> int:
|
||||||
|
if self.max_clusters < FATFSState.FAT12_MAX_CLUSTERS:
|
||||||
|
return FATFSState.FAT12
|
||||||
|
elif self.max_clusters < FATFSState.FAT16_MAX_CLUSTERS:
|
||||||
|
return FATFSState.FAT16
|
||||||
|
# fat is FAT.FAT32, not supported now
|
||||||
|
raise NotImplementedError('FAT32 is currently not supported.')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def entries_root_count(self) -> int:
|
||||||
|
return (self.root_dir_sectors_cnt * self.sector_size) // self.entry_size
|
225
components/fatfs/fatfsgen_utils/fs_object.py
Normal file
225
components/fatfs/fatfsgen_utils/fs_object.py
Normal file
@@ -0,0 +1,225 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
import os
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
from .entry import Entry
|
||||||
|
from .exceptions import FatalError, WriteDirectoryException
|
||||||
|
from .fat import FAT, Cluster
|
||||||
|
from .fatfs_state import FATFSState
|
||||||
|
from .utils import required_clusters_count, split_content_into_sectors, split_to_name_and_extension
|
||||||
|
|
||||||
|
|
||||||
|
class File:
|
||||||
|
"""
|
||||||
|
The class File provides API to write into the files. It represents file in the FS.
|
||||||
|
"""
|
||||||
|
ATTR_ARCHIVE = 0x20
|
||||||
|
ENTITY_TYPE = ATTR_ARCHIVE
|
||||||
|
|
||||||
|
def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
|
||||||
|
self.name = name
|
||||||
|
self.extension = extension
|
||||||
|
self.fatfs_state = fatfs_state
|
||||||
|
self.fat = fat
|
||||||
|
self.size = 0
|
||||||
|
self._first_cluster = None
|
||||||
|
self._entry = entry
|
||||||
|
|
||||||
|
@property
|
||||||
|
def entry(self) -> Entry:
|
||||||
|
return self._entry
|
||||||
|
|
||||||
|
@property
|
||||||
|
def first_cluster(self) -> Optional[Cluster]:
|
||||||
|
return self._first_cluster
|
||||||
|
|
||||||
|
@first_cluster.setter
|
||||||
|
def first_cluster(self, value: Cluster) -> None:
|
||||||
|
self._first_cluster = value
|
||||||
|
|
||||||
|
def name_equals(self, name: str, extension: str) -> bool:
|
||||||
|
return self.name == name and self.extension == extension
|
||||||
|
|
||||||
|
def write(self, content: str) -> None:
|
||||||
|
self.entry.update_content_size(len(content))
|
||||||
|
# we assume that the correct amount of clusters is allocated
|
||||||
|
current_cluster = self._first_cluster
|
||||||
|
for content_part in split_content_into_sectors(content, self.fatfs_state.sector_size):
|
||||||
|
content_as_list = content_part.encode()
|
||||||
|
if current_cluster is None:
|
||||||
|
raise FatalError('No free space left!')
|
||||||
|
|
||||||
|
address = current_cluster.cluster_data_address
|
||||||
|
self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
|
||||||
|
current_cluster = current_cluster.next_cluster
|
||||||
|
|
||||||
|
|
||||||
|
class Directory:
|
||||||
|
"""
|
||||||
|
The Directory class provides API to add files and directories into the directory
|
||||||
|
and to find the file according to path and write it.
|
||||||
|
"""
|
||||||
|
ATTR_DIRECTORY = 0x10
|
||||||
|
ATTR_ARCHIVE = 0x20
|
||||||
|
ENTITY_TYPE = ATTR_DIRECTORY
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
name,
|
||||||
|
fat,
|
||||||
|
fatfs_state,
|
||||||
|
entry=None,
|
||||||
|
cluster=None,
|
||||||
|
size=None,
|
||||||
|
extension='',
|
||||||
|
parent=None):
|
||||||
|
# type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
|
||||||
|
self.name = name
|
||||||
|
self.fatfs_state = fatfs_state
|
||||||
|
self.extension = extension
|
||||||
|
|
||||||
|
self.fat = fat
|
||||||
|
self.size = size or self.fatfs_state.sector_size
|
||||||
|
|
||||||
|
# if directory is root its parent is itself
|
||||||
|
self.parent: Directory = parent or self
|
||||||
|
self._first_cluster = cluster
|
||||||
|
|
||||||
|
# entries will be initialized after the cluster allocation
|
||||||
|
self.entries: List[Entry] = []
|
||||||
|
self.entities = [] # type: ignore
|
||||||
|
self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_root(self) -> bool:
|
||||||
|
return self.parent is self
|
||||||
|
|
||||||
|
@property
|
||||||
|
def first_cluster(self) -> Cluster:
|
||||||
|
return self._first_cluster
|
||||||
|
|
||||||
|
@first_cluster.setter
|
||||||
|
def first_cluster(self, value: Cluster) -> None:
|
||||||
|
self._first_cluster = value
|
||||||
|
|
||||||
|
def name_equals(self, name: str, extension: str) -> bool:
|
||||||
|
return self.name == name and self.extension == extension
|
||||||
|
|
||||||
|
def create_entries(self, cluster: Cluster) -> list:
|
||||||
|
return [Entry(entry_id=i,
|
||||||
|
parent_dir_entries_address=cluster.cluster_data_address,
|
||||||
|
fatfs_state=self.fatfs_state)
|
||||||
|
for i in range(self.size // self.fatfs_state.entry_size)]
|
||||||
|
|
||||||
|
def init_directory(self) -> None:
|
||||||
|
self.entries = self.create_entries(self._first_cluster)
|
||||||
|
if not self.is_root:
|
||||||
|
# the root directory doesn't contain link to itself nor the parent
|
||||||
|
free_entry1 = self.find_free_entry() or self.chain_directory()
|
||||||
|
free_entry1.allocate_entry(first_cluster_id=self.first_cluster.id,
|
||||||
|
entity_name='.',
|
||||||
|
entity_extension='',
|
||||||
|
entity_type=self.ENTITY_TYPE)
|
||||||
|
self.first_cluster = self._first_cluster
|
||||||
|
free_entry2 = self.find_free_entry() or self.chain_directory()
|
||||||
|
free_entry2.allocate_entry(first_cluster_id=self.parent.first_cluster.id,
|
||||||
|
entity_name='..',
|
||||||
|
entity_extension='',
|
||||||
|
entity_type=self.parent.ENTITY_TYPE)
|
||||||
|
self.parent.first_cluster = self.parent.first_cluster
|
||||||
|
|
||||||
|
def lookup_entity(self, object_name: str, extension: str): # type: ignore
|
||||||
|
for entity in self.entities:
|
||||||
|
if entity.name == object_name and entity.extension == extension:
|
||||||
|
return entity
|
||||||
|
return None
|
||||||
|
|
||||||
|
def recursive_search(self, path_as_list, current_dir): # type: ignore
|
||||||
|
name, extension = split_to_name_and_extension(path_as_list[0])
|
||||||
|
next_obj = current_dir.lookup_entity(name, extension)
|
||||||
|
if next_obj is None:
|
||||||
|
raise FileNotFoundError('No such file or directory!')
|
||||||
|
if len(path_as_list) == 1 and next_obj.name_equals(name, extension):
|
||||||
|
return next_obj
|
||||||
|
return self.recursive_search(path_as_list[1:], next_obj)
|
||||||
|
|
||||||
|
def find_free_entry(self) -> Optional[Entry]:
|
||||||
|
for entry in self.entries:
|
||||||
|
if entry.is_empty:
|
||||||
|
return entry
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _extend_directory(self) -> None:
|
||||||
|
current = self.first_cluster
|
||||||
|
while current.next_cluster is not None:
|
||||||
|
current = current.next_cluster
|
||||||
|
new_cluster = self.fat.find_free_cluster()
|
||||||
|
current.set_in_fat(new_cluster.id)
|
||||||
|
current.next_cluster = new_cluster
|
||||||
|
self.entries += self.create_entries(new_cluster)
|
||||||
|
|
||||||
|
def chain_directory(self) -> Entry:
|
||||||
|
self._extend_directory()
|
||||||
|
free_entry = self.find_free_entry()
|
||||||
|
if free_entry is None:
|
||||||
|
raise FatalError('No more space left!')
|
||||||
|
return free_entry
|
||||||
|
|
||||||
|
def allocate_object(self,
|
||||||
|
name,
|
||||||
|
entity_type,
|
||||||
|
path_from_root=None,
|
||||||
|
extension=''):
|
||||||
|
# type: (str, int, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory]
|
||||||
|
"""
|
||||||
|
Method finds the target directory in the path
|
||||||
|
and allocates cluster (both the record in FAT and cluster in the data region)
|
||||||
|
and entry in the specified directory
|
||||||
|
"""
|
||||||
|
free_cluster = self.fat.find_free_cluster()
|
||||||
|
target_dir = self if not path_from_root else self.recursive_search(path_from_root, self)
|
||||||
|
free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
|
||||||
|
free_entry.allocate_entry(first_cluster_id=free_cluster.id,
|
||||||
|
entity_name=name,
|
||||||
|
entity_extension=extension,
|
||||||
|
entity_type=entity_type)
|
||||||
|
return free_cluster, free_entry, target_dir
|
||||||
|
|
||||||
|
def new_file(self, name: str, extension: str, path_from_root: Optional[List[str]]) -> None:
|
||||||
|
free_cluster, free_entry, target_dir = self.allocate_object(name=name,
|
||||||
|
extension=extension,
|
||||||
|
entity_type=Directory.ATTR_ARCHIVE,
|
||||||
|
path_from_root=path_from_root)
|
||||||
|
|
||||||
|
file = File(name, fat=self.fat, extension=extension, fatfs_state=self.fatfs_state, entry=free_entry)
|
||||||
|
file.first_cluster = free_cluster
|
||||||
|
target_dir.entities.append(file)
|
||||||
|
|
||||||
|
def new_directory(self, name, parent, path_from_root):
|
||||||
|
# type: (str, Directory, Optional[List[str]]) -> None
|
||||||
|
free_cluster, free_entry, target_dir = self.allocate_object(name=name,
|
||||||
|
entity_type=Directory.ATTR_DIRECTORY,
|
||||||
|
path_from_root=path_from_root)
|
||||||
|
|
||||||
|
directory = Directory(name=name, fat=self.fat, parent=parent, fatfs_state=self.fatfs_state, entry=free_entry)
|
||||||
|
directory.first_cluster = free_cluster
|
||||||
|
directory.init_directory()
|
||||||
|
target_dir.entities.append(directory)
|
||||||
|
|
||||||
|
def write_to_file(self, path: List[str], content: str) -> None:
|
||||||
|
"""
|
||||||
|
Writes to file existing in the directory structure.
|
||||||
|
|
||||||
|
:param path: path split into the list
|
||||||
|
:param content: content as a string to be written into a file
|
||||||
|
:returns: None
|
||||||
|
:raises WriteDirectoryException: raised is the target object for writing is a directory
|
||||||
|
"""
|
||||||
|
entity_to_write = self.recursive_search(path, self)
|
||||||
|
if isinstance(entity_to_write, File):
|
||||||
|
clusters_cnt = required_clusters_count(cluster_size=self.fatfs_state.sector_size, content=content)
|
||||||
|
self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
|
||||||
|
entity_to_write.write(content)
|
||||||
|
else:
|
||||||
|
raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')
|
60
components/fatfs/fatfsgen_utils/utils.py
Normal file
60
components/fatfs/fatfsgen_utils/utils.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
import os
|
||||||
|
import typing
|
||||||
|
|
||||||
|
from construct import Int16ul
|
||||||
|
|
||||||
|
|
||||||
|
def required_clusters_count(cluster_size: int, content: str) -> int:
|
||||||
|
# compute number of required clusters for file text
|
||||||
|
return (len(content) + cluster_size - 1) // cluster_size
|
||||||
|
|
||||||
|
|
||||||
|
def pad_string(content: str, size: typing.Optional[int] = None, pad: int = 0x20) -> str:
|
||||||
|
# cut string if longer and fill with pad character if shorter than size
|
||||||
|
return content.ljust(size or len(content), chr(pad))[:size]
|
||||||
|
|
||||||
|
|
||||||
|
def split_to_name_and_extension(full_name: str) -> typing.Tuple[str, str]:
|
||||||
|
name, extension = os.path.splitext(full_name)
|
||||||
|
return name, extension.replace('.', '')
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid_fatfs_name(string: str) -> bool:
|
||||||
|
return string == string.upper()
|
||||||
|
|
||||||
|
|
||||||
|
def split_by_half_byte_12_bit_little_endian(value: int) -> typing.Tuple[int, int, int]:
|
||||||
|
value_as_bytes = Int16ul.build(value)
|
||||||
|
return value_as_bytes[0] & 0x0f, value_as_bytes[0] >> 4, value_as_bytes[1] & 0x0f
|
||||||
|
|
||||||
|
|
||||||
|
def build_byte(first_half: int, second_half: int) -> int:
|
||||||
|
return (first_half << 4) | second_half
|
||||||
|
|
||||||
|
|
||||||
|
def clean_first_half_byte(bytes_array: bytearray, address: int) -> None:
|
||||||
|
"""
|
||||||
|
the function sets to zero first four bits of the byte.
|
||||||
|
E.g. 10111100 -> 10110000
|
||||||
|
"""
|
||||||
|
bytes_array[address] &= 0xf0
|
||||||
|
|
||||||
|
|
||||||
|
def clean_second_half_byte(bytes_array: bytearray, address: int) -> None:
|
||||||
|
"""
|
||||||
|
the function sets to zero last four bits of the byte.
|
||||||
|
E.g. 10111100 -> 00001100
|
||||||
|
"""
|
||||||
|
bytes_array[address] &= 0x0f
|
||||||
|
|
||||||
|
|
||||||
|
def split_content_into_sectors(content: str, sector_size: int) -> typing.List[str]:
|
||||||
|
result = []
|
||||||
|
clusters_cnt = required_clusters_count(cluster_size=sector_size, content=content)
|
||||||
|
|
||||||
|
for i in range(clusters_cnt):
|
||||||
|
result.append(content[sector_size * i:(i + 1) * sector_size])
|
||||||
|
return result
|
51
components/fatfs/project_include.cmake
Normal file
51
components/fatfs/project_include.cmake
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
# fatfs_create_partition_image
|
||||||
|
#
|
||||||
|
# Create a fatfs image of the specified directory on the host during build and optionally
|
||||||
|
# have the created image flashed using `idf.py flash`
|
||||||
|
function(fatfs_create_partition_image partition base_dir)
|
||||||
|
set(options FLASH_IN_PROJECT)
|
||||||
|
cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}")
|
||||||
|
|
||||||
|
idf_build_get_property(idf_path IDF_PATH)
|
||||||
|
idf_build_get_property(python PYTHON)
|
||||||
|
|
||||||
|
set(fatfsgen_py ${python} ${idf_path}/components/fatfs/fatfsgen.py)
|
||||||
|
|
||||||
|
get_filename_component(base_dir_full_path ${base_dir} ABSOLUTE)
|
||||||
|
|
||||||
|
partition_table_get_partition_info(size "--partition-name ${partition}" "size")
|
||||||
|
partition_table_get_partition_info(offset "--partition-name ${partition}" "offset")
|
||||||
|
|
||||||
|
if("${size}" AND "${offset}")
|
||||||
|
set(image_file ${CMAKE_BINARY_DIR}/${partition}.bin)
|
||||||
|
# Execute FATFS image generation; this always executes as there is no way to specify for CMake to watch for
|
||||||
|
# contents of the base dir changing.
|
||||||
|
add_custom_target(fatfs_${partition}_bin ALL
|
||||||
|
COMMAND ${fatfsgen_py} ${base_dir_full_path}
|
||||||
|
--partition_size ${size}
|
||||||
|
--output_file ${image_file}
|
||||||
|
)
|
||||||
|
|
||||||
|
set_property(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" APPEND PROPERTY
|
||||||
|
ADDITIONAL_MAKE_CLEAN_FILES
|
||||||
|
${image_file})
|
||||||
|
|
||||||
|
idf_component_get_property(main_args esptool_py FLASH_ARGS)
|
||||||
|
idf_component_get_property(sub_args esptool_py FLASH_SUB_ARGS)
|
||||||
|
# Last (optional) parameter is the encryption for the target. In our
|
||||||
|
# case, fatfs is not encrypt so pass FALSE to the function.
|
||||||
|
esptool_py_flash_target(${partition}-flash "${main_args}" "${sub_args}" ALWAYS_PLAINTEXT)
|
||||||
|
esptool_py_flash_to_partition(${partition}-flash "${partition}" "${image_file}")
|
||||||
|
|
||||||
|
add_dependencies(${partition}-flash fatfs_${partition}_bin)
|
||||||
|
|
||||||
|
if(arg_FLASH_IN_PROJECT)
|
||||||
|
esptool_py_flash_to_partition(flash "${partition}" "${image_file}")
|
||||||
|
add_dependencies(flash fatfs_${partition}_bin)
|
||||||
|
endif()
|
||||||
|
else()
|
||||||
|
set(message "Failed to create FATFS image for partition '${partition}'. "
|
||||||
|
"Check project configuration if using the correct partition table file.")
|
||||||
|
fail_at_build_time(fatfs_${partition}_bin "${message}")
|
||||||
|
endif()
|
||||||
|
endfunction()
|
292
components/fatfs/test_fatfsgen/test_fatfsgen.py
Executable file
292
components/fatfs/test_fatfsgen/test_fatfsgen.py
Executable file
@@ -0,0 +1,292 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from typing import Any, Dict, Union
|
||||||
|
|
||||||
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
import fatfsgen # noqa E402
|
||||||
|
from fatfsgen_utils.exceptions import WriteDirectoryException # noqa E402
|
||||||
|
from fatfsgen_utils.exceptions import LowerCaseException, NoFreeClusterException, TooLongNameException # noqa E402
|
||||||
|
|
||||||
|
|
||||||
|
class FatFSGen(unittest.TestCase):
|
||||||
|
CFG = dict(
|
||||||
|
sector_size=4096,
|
||||||
|
entry_size=32,
|
||||||
|
fat_start=0x1000,
|
||||||
|
data_start=0x7000,
|
||||||
|
root_start=0x2000,
|
||||||
|
output_file=os.path.join('output_data', 'tmp_file.img'),
|
||||||
|
test_dir=os.path.join('output_data', 'test'),
|
||||||
|
test_dir2=os.path.join('output_data', 'tst_str'),
|
||||||
|
) # type: Union[Dict[str, Any]]
|
||||||
|
|
||||||
|
def setUp(self) -> None:
|
||||||
|
os.makedirs('output_data')
|
||||||
|
self.generate_test_dir_1()
|
||||||
|
self.generate_test_dir_2()
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
shutil.rmtree('output_data')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def generate_test_dir_1() -> None:
|
||||||
|
os.makedirs(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test'))
|
||||||
|
with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test', 'lastfile'), 'w') as file:
|
||||||
|
file.write('deeptest\n')
|
||||||
|
with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'testfil2'), 'w') as file:
|
||||||
|
file.write('thisistest\n')
|
||||||
|
with open(os.path.join(FatFSGen.CFG['test_dir'], 'testfile'), 'w') as file:
|
||||||
|
file.write('ahoj\n')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def generate_test_dir_2() -> None:
|
||||||
|
os.makedirs(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test'))
|
||||||
|
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test', 'lastfile.txt'), 'w') as file:
|
||||||
|
file.write('deeptest\n')
|
||||||
|
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'testfil2'), 'w') as file:
|
||||||
|
file.write('thisistest\n')
|
||||||
|
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'testfile'), 'w') as file:
|
||||||
|
file.write('ahoj\n')
|
||||||
|
|
||||||
|
def test_empty_file_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_file('TESTFILE')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x2000:0x200c], b'TESTFILE \x20') # check entry name and type
|
||||||
|
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
|
||||||
|
|
||||||
|
def test_directory_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10') # check entry name and type
|
||||||
|
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
|
||||||
|
self.assertEqual(file_system[0x6000:0x600c], b'. \x10') # reference to itself
|
||||||
|
self.assertEqual(file_system[0x6020:0x602c], b'.. \x10') # reference to parent
|
||||||
|
|
||||||
|
def test_empty_file_with_extension_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_file('TESTF', extension='TXT')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x2000:0x200c], b'TESTF TXT\x20') # check entry name and type
|
||||||
|
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
|
||||||
|
|
||||||
|
def test_write_to_file_with_extension_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_file('WRITEF', extension='TXT')
|
||||||
|
fatfs.write_content(path_from_root=['WRITEF.TXT'], content='testcontent')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x2000:0x200c], b'WRITEF TXT\x20') # check entry name and type
|
||||||
|
self.assertEqual(file_system[0x201a:0x2020], b'\x02\x00\x0b\x00\x00\x00') # check size and cluster ref
|
||||||
|
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
|
||||||
|
self.assertEqual(file_system[0x6000:0x600f], b'testcontent\x00\x00\x00\x00') # check file content
|
||||||
|
|
||||||
|
def test_write_to_file_in_folder_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD'])
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'WRITEF.TXT'], content='testcontent')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10')
|
||||||
|
self.assertEqual(
|
||||||
|
file_system[0x1000:0x1010],
|
||||||
|
b'\xf8\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x6040:0x6050], b'WRITEF TXT\x20\x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x605a:0x6060], b'\x03\x00\x0b\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x7000:0x700b], b'testcontent') # check file content
|
||||||
|
|
||||||
|
def test_cluster_setting_values(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_file('TESTFIL1')
|
||||||
|
fatfs.create_file('TESTFIL2')
|
||||||
|
fatfs.create_file('TESTFIL3')
|
||||||
|
fatfs.create_file('TESTFIL4')
|
||||||
|
fatfs.create_file('TESTFIL5')
|
||||||
|
fatfs.fat.clusters[2].set_in_fat(1000)
|
||||||
|
fatfs.fat.clusters[3].set_in_fat(4)
|
||||||
|
fatfs.fat.clusters[4].set_in_fat(5)
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(
|
||||||
|
file_system[0x1000:0x1010],
|
||||||
|
b'\xf8\xff\xff\xe8\x43\x00\x05\xf0\xff\xff\x0f\x00\x00\x00\x00\x00')
|
||||||
|
|
||||||
|
def test_full_sector_file(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_file('WRITEF', extension='TXT')
|
||||||
|
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\xff\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x6000: 0x7000], FatFSGen.CFG['sector_size'] * b'a')
|
||||||
|
|
||||||
|
def test_file_chaining(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_file('WRITEF', extension='TXT')
|
||||||
|
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a' + 'a')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\x03\xf0\xff\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x7000: 0x8000], b'a' + (FatFSGen.CFG['sector_size'] - 1) * b'\x00')
|
||||||
|
|
||||||
|
def test_full_sector_folder(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
|
||||||
|
for i in range(FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
|
||||||
|
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'A0'], content='first')
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'A126'], content='later')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x1000: 0x10d0],
|
||||||
|
b'\xf8\xff\xff\x82\xf0\xff' + 192 * b'\xff' + 10 * b'\x00')
|
||||||
|
self.assertEqual(file_system[0x85000:0x85005], b'later')
|
||||||
|
self.assertEqual(file_system[0x86000:0x86010], b'A126 \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x86020:0x86030], b'A127 \x00\x00\x01\x00')
|
||||||
|
|
||||||
|
def test_write_to_folder_in_folder_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
fatfs.create_directory('TESTFOLL', path_from_root=['TESTFOLD'])
|
||||||
|
self.assertRaises(WriteDirectoryException, fatfs.write_content, path_from_root=['TESTFOLD', 'TESTFOLL'],
|
||||||
|
content='testcontent')
|
||||||
|
|
||||||
|
def test_write_non_existing_file_in_folder_sn_fat12(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
self.assertRaises(FileNotFoundError, fatfs.write_content, path_from_root=['TESTFOLD', 'AHOJ'],
|
||||||
|
content='testcontent')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create_too_many_files() -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
|
||||||
|
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
|
||||||
|
|
||||||
|
def test_too_many_files(self) -> None:
|
||||||
|
self.assertRaises(NoFreeClusterException, self.create_too_many_files)
|
||||||
|
|
||||||
|
def test_full_two_sectors_folder(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS(size=2 * 1024 * 1024)
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
|
||||||
|
for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
|
||||||
|
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'A253'], content='later')
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'A255'], content='last')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x105000:0x105010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x108000:0x108010], b'last\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
|
||||||
|
def test_lower_case_dir_short_names(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
self.assertRaises(LowerCaseException, fatfs.create_directory, 'testfold')
|
||||||
|
|
||||||
|
def test_lower_case_file_short_names(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
self.assertRaises(LowerCaseException, fatfs.create_file, 'newfile')
|
||||||
|
|
||||||
|
def test_too_long_name_dir_short_names(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
self.assertRaises(TooLongNameException, fatfs.create_directory, 'TOOLONGNAME')
|
||||||
|
|
||||||
|
def test_fatfs16_detection(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS(size=16 * 1024 * 1024)
|
||||||
|
self.assertEqual(fatfs.state.fatfs_type, 16)
|
||||||
|
|
||||||
|
def test_fatfs32_detection(self) -> None:
|
||||||
|
self.assertRaises(NotImplementedError, fatfsgen.FATFS, size=256 * 1024 * 1024)
|
||||||
|
|
||||||
|
def test_deep_structure(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
fatfs.create_directory('TESTFOLL', path_from_root=['TESTFOLD'])
|
||||||
|
fatfs.create_directory('TESTFOLO', path_from_root=['TESTFOLD', 'TESTFOLL'])
|
||||||
|
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO'])
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO', 'WRITEF.TXT'], content='later')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
|
||||||
|
def test_same_name_deep_structure(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.create_directory('TESTFOLD')
|
||||||
|
fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD'])
|
||||||
|
fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD', 'TESTFOLD'])
|
||||||
|
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD'])
|
||||||
|
fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD', 'WRITEF.TXT'], content='later')
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x2000:0x2010], b'TESTFOLD \x10\x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x2010:0x2020], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x02\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x6040:0x6050], b'TESTFOLD \x10\x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x6040:0x6050], b'TESTFOLD \x10\x00\x00\x01\x00')
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x7040:0x7050], b'TESTFOLD \x10\x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x8040:0x8050], b'WRITEF TXT \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
|
||||||
|
def test_e2e_deep_folder_into_image(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.generate(FatFSGen.CFG['test_dir'])
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
|
||||||
|
file_system = fs_file.read()
|
||||||
|
self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x6070:0x6080], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x05\x00\x0b\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x7040:0x7050], b'LASTFILE \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x8000:0x8010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x9000:0x9010], b'thisistest\n\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0xa000:0xa010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
|
||||||
|
def test_e2e_deep_folder_into_image_ext(self) -> None:
|
||||||
|
fatfs = fatfsgen.FATFS()
|
||||||
|
fatfs.generate(FatFSGen.CFG['test_dir2'])
|
||||||
|
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
file_system = fatfs.read_filesystem(FatFSGen.CFG['output_file'])
|
||||||
|
|
||||||
|
self.assertEqual(file_system[0x2020:0x2030], b'TESTFILE \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x7000:0x7010], b'. \x10\x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x7040:0x7050], b'LASTFILETXT \x00\x00\x01\x00')
|
||||||
|
self.assertEqual(file_system[0x8000:0x8010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0x9000:0x9010], b'thisistest\n\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0xa000:0xa010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||||
|
self.assertEqual(file_system[0xb000:0xb009], b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
@@ -84,3 +84,37 @@ They provide implementation of disk I/O functions for SD/MMC cards and can be re
|
|||||||
.. doxygenfunction:: ff_diskio_register_wl_partition
|
.. doxygenfunction:: ff_diskio_register_wl_partition
|
||||||
.. doxygenfunction:: ff_diskio_register_raw_partition
|
.. doxygenfunction:: ff_diskio_register_raw_partition
|
||||||
|
|
||||||
|
|
||||||
|
FATFS partition generator
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
We provide partition generator for FATFS (:component_file:`fatfsgen.py<fatfs/fatfsgen.py>`)
|
||||||
|
which is integrated into the build system and could be easily used in the user project.
|
||||||
|
The tool is used to create filesystem images on a host and populate it with content of the specified host folder.
|
||||||
|
Current implementation supports short file names, FAT12 and read-only mode
|
||||||
|
(because the wear levelling is not implemented yet). The WL, long file names, and FAT16 are subjects of future work.
|
||||||
|
|
||||||
|
Build system integration with FATFS partition generator
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
It is possible to invoke FATFS generator directly from the CMake build system by calling ``fatfs_create_partition_image``::
|
||||||
|
|
||||||
|
fatfs_create_partition_image(<partition> <base_dir> [FLASH_IN_PROJECT])
|
||||||
|
|
||||||
|
``fatfs_create_partition_image`` must be called from project's CMakeLists.txt.
|
||||||
|
|
||||||
|
The arguments of the function are as follows:
|
||||||
|
|
||||||
|
1. partition - the name of the partition, you can define in partition table (e.g. :example_file:`storage/fatfsgen/partitions_example.csv`)
|
||||||
|
|
||||||
|
2. base_dir - the directory that will be encoded to FATFS partition and optionally flashed into the device. Beware that you have to specified suitable size of the partition in the partition table.
|
||||||
|
|
||||||
|
3. flag ``FLASH_IN_PROJECT`` - optionally, user can opt to have the image automatically flashed together with the app binaries, partition tables, etc. on ``idf.py flash -p <PORT>`` by specifying ``FLASH_IN_PROJECT``.
|
||||||
|
|
||||||
|
For example::
|
||||||
|
|
||||||
|
fatfs_create_partition_image(my_fatfs_partition my_folder FLASH_IN_PROJECT)
|
||||||
|
|
||||||
|
If FLASH_IN_PROJECT is not specified, the image will still be generated, but you will have to flash it manually using ``esptool.py`` or a custom build system target.
|
||||||
|
|
||||||
|
For an example, see :example:`storage/fatfsgen`.
|
||||||
|
6
examples/storage/fatfsgen/CMakeLists.txt
Normal file
6
examples/storage/fatfsgen/CMakeLists.txt
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
# The following lines of boilerplate have to be in your project's CMakeLists
|
||||||
|
# in this exact order for cmake to work correctly
|
||||||
|
cmake_minimum_required(VERSION 3.5)
|
||||||
|
|
||||||
|
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
|
||||||
|
project(fatfsgen)
|
56
examples/storage/fatfsgen/README.md
Normal file
56
examples/storage/fatfsgen/README.md
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
# FATFS partition generation on build example
|
||||||
|
|
||||||
|
(See the README.md file in the upper level 'examples' directory for more information about examples.)
|
||||||
|
|
||||||
|
This example demonstrates how to use the FATFS partition
|
||||||
|
generation tool [fatfsgen.py](../../../components/fatfs/fatfsgen.py) to automatically create a FATFS
|
||||||
|
filesystem image (without wear levelling support)
|
||||||
|
from the contents of a host folder during build, with an option of
|
||||||
|
automatically flashing the created image on invocation of `idf.py -p PORT flash`.
|
||||||
|
Beware that the minimal required size of the flash is 4 MB.
|
||||||
|
The generated partition does not support wear levelling,
|
||||||
|
so it can be mounted only in read-only mode.
|
||||||
|
|
||||||
|
The following gives an overview of the example:
|
||||||
|
|
||||||
|
1. There is a directory `fatfs_image` from which the FATFS filesystem image will be created.
|
||||||
|
|
||||||
|
2. The function `fatfs_create_partition_image` is used to specify that a FATFS image
|
||||||
|
should be created during build for the `storage` partition. For CMake, it is called from [the main component's CMakeLists.txt](./main/CMakeLists.txt).
|
||||||
|
`FLASH_IN_PROJECT` specifies that the created image
|
||||||
|
should be flashed on invocation of `idf.py -p PORT flash` together with app, bootloader, partition table, etc.
|
||||||
|
The image is created on the example's build directory with the output filename `storage.bin`.
|
||||||
|
|
||||||
|
3. Upon invocation of `idf.py -p PORT flash monitor`, application loads and
|
||||||
|
finds there is already a valid FATFS filesystem in the `storage` partition with files same as those in `fatfs_image` directory. The application is then
|
||||||
|
able to read those files.
|
||||||
|
|
||||||
|
## How to use example
|
||||||
|
|
||||||
|
### Build and flash
|
||||||
|
|
||||||
|
To run the example, type the following command:
|
||||||
|
|
||||||
|
```CMake
|
||||||
|
# CMake
|
||||||
|
idf.py -p PORT flash monitor
|
||||||
|
```
|
||||||
|
|
||||||
|
(To exit the serial monitor, type ``Ctrl-]``.)
|
||||||
|
|
||||||
|
See the Getting Started Guide for full steps to configure and use ESP-IDF to build projects.
|
||||||
|
|
||||||
|
## Example output
|
||||||
|
|
||||||
|
Here is the example's console output:
|
||||||
|
|
||||||
|
```
|
||||||
|
...
|
||||||
|
I (322) example: Mounting FAT filesystem
|
||||||
|
I (332) example: Reading file
|
||||||
|
I (332) example: Read from file: 'this is test'
|
||||||
|
I (332) example: Unmounting FAT filesystem
|
||||||
|
I (342) example: Done
|
||||||
|
```
|
||||||
|
|
||||||
|
The logic of the example is contained in a [single source file](./main/fatfsgen_example_main.c), and it should be relatively simple to match points in its execution with the log outputs above.
|
1
examples/storage/fatfsgen/fatfs_image/hello.txt
Normal file
1
examples/storage/fatfsgen/fatfs_image/hello.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
this file is test as well
|
1
examples/storage/fatfsgen/fatfs_image/sub/test.txt
Normal file
1
examples/storage/fatfsgen/fatfs_image/sub/test.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
this is test
|
20
examples/storage/fatfsgen/fatfsgen_example_test.py
Normal file
20
examples/storage/fatfsgen/fatfsgen_example_test.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
# SPDX-License-Identifier: CC0
|
||||||
|
import ttfw_idf
|
||||||
|
|
||||||
|
|
||||||
|
@ttfw_idf.idf_example_test(env_tag='Example_GENERIC')
|
||||||
|
def test_examples_fatfsgen(env, _): # type: ignore
|
||||||
|
|
||||||
|
dut = env.get_dut('fatfsgen', 'examples/storage/fatfsgen')
|
||||||
|
dut.start_app()
|
||||||
|
dut.expect_all('example: Mounting FAT filesystem',
|
||||||
|
'example: Reading file',
|
||||||
|
'example: Read from file: \'this is test\'',
|
||||||
|
'example: Unmounting FAT filesystem',
|
||||||
|
'example: Done',
|
||||||
|
timeout=20)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
test_examples_fatfsgen()
|
8
examples/storage/fatfsgen/main/CMakeLists.txt
Normal file
8
examples/storage/fatfsgen/main/CMakeLists.txt
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
idf_component_register(SRCS "fatfsgen_example_main.c"
|
||||||
|
INCLUDE_DIRS ".")
|
||||||
|
|
||||||
|
# Create a FATFS image from the contents of the 'fatfs_image' directory
|
||||||
|
# that fits the partition named 'storage'. FLASH_IN_PROJECT indicates that
|
||||||
|
# the generated image should be flashed when the entire project is flashed to
|
||||||
|
# the target with 'idf.py -p PORT flash'.
|
||||||
|
fatfs_create_partition_image(storage ../fatfs_image FLASH_IN_PROJECT)
|
58
examples/storage/fatfsgen/main/fatfsgen_example_main.c
Normal file
58
examples/storage/fatfsgen/main/fatfsgen_example_main.c
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
/*
|
||||||
|
* SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||||
|
*
|
||||||
|
* SPDX-License-Identifier: CC0
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include "esp_vfs.h"
|
||||||
|
#include "esp_vfs_fat.h"
|
||||||
|
#include "esp_system.h"
|
||||||
|
#include "sdkconfig.h"
|
||||||
|
|
||||||
|
static const char *TAG = "example";
|
||||||
|
|
||||||
|
|
||||||
|
// Mount path for the partition
|
||||||
|
const char *base_path = "/spiflash";
|
||||||
|
|
||||||
|
void app_main(void)
|
||||||
|
{
|
||||||
|
ESP_LOGI(TAG, "Mounting FAT filesystem");
|
||||||
|
// To mount device we need name of device partition, define base_path
|
||||||
|
// and allow format partition in case if it is new one and was not formatted before
|
||||||
|
const esp_vfs_fat_mount_config_t mount_config = {
|
||||||
|
.max_files = 4,
|
||||||
|
.format_if_mount_failed = false,
|
||||||
|
.allocation_unit_size = CONFIG_WL_SECTOR_SIZE
|
||||||
|
};
|
||||||
|
esp_err_t err = esp_vfs_fat_rawflash_mount(base_path, "storage", &mount_config);
|
||||||
|
if (err != ESP_OK) {
|
||||||
|
ESP_LOGE(TAG, "Failed to mount FATFS (%s)", esp_err_to_name(err));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Open file for reading
|
||||||
|
ESP_LOGI(TAG, "Reading file");
|
||||||
|
FILE *f = fopen("/spiflash/sub/test.txt", "rb");
|
||||||
|
if (f == NULL) {
|
||||||
|
ESP_LOGE(TAG, "Failed to open file for reading");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
char line[128];
|
||||||
|
fgets(line, sizeof(line), f);
|
||||||
|
fclose(f);
|
||||||
|
// strip newline
|
||||||
|
char *pos = strchr(line, '\n');
|
||||||
|
if (pos) {
|
||||||
|
*pos = '\0';
|
||||||
|
}
|
||||||
|
ESP_LOGI(TAG, "Read from file: '%s'", line);
|
||||||
|
|
||||||
|
// Unmount FATFS
|
||||||
|
ESP_LOGI(TAG, "Unmounting FAT filesystem");
|
||||||
|
ESP_ERROR_CHECK( esp_vfs_fat_rawflash_unmount(base_path, "storage"));
|
||||||
|
|
||||||
|
ESP_LOGI(TAG, "Done");
|
||||||
|
}
|
6
examples/storage/fatfsgen/partitions_example.csv
Normal file
6
examples/storage/fatfsgen/partitions_example.csv
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
# Name, Type, SubType, Offset, Size, Flags
|
||||||
|
# Note: if you have increased the bootloader size, make sure to update the offsets to avoid overlap
|
||||||
|
nvs, data, nvs, 0x9000, 0x6000,
|
||||||
|
phy_init, data, phy, 0xf000, 0x1000,
|
||||||
|
factory, app, factory, 0x10000, 1M,
|
||||||
|
storage, data, fat, , 1M,
|
|
4
examples/storage/fatfsgen/sdkconfig.defaults
Normal file
4
examples/storage/fatfsgen/sdkconfig.defaults
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
CONFIG_PARTITION_TABLE_CUSTOM=y
|
||||||
|
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions_example.csv"
|
||||||
|
CONFIG_PARTITION_TABLE_FILENAME="partitions_example.csv"
|
||||||
|
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y
|
@@ -7,6 +7,8 @@ components/espcoredump/espcoredump.py
|
|||||||
components/espcoredump/test/test_espcoredump.py
|
components/espcoredump/test/test_espcoredump.py
|
||||||
components/espcoredump/test/test_espcoredump.sh
|
components/espcoredump/test/test_espcoredump.sh
|
||||||
components/espcoredump/test_apps/build_espcoredump.sh
|
components/espcoredump/test_apps/build_espcoredump.sh
|
||||||
|
components/fatfs/fatfsgen.py
|
||||||
|
components/fatfs/test_fatfsgen/test_fatfsgen.py
|
||||||
components/heap/test_multi_heap_host/test_all_configs.sh
|
components/heap/test_multi_heap_host/test_all_configs.sh
|
||||||
components/mbedtls/esp_crt_bundle/gen_crt_bundle.py
|
components/mbedtls/esp_crt_bundle/gen_crt_bundle.py
|
||||||
components/mbedtls/esp_crt_bundle/test_gen_crt_bundle/test_gen_crt_bundle.py
|
components/mbedtls/esp_crt_bundle/test_gen_crt_bundle/test_gen_crt_bundle.py
|
||||||
|
Reference in New Issue
Block a user