From 3c4034d36e91109cd8d0cd4f64672f5ee9a8bd29 Mon Sep 17 00:00:00 2001 From: Martin Gano Date: Wed, 22 Sep 2021 00:32:54 +0200 Subject: [PATCH] support for generating FATFS on a host --- .gitlab/CODEOWNERS | 1 + .gitlab/ci/host-test.yml | 6 + components/fatfs/fatfsgen.py | 216 +++++++++++++ components/fatfs/fatfsgen_utils/__init__.py | 0 components/fatfs/fatfsgen_utils/cluster.py | 112 +++++++ components/fatfs/fatfsgen_utils/entry.py | 127 ++++++++ components/fatfs/fatfsgen_utils/exceptions.py | 33 ++ components/fatfs/fatfsgen_utils/fat.py | 42 +++ .../fatfs/fatfsgen_utils/fatfs_state.py | 98 ++++++ components/fatfs/fatfsgen_utils/fs_object.py | 225 ++++++++++++++ components/fatfs/fatfsgen_utils/utils.py | 60 ++++ components/fatfs/project_include.cmake | 51 +++ .../fatfs/test_fatfsgen/test_fatfsgen.py | 292 ++++++++++++++++++ docs/en/api-reference/storage/fatfs.rst | 34 ++ examples/storage/fatfsgen/CMakeLists.txt | 6 + examples/storage/fatfsgen/README.md | 56 ++++ .../storage/fatfsgen/fatfs_image/hello.txt | 1 + .../storage/fatfsgen/fatfs_image/sub/test.txt | 1 + .../storage/fatfsgen/fatfsgen_example_test.py | 20 ++ examples/storage/fatfsgen/main/CMakeLists.txt | 8 + .../fatfsgen/main/fatfsgen_example_main.c | 58 ++++ .../storage/fatfsgen/partitions_example.csv | 6 + examples/storage/fatfsgen/sdkconfig.defaults | 4 + tools/ci/executable-list.txt | 2 + 24 files changed, 1459 insertions(+) create mode 100755 components/fatfs/fatfsgen.py create mode 100644 components/fatfs/fatfsgen_utils/__init__.py create mode 100644 components/fatfs/fatfsgen_utils/cluster.py create mode 100644 components/fatfs/fatfsgen_utils/entry.py create mode 100644 components/fatfs/fatfsgen_utils/exceptions.py create mode 100644 components/fatfs/fatfsgen_utils/fat.py create mode 100644 components/fatfs/fatfsgen_utils/fatfs_state.py create mode 100644 components/fatfs/fatfsgen_utils/fs_object.py create mode 100644 components/fatfs/fatfsgen_utils/utils.py create mode 100644 components/fatfs/project_include.cmake create mode 100755 components/fatfs/test_fatfsgen/test_fatfsgen.py create mode 100644 examples/storage/fatfsgen/CMakeLists.txt create mode 100644 examples/storage/fatfsgen/README.md create mode 100644 examples/storage/fatfsgen/fatfs_image/hello.txt create mode 100644 examples/storage/fatfsgen/fatfs_image/sub/test.txt create mode 100644 examples/storage/fatfsgen/fatfsgen_example_test.py create mode 100644 examples/storage/fatfsgen/main/CMakeLists.txt create mode 100644 examples/storage/fatfsgen/main/fatfsgen_example_main.c create mode 100644 examples/storage/fatfsgen/partitions_example.csv create mode 100644 examples/storage/fatfsgen/sdkconfig.defaults diff --git a/.gitlab/CODEOWNERS b/.gitlab/CODEOWNERS index e9ffd6755c..b183a1f607 100644 --- a/.gitlab/CODEOWNERS +++ b/.gitlab/CODEOWNERS @@ -103,6 +103,7 @@ /components/esptool_py/ @esp-idf-codeowners/tools /components/expat/ @esp-idf-codeowners/app-utilities /components/fatfs/ @esp-idf-codeowners/storage +/components/fatfs/**/*.py @esp-idf-codeowners/tools /components/freemodbus/ @esp-idf-codeowners/peripherals /components/freertos/ @esp-idf-codeowners/system /components/hal/ @esp-idf-codeowners/peripherals diff --git a/.gitlab/ci/host-test.yml b/.gitlab/ci/host-test.yml index d22e609862..5bebaa040a 100644 --- a/.gitlab/ci/host-test.yml +++ b/.gitlab/ci/host-test.yml @@ -115,6 +115,12 @@ test_spiffs_on_host: - cd ../test_spiffsgen - ./test_spiffsgen.py +test_fatfsgen_on_host: + extends: .host_test_template + script: + - cd components/fatfs/test_fatfsgen/ + - ./test_fatfsgen.py + test_multi_heap_on_host: extends: .host_test_template script: diff --git a/components/fatfs/fatfsgen.py b/components/fatfs/fatfsgen.py new file mode 100755 index 0000000000..8d52c98bcc --- /dev/null +++ b/components/fatfs/fatfsgen.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import os +import uuid +from typing import Any, List, Optional + +from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct +from fatfsgen_utils.fat import FAT +from fatfsgen_utils.fatfs_state import FATFSState +from fatfsgen_utils.fs_object import Directory +from fatfsgen_utils.utils import pad_string + + +class FATFS: + """ + The class FATFS provides API for generating FAT file system. + It contains reference to the FAT table and to the root directory. + """ + MAX_VOL_LAB_SIZE = 11 + MAX_OEM_NAME_SIZE = 8 + MAX_FS_TYPE_SIZE = 8 + BOOT_HEADER_SIZE = 512 + + BOOT_SECTOR_HEADER = Struct( + 'BS_jmpBoot' / Const(b'\xeb\xfe\x90'), + 'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, 'utf-8'), + 'BPB_BytsPerSec' / Int16ul, + 'BPB_SecPerClus' / Int8ul, + 'BPB_RsvdSecCnt' / Int16ul, + 'BPB_NumFATs' / Int8ul, + 'BPB_RootEntCnt' / Int16ul, + 'BPB_TotSec16' / Int16ul, + 'BPB_Media' / Int8ul, + 'BPB_FATSz16' / Int16ul, + 'BPB_SecPerTrk' / Int16ul, + 'BPB_NumHeads' / Int16ul, + 'BPB_HiddSec' / Int32ul, + 'BPB_TotSec32' / Int32ul, + 'BS_DrvNum' / Const(b'\x80'), + 'BS_Reserved1' / Const(b'\x00'), + 'BS_BootSig' / Const(b'\x29'), + 'BS_VolID' / Int32ul, + 'BS_VolLab' / PaddedString(MAX_VOL_LAB_SIZE, 'utf-8'), + 'BS_FilSysType' / PaddedString(MAX_FS_TYPE_SIZE, 'utf-8'), + 'BS_EMPTY' / Const(448 * b'\x00'), + 'Signature_word' / Const(b'\x55\xAA') + ) + + def __init__(self, + binary_image_path: Optional[str] = None, + size: int = 1024 * 1024, + reserved_sectors_cnt: int = 1, + fat_tables_cnt: int = 1, + sectors_per_cluster: int = 1, + sector_size: int = 0x1000, + sectors_per_fat: int = 1, + root_dir_sectors_cnt: int = 4, + hidden_sectors: int = 0, + long_names_enabled: bool = False, + entry_size: int = 32, + wl_sectors: int = 0, + num_heads: int = 0xff, + oem_name: str = 'MSDOS5.0', + sec_per_track: int = 0x3f, + volume_label: str = 'Espressif', + file_sys_type: str = 'FAT', + media_type: int = 0xf8) -> None: + + self.state = FATFSState(entry_size=entry_size, + sector_size=sector_size, + reserved_sectors_cnt=reserved_sectors_cnt, + root_dir_sectors_cnt=root_dir_sectors_cnt, + size=size, + file_sys_type=file_sys_type, + num_heads=num_heads, + fat_tables_cnt=fat_tables_cnt, + sectors_per_fat=sectors_per_fat, + sectors_per_cluster=sectors_per_cluster, + media_type=media_type, + hidden_sectors=hidden_sectors, + sec_per_track=sec_per_track, + long_names_enabled=long_names_enabled, + volume_label=volume_label, + wl_sectors=wl_sectors, + oem_name=oem_name) + binary_image = bytearray( + self.read_filesystem(binary_image_path) if binary_image_path else self.create_empty_fatfs()) + self.state.binary_image = binary_image + + self.fat = FAT(fatfs_state=self.state, + reserved_sectors_cnt=self.state.reserved_sectors_cnt) + + self.root_directory = Directory(name='A', # the name is not important + size=self.state.root_dir_sectors_cnt * self.state.sector_size, + fat=self.fat, + cluster=self.fat.clusters[1], + fatfs_state=self.state) + self.root_directory.init_directory() + + def create_file(self, name: str, extension: str = '', path_from_root: Optional[List[str]] = None) -> None: + # when path_from_root is None the dir is root + self.root_directory.new_file(name=name, extension=extension, path_from_root=path_from_root) + + def create_directory(self, name: str, path_from_root: Optional[List[str]] = None) -> None: + # when path_from_root is None the dir is root + parent_dir = self.root_directory + if path_from_root: + parent_dir = self.root_directory.recursive_search(path_from_root, self.root_directory) + self.root_directory.new_directory(name=name, parent=parent_dir, path_from_root=path_from_root) + + def write_content(self, path_from_root: List[str], content: str) -> None: + """ + fat fs invokes root directory to recursively find the required file and writes the content + """ + self.root_directory.write_to_file(path_from_root, content) + + def create_empty_fatfs(self) -> Any: + sectors_count = self.state.size // self.state.sector_size + volume_uuid = uuid.uuid4().int & 0xFFFFFFFF + return ( + FATFS.BOOT_SECTOR_HEADER.build( + dict(BS_OEMName=pad_string(self.state.oem_name, size=FATFS.MAX_OEM_NAME_SIZE), + BPB_BytsPerSec=self.state.sectors_per_cluster * self.state.sector_size, + BPB_SecPerClus=self.state.sectors_per_cluster, + BPB_RsvdSecCnt=self.state.reserved_sectors_cnt, + BPB_NumFATs=self.state.fat_tables_cnt, + BPB_RootEntCnt=self.state.entries_root_count, + BPB_TotSec16=0x00 if self.state.fatfs_type == FATFSState.FAT32 else sectors_count, + BPB_Media=self.state.media_type, + BPB_FATSz16=self.state.sectors_per_fat_cnt, + BPB_SecPerTrk=self.state.sec_per_track, + BPB_NumHeads=self.state.num_heads, + BPB_HiddSec=self.state.hidden_sectors, + BPB_TotSec32=sectors_count if self.state.fatfs_type == FATFSState.FAT32 else 0x00, + BS_VolID=volume_uuid, + BS_VolLab=pad_string(self.state.volume_label, size=FATFS.MAX_VOL_LAB_SIZE), + BS_FilSysType=pad_string(self.state.file_sys_type, size=FATFS.MAX_FS_TYPE_SIZE) + ) + ) + + (self.state.sector_size - FATFS.BOOT_HEADER_SIZE) * b'\x00' + + self.state.sectors_per_fat_cnt * self.state.fat_tables_cnt * self.state.sector_size * b'\x00' + + self.state.root_dir_sectors_cnt * self.state.sector_size * b'\x00' + + self.state.data_sectors * self.state.sector_size * b'\xff' + ) + + @staticmethod + def read_filesystem(path: str) -> bytearray: + with open(path, 'rb') as fs_file: + return bytearray(fs_file.read()) + + def write_filesystem(self, output_path: str) -> None: + with open(output_path, 'wb') as output: + output.write(bytearray(self.state.binary_image)) + + def _generate_partition_from_folder(self, + folder_relative_path: str, + folder_path: str = '', + is_dir: bool = False) -> None: + """ + Given path to folder and folder name recursively encodes folder into binary image. + Used by method generate + """ + real_path = os.path.join(folder_path, folder_relative_path) + smaller_path = folder_relative_path + + folder_relative_path = folder_relative_path.upper() + + normal_path = os.path.normpath(folder_relative_path) + split_path = normal_path.split(os.sep) + if os.path.isfile(real_path): + with open(real_path) as file: + content = file.read() + file_name, extension = os.path.splitext(split_path[-1]) + extension = extension[1:] # remove the dot from the extension + self.create_file(name=file_name, extension=extension, path_from_root=split_path[1:-1] or None) + self.write_content(split_path[1:], content) + elif os.path.isdir(real_path): + if not is_dir: + self.create_directory(split_path[-1], split_path[1:-1]) + + # sorting files for better testability + dir_content = list(sorted(os.listdir(real_path))) + for path in dir_content: + self._generate_partition_from_folder(os.path.join(smaller_path, path), folder_path=folder_path) + + def generate(self, input_directory: str) -> None: + """ + Normalize path to folder and recursively encode folder to binary image + """ + path_to_folder, folder_name = os.path.split(input_directory) + self._generate_partition_from_folder(folder_name, folder_path=path_to_folder, is_dir=True) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Create a FAT filesystem and populate it with directory content') + parser.add_argument('input_directory', + help='Path to the directory that will be encoded into fatfs image') + parser.add_argument('--output_file', + default='fatfs_image.img', + help='Filename of the generated fatfs image') + parser.add_argument('--partition_size', + default=1024 * 1024, + help='Size of the partition in bytes') + args = parser.parse_args() + + input_dir = args.input_directory + try: + partition_size = eval(args.partition_size) + except ValueError: + partition_size = args.partition_size + fatfs = FATFS(size=partition_size) + fatfs.generate(input_dir) + fatfs.write_filesystem(args.output_file) diff --git a/components/fatfs/fatfsgen_utils/__init__.py b/components/fatfs/fatfsgen_utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/components/fatfs/fatfsgen_utils/cluster.py b/components/fatfs/fatfsgen_utils/cluster.py new file mode 100644 index 0000000000..69f182e030 --- /dev/null +++ b/components/fatfs/fatfsgen_utils/cluster.py @@ -0,0 +1,112 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from typing import Optional + +from .fatfs_state import FATFSState +from .utils import build_byte, clean_first_half_byte, clean_second_half_byte, split_by_half_byte_12_bit_little_endian + + +class Cluster: + """ + class Cluster handles values in FAT table and allocates sectors in data region. + """ + RESERVED_BLOCK_ID = 0 + ROOT_BLOCK_ID = 1 + ALLOCATED_BLOCK_VALUE = 0xFFF # for fat 12 + + def __init__(self, + cluster_id: int, + fatfs_state: FATFSState, + is_empty: bool = True) -> None: + + self.id = cluster_id + self.fatfs_state = fatfs_state + + self._next_cluster = None # type: Optional[Cluster] + if self.id == Cluster.RESERVED_BLOCK_ID: + self.is_empty = False + self.set_in_fat(0xff8) + return + + self.cluster_data_address = self._compute_cluster_data_address() + self.is_empty = is_empty + + assert self.cluster_data_address or self.is_empty + + @property + def next_cluster(self): # type: () -> Optional[Cluster] + return self._next_cluster + + @next_cluster.setter + def next_cluster(self, value): # type: (Optional[Cluster]) -> None + self._next_cluster = value + + def _cluster_id_to_logical_position_in_bits(self, _id: int) -> int: + # computes address of the cluster in fat table + return self.fatfs_state.fatfs_type * _id # type: ignore + + def _compute_cluster_data_address(self) -> int: + if self.id == Cluster.ROOT_BLOCK_ID: + return self.fatfs_state.root_directory_start # type: ignore + # the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root) + return self.fatfs_state.sector_size * (self.id - 2) + self.fatfs_state.data_region_start # type: ignore + + def _set_first_half_byte(self, address: int, value: int) -> None: + clean_second_half_byte(self.fatfs_state.binary_image, address) + self.fatfs_state.binary_image[address] |= value << 4 + + def _set_second_half_byte(self, address: int, value: int) -> None: + clean_first_half_byte(self.fatfs_state.binary_image, address) + self.fatfs_state.binary_image[address] |= value + + @property + def fat_cluster_address(self) -> int: + """Determines how many bits precede the first bit of the cluster in FAT""" + return self._cluster_id_to_logical_position_in_bits(self.id) + + @property + def real_cluster_address(self) -> int: + return self.fatfs_state.start_address + self.fat_cluster_address // 8 # type: ignore + + def set_in_fat(self, value: int) -> None: + """ + Sets cluster in FAT to certain value. + Firstly, we split the target value into 3 half bytes (max value is 0xfff). + Then we could encounter two situations: + 1. if the cluster index (indexed from zero) is even, we set the full byte computed by + self.cluster_id_to_logical_position_in_bits and the second half of the consequent byte. + Order of half bytes is 2, 1, 3. + + 2. if the cluster index is odd, we set the first half of the computed byte and the full consequent byte. + Order of half bytes is 1, 3, 2. + """ + + # value must fit into number of bits of the fat (12, 16 or 32) + assert value <= (1 << self.fatfs_state.fatfs_type) - 1 + half_bytes = split_by_half_byte_12_bit_little_endian(value) + + # hardcoded for fat 12 + # IDF-4046 will extend it for fat 16 + if self.fat_cluster_address % 8 == 0: + self.fatfs_state.binary_image[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0]) + self._set_second_half_byte(self.real_cluster_address + 1, half_bytes[2]) + elif self.fat_cluster_address % 8 != 0: + self._set_first_half_byte(self.real_cluster_address, half_bytes[0]) + self.fatfs_state.binary_image[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1]) + + @property + def is_root(self) -> bool: + return self.id == Cluster.ROOT_BLOCK_ID + + def allocate_cluster(self) -> None: + """ + This method sets bits in FAT table to `allocated` and clean the corresponding sector(s) + """ + self.is_empty = False + self.set_in_fat(Cluster.ALLOCATED_BLOCK_VALUE) + + cluster_start = self.cluster_data_address + dir_size = self.fatfs_state.get_dir_size(self.is_root) + cluster_end = cluster_start + dir_size + self.fatfs_state.binary_image[cluster_start:cluster_end] = dir_size * b'\x00' diff --git a/components/fatfs/fatfsgen_utils/entry.py b/components/fatfs/fatfsgen_utils/entry.py new file mode 100644 index 0000000000..7436030ba2 --- /dev/null +++ b/components/fatfs/fatfsgen_utils/entry.py @@ -0,0 +1,127 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any, Optional + +from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct + +from .exceptions import LowerCaseException, TooLongNameException +from .fatfs_state import FATFSState +from .utils import is_valid_fatfs_name, pad_string + + +class Entry: + """ + The class Entry represents entry of the directory. + """ + ATTR_READ_ONLY = 0x01 + ATTR_HIDDEN = 0x02 + ATTR_SYSTEM = 0x04 + ATTR_VOLUME_ID = 0x08 + ATTR_DIRECTORY = 0x10 + ATTR_ARCHIVE = 0x20 + MAX_NAME_SIZE_S = 8 + MAX_EXT_SIZE_S = 3 + + ENTRY_FORMAT_SHORT_NAME = Struct( + 'DIR_Name' / PaddedString(MAX_NAME_SIZE_S, 'utf-8'), + 'DIR_Name_ext' / PaddedString(MAX_EXT_SIZE_S, 'utf-8'), + 'DIR_Attr' / Int8ul, + 'DIR_NTRes' / Const(b'\x00'), + 'DIR_CrtTimeTenth' / Const(b'\x00'), + 'DIR_CrtTime' / Const(b'\x01\x00'), + 'DIR_CrtDate' / Const(b'\x21\x00'), + 'DIR_LstAccDate' / Const(b'\x00\x00'), + 'DIR_FstClusHI' / Const(b'\x00\x00'), + 'DIR_WrtTime' / Const(b'\x01\x00'), + 'DIR_WrtDate' / Const(b'\x01\x00'), + 'DIR_FstClusLO' / Int16ul, + 'DIR_FileSize' / Int32ul, + ) + + # IDF-4044 + ENTRY_FORMAT_LONG_NAME = Struct() + + def __init__(self, + entry_id: int, + parent_dir_entries_address: int, + fatfs_state: FATFSState) -> None: + self.fatfs_state = fatfs_state + self.id = entry_id + self.entry_address = parent_dir_entries_address + self.id * self.fatfs_state.entry_size + self._is_alias = False + self._is_empty = True + + @property + def is_empty(self) -> bool: + return self._is_empty + + def _parse_entry(self, entry_bytearray: Optional[bytearray]) -> dict: + if self.fatfs_state.long_names_enabled: + return Entry.ENTRY_FORMAT_LONG_NAME.parse(entry_bytearray) # type: ignore + return Entry.ENTRY_FORMAT_SHORT_NAME.parse(entry_bytearray) # type: ignore + + def _build_entry(self, **kwargs) -> Any: # type: ignore + if self.fatfs_state.long_names_enabled: + return Entry.ENTRY_FORMAT_LONG_NAME.build(dict(**kwargs)) + return Entry.ENTRY_FORMAT_SHORT_NAME.build(dict(**kwargs)) + + @property + def entry_bytes(self) -> Any: + return self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size] + + @entry_bytes.setter + def entry_bytes(self, value: int) -> None: + self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size] = value + + def _clean_entry(self) -> None: + self.entry_bytes = self.fatfs_state.entry_size * b'\x00' + + def allocate_entry(self, + first_cluster_id: int, + entity_name: str, + entity_type: int, + entity_extension: str = '', + size: int = 0) -> None: + """ + :param first_cluster_id: id of the first data cluster for given entry + :param entity_name: name recorded in the entry + :param entity_extension: extension recorded in the entry + :param size: size of the content of the file + :param entity_type: type of the entity (file [0x20] or directory [0x10]) + :returns: None + + :raises LowerCaseException: In case when long_names_enabled is set to False and filename exceeds 8 chars + for name or 3 chars for extension the exception is raised + """ + if not ((is_valid_fatfs_name(entity_name) and + is_valid_fatfs_name(entity_extension)) or + self.fatfs_state.long_names_enabled): + raise LowerCaseException('Lower case is not supported because long name support is not enabled!') + + # clean entry before allocation + self._clean_entry() + self._is_empty = False + object_name = entity_name.upper() + object_extension = entity_extension.upper() + + # implementation of long names support will be part of IDF-4044 + exceeds_short_name = len(object_name) > Entry.MAX_NAME_SIZE_S or len(object_extension) > Entry.MAX_EXT_SIZE_S + if not self.fatfs_state.long_names_enabled and exceeds_short_name: + raise TooLongNameException( + 'Maximal length of the object name is 8 characters and 3 characters for extension!') + + start_address = self.entry_address + end_address = start_address + self.fatfs_state.entry_size + self.fatfs_state.binary_image[start_address: end_address] = self._build_entry( + DIR_Name=pad_string(object_name, size=Entry.MAX_NAME_SIZE_S), + DIR_Name_ext=pad_string(object_extension, size=Entry.MAX_EXT_SIZE_S), + DIR_Attr=entity_type, + DIR_FstClusLO=first_cluster_id, + DIR_FileSize=size + ) + + def update_content_size(self, content_size: int) -> None: + parsed_entry = self._parse_entry(self.entry_bytes) + parsed_entry.DIR_FileSize = content_size # type: ignore + self.entry_bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(parsed_entry) diff --git a/components/fatfs/fatfsgen_utils/exceptions.py b/components/fatfs/fatfsgen_utils/exceptions.py new file mode 100644 index 0000000000..c14adf675e --- /dev/null +++ b/components/fatfs/fatfsgen_utils/exceptions.py @@ -0,0 +1,33 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +class WriteDirectoryException(Exception): + """ + Exception is raised when the user tries to write the content into the directory instead of file + """ + pass + + +class NoFreeClusterException(Exception): + """ + Exception is raised when the user tries allocate cluster but no free one is available + """ + pass + + +class LowerCaseException(Exception): + """ + Exception is raised when the user tries to write file or directory with lower case + """ + pass + + +class TooLongNameException(Exception): + """ + Exception is raised when long name support is not enabled and user tries to write file longer then allowed + """ + pass + + +class FatalError(Exception): + pass diff --git a/components/fatfs/fatfsgen_utils/fat.py b/components/fatfs/fatfsgen_utils/fat.py new file mode 100644 index 0000000000..bcd2f37642 --- /dev/null +++ b/components/fatfs/fatfsgen_utils/fat.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from .cluster import Cluster +from .exceptions import NoFreeClusterException +from .fatfs_state import FATFSState + + +class FAT: + """ + The FAT represents the FAT region in file system. It is responsible for storing clusters + and chaining them in case we need to extend file or directory to more clusters. + """ + def __init__(self, + fatfs_state: FATFSState, + reserved_sectors_cnt: int) -> None: + self.fatfs_state = fatfs_state + self.reserved_sectors_cnt = reserved_sectors_cnt + + self.clusters = [Cluster(cluster_id=i, fatfs_state=self.fatfs_state) for i in + range(1, self.fatfs_state.max_clusters)] + + # update root directory record + self.clusters[0].allocate_cluster() + # add first reserved cluster + self.clusters = [Cluster(cluster_id=Cluster.RESERVED_BLOCK_ID, fatfs_state=self.fatfs_state)] + self.clusters + + def find_free_cluster(self) -> Cluster: + # finds first empty cluster and allocates it + for cluster in self.clusters: + if cluster.is_empty: + cluster.allocate_cluster() + return cluster + raise NoFreeClusterException('No free cluster available!') + + def allocate_chain(self, first_cluster: Cluster, size: int) -> None: + current = first_cluster + for _ in range(size - 1): + free_cluster = self.find_free_cluster() + current.next_cluster = free_cluster + current.set_in_fat(free_cluster.id) + current = free_cluster diff --git a/components/fatfs/fatfsgen_utils/fatfs_state.py b/components/fatfs/fatfsgen_utils/fatfs_state.py new file mode 100644 index 0000000000..fe07320e13 --- /dev/null +++ b/components/fatfs/fatfsgen_utils/fatfs_state.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + + +class FATFSState: + """ + The class represents the state and the configuration of the FATFS. + """ + FAT12_MAX_CLUSTERS = 4085 + FAT16_MAX_CLUSTERS = 65525 + FAT12 = 12 + FAT16 = 16 + FAT32 = 32 + + def __init__(self, + entry_size: int, + sector_size: int, + reserved_sectors_cnt: int, + root_dir_sectors_cnt: int, + size: int, + media_type: int, + sectors_per_fat: int, + sectors_per_cluster: int, + volume_label: str, + oem_name: str, + fat_tables_cnt: int, + sec_per_track: int, + num_heads: int, + hidden_sectors: int, + file_sys_type: str, + wl_sectors: int, + long_names_enabled: bool = False): + self._binary_image: bytearray = bytearray(b'') + self.fat_tables_cnt: int = fat_tables_cnt + self.oem_name: str = oem_name + self.wl_sectors_cnt: int = wl_sectors + self.file_sys_type: str = file_sys_type + self.sec_per_track: int = sec_per_track + self.hidden_sectors: int = hidden_sectors + self.volume_label: str = volume_label + self.media_type: int = media_type + self.long_names_enabled: bool = long_names_enabled + self.entry_size: int = entry_size + self.num_heads: int = num_heads + self.sector_size: int = sector_size + self.root_dir_sectors_cnt: int = root_dir_sectors_cnt + self.reserved_sectors_cnt: int = reserved_sectors_cnt + self.size: int = size + self.sectors_per_fat_cnt: int = sectors_per_fat + self.sectors_per_cluster: int = sectors_per_cluster + + @property + def binary_image(self) -> bytearray: + return self._binary_image + + @binary_image.setter + def binary_image(self, value: bytearray) -> None: + self._binary_image = value + + def get_dir_size(self, is_root: bool) -> int: + return self.root_dir_sectors_cnt * self.sector_size if is_root else self.sector_size + + @property + def start_address(self) -> int: + return self.sector_size * self.reserved_sectors_cnt + + @property + def data_sectors(self) -> int: + return (self.size // self.sector_size) - self.non_data_sectors + + @property + def non_data_sectors(self) -> int: + return self.reserved_sectors_cnt + self.sectors_per_fat_cnt + self.root_dir_sectors_cnt + self.wl_sectors_cnt + + @property + def data_region_start(self) -> int: + return self.non_data_sectors * self.sector_size + + @property + def max_clusters(self) -> int: + return self.data_sectors // self.sectors_per_cluster + + @property + def root_directory_start(self) -> int: + return (self.reserved_sectors_cnt + self.sectors_per_fat_cnt) * self.sector_size + + @property + def fatfs_type(self) -> int: + if self.max_clusters < FATFSState.FAT12_MAX_CLUSTERS: + return FATFSState.FAT12 + elif self.max_clusters < FATFSState.FAT16_MAX_CLUSTERS: + return FATFSState.FAT16 + # fat is FAT.FAT32, not supported now + raise NotImplementedError('FAT32 is currently not supported.') + + @property + def entries_root_count(self) -> int: + return (self.root_dir_sectors_cnt * self.sector_size) // self.entry_size diff --git a/components/fatfs/fatfsgen_utils/fs_object.py b/components/fatfs/fatfsgen_utils/fs_object.py new file mode 100644 index 0000000000..e5137ca9a2 --- /dev/null +++ b/components/fatfs/fatfsgen_utils/fs_object.py @@ -0,0 +1,225 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +from typing import List, Optional, Tuple + +from .entry import Entry +from .exceptions import FatalError, WriteDirectoryException +from .fat import FAT, Cluster +from .fatfs_state import FATFSState +from .utils import required_clusters_count, split_content_into_sectors, split_to_name_and_extension + + +class File: + """ + The class File provides API to write into the files. It represents file in the FS. + """ + ATTR_ARCHIVE = 0x20 + ENTITY_TYPE = ATTR_ARCHIVE + + def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None: + self.name = name + self.extension = extension + self.fatfs_state = fatfs_state + self.fat = fat + self.size = 0 + self._first_cluster = None + self._entry = entry + + @property + def entry(self) -> Entry: + return self._entry + + @property + def first_cluster(self) -> Optional[Cluster]: + return self._first_cluster + + @first_cluster.setter + def first_cluster(self, value: Cluster) -> None: + self._first_cluster = value + + def name_equals(self, name: str, extension: str) -> bool: + return self.name == name and self.extension == extension + + def write(self, content: str) -> None: + self.entry.update_content_size(len(content)) + # we assume that the correct amount of clusters is allocated + current_cluster = self._first_cluster + for content_part in split_content_into_sectors(content, self.fatfs_state.sector_size): + content_as_list = content_part.encode() + if current_cluster is None: + raise FatalError('No free space left!') + + address = current_cluster.cluster_data_address + self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list + current_cluster = current_cluster.next_cluster + + +class Directory: + """ + The Directory class provides API to add files and directories into the directory + and to find the file according to path and write it. + """ + ATTR_DIRECTORY = 0x10 + ATTR_ARCHIVE = 0x20 + ENTITY_TYPE = ATTR_DIRECTORY + + def __init__(self, + name, + fat, + fatfs_state, + entry=None, + cluster=None, + size=None, + extension='', + parent=None): + # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None + self.name = name + self.fatfs_state = fatfs_state + self.extension = extension + + self.fat = fat + self.size = size or self.fatfs_state.sector_size + + # if directory is root its parent is itself + self.parent: Directory = parent or self + self._first_cluster = cluster + + # entries will be initialized after the cluster allocation + self.entries: List[Entry] = [] + self.entities = [] # type: ignore + self._entry = entry # currently not in use (will use later for e.g. modification time, etc.) + + @property + def is_root(self) -> bool: + return self.parent is self + + @property + def first_cluster(self) -> Cluster: + return self._first_cluster + + @first_cluster.setter + def first_cluster(self, value: Cluster) -> None: + self._first_cluster = value + + def name_equals(self, name: str, extension: str) -> bool: + return self.name == name and self.extension == extension + + def create_entries(self, cluster: Cluster) -> list: + return [Entry(entry_id=i, + parent_dir_entries_address=cluster.cluster_data_address, + fatfs_state=self.fatfs_state) + for i in range(self.size // self.fatfs_state.entry_size)] + + def init_directory(self) -> None: + self.entries = self.create_entries(self._first_cluster) + if not self.is_root: + # the root directory doesn't contain link to itself nor the parent + free_entry1 = self.find_free_entry() or self.chain_directory() + free_entry1.allocate_entry(first_cluster_id=self.first_cluster.id, + entity_name='.', + entity_extension='', + entity_type=self.ENTITY_TYPE) + self.first_cluster = self._first_cluster + free_entry2 = self.find_free_entry() or self.chain_directory() + free_entry2.allocate_entry(first_cluster_id=self.parent.first_cluster.id, + entity_name='..', + entity_extension='', + entity_type=self.parent.ENTITY_TYPE) + self.parent.first_cluster = self.parent.first_cluster + + def lookup_entity(self, object_name: str, extension: str): # type: ignore + for entity in self.entities: + if entity.name == object_name and entity.extension == extension: + return entity + return None + + def recursive_search(self, path_as_list, current_dir): # type: ignore + name, extension = split_to_name_and_extension(path_as_list[0]) + next_obj = current_dir.lookup_entity(name, extension) + if next_obj is None: + raise FileNotFoundError('No such file or directory!') + if len(path_as_list) == 1 and next_obj.name_equals(name, extension): + return next_obj + return self.recursive_search(path_as_list[1:], next_obj) + + def find_free_entry(self) -> Optional[Entry]: + for entry in self.entries: + if entry.is_empty: + return entry + return None + + def _extend_directory(self) -> None: + current = self.first_cluster + while current.next_cluster is not None: + current = current.next_cluster + new_cluster = self.fat.find_free_cluster() + current.set_in_fat(new_cluster.id) + current.next_cluster = new_cluster + self.entries += self.create_entries(new_cluster) + + def chain_directory(self) -> Entry: + self._extend_directory() + free_entry = self.find_free_entry() + if free_entry is None: + raise FatalError('No more space left!') + return free_entry + + def allocate_object(self, + name, + entity_type, + path_from_root=None, + extension=''): + # type: (str, int, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory] + """ + Method finds the target directory in the path + and allocates cluster (both the record in FAT and cluster in the data region) + and entry in the specified directory + """ + free_cluster = self.fat.find_free_cluster() + target_dir = self if not path_from_root else self.recursive_search(path_from_root, self) + free_entry = target_dir.find_free_entry() or target_dir.chain_directory() + free_entry.allocate_entry(first_cluster_id=free_cluster.id, + entity_name=name, + entity_extension=extension, + entity_type=entity_type) + return free_cluster, free_entry, target_dir + + def new_file(self, name: str, extension: str, path_from_root: Optional[List[str]]) -> None: + free_cluster, free_entry, target_dir = self.allocate_object(name=name, + extension=extension, + entity_type=Directory.ATTR_ARCHIVE, + path_from_root=path_from_root) + + file = File(name, fat=self.fat, extension=extension, fatfs_state=self.fatfs_state, entry=free_entry) + file.first_cluster = free_cluster + target_dir.entities.append(file) + + def new_directory(self, name, parent, path_from_root): + # type: (str, Directory, Optional[List[str]]) -> None + free_cluster, free_entry, target_dir = self.allocate_object(name=name, + entity_type=Directory.ATTR_DIRECTORY, + path_from_root=path_from_root) + + directory = Directory(name=name, fat=self.fat, parent=parent, fatfs_state=self.fatfs_state, entry=free_entry) + directory.first_cluster = free_cluster + directory.init_directory() + target_dir.entities.append(directory) + + def write_to_file(self, path: List[str], content: str) -> None: + """ + Writes to file existing in the directory structure. + + :param path: path split into the list + :param content: content as a string to be written into a file + :returns: None + :raises WriteDirectoryException: raised is the target object for writing is a directory + """ + entity_to_write = self.recursive_search(path, self) + if isinstance(entity_to_write, File): + clusters_cnt = required_clusters_count(cluster_size=self.fatfs_state.sector_size, content=content) + self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt) + entity_to_write.write(content) + else: + raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!') diff --git a/components/fatfs/fatfsgen_utils/utils.py b/components/fatfs/fatfsgen_utils/utils.py new file mode 100644 index 0000000000..b2e9416bec --- /dev/null +++ b/components/fatfs/fatfsgen_utils/utils.py @@ -0,0 +1,60 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import typing + +from construct import Int16ul + + +def required_clusters_count(cluster_size: int, content: str) -> int: + # compute number of required clusters for file text + return (len(content) + cluster_size - 1) // cluster_size + + +def pad_string(content: str, size: typing.Optional[int] = None, pad: int = 0x20) -> str: + # cut string if longer and fill with pad character if shorter than size + return content.ljust(size or len(content), chr(pad))[:size] + + +def split_to_name_and_extension(full_name: str) -> typing.Tuple[str, str]: + name, extension = os.path.splitext(full_name) + return name, extension.replace('.', '') + + +def is_valid_fatfs_name(string: str) -> bool: + return string == string.upper() + + +def split_by_half_byte_12_bit_little_endian(value: int) -> typing.Tuple[int, int, int]: + value_as_bytes = Int16ul.build(value) + return value_as_bytes[0] & 0x0f, value_as_bytes[0] >> 4, value_as_bytes[1] & 0x0f + + +def build_byte(first_half: int, second_half: int) -> int: + return (first_half << 4) | second_half + + +def clean_first_half_byte(bytes_array: bytearray, address: int) -> None: + """ + the function sets to zero first four bits of the byte. + E.g. 10111100 -> 10110000 + """ + bytes_array[address] &= 0xf0 + + +def clean_second_half_byte(bytes_array: bytearray, address: int) -> None: + """ + the function sets to zero last four bits of the byte. + E.g. 10111100 -> 00001100 + """ + bytes_array[address] &= 0x0f + + +def split_content_into_sectors(content: str, sector_size: int) -> typing.List[str]: + result = [] + clusters_cnt = required_clusters_count(cluster_size=sector_size, content=content) + + for i in range(clusters_cnt): + result.append(content[sector_size * i:(i + 1) * sector_size]) + return result diff --git a/components/fatfs/project_include.cmake b/components/fatfs/project_include.cmake new file mode 100644 index 0000000000..d7dab88c05 --- /dev/null +++ b/components/fatfs/project_include.cmake @@ -0,0 +1,51 @@ +# fatfs_create_partition_image +# +# Create a fatfs image of the specified directory on the host during build and optionally +# have the created image flashed using `idf.py flash` +function(fatfs_create_partition_image partition base_dir) + set(options FLASH_IN_PROJECT) + cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}") + + idf_build_get_property(idf_path IDF_PATH) + idf_build_get_property(python PYTHON) + + set(fatfsgen_py ${python} ${idf_path}/components/fatfs/fatfsgen.py) + + get_filename_component(base_dir_full_path ${base_dir} ABSOLUTE) + + partition_table_get_partition_info(size "--partition-name ${partition}" "size") + partition_table_get_partition_info(offset "--partition-name ${partition}" "offset") + + if("${size}" AND "${offset}") + set(image_file ${CMAKE_BINARY_DIR}/${partition}.bin) + # Execute FATFS image generation; this always executes as there is no way to specify for CMake to watch for + # contents of the base dir changing. + add_custom_target(fatfs_${partition}_bin ALL + COMMAND ${fatfsgen_py} ${base_dir_full_path} + --partition_size ${size} + --output_file ${image_file} + ) + + set_property(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" APPEND PROPERTY + ADDITIONAL_MAKE_CLEAN_FILES + ${image_file}) + + idf_component_get_property(main_args esptool_py FLASH_ARGS) + idf_component_get_property(sub_args esptool_py FLASH_SUB_ARGS) + # Last (optional) parameter is the encryption for the target. In our + # case, fatfs is not encrypt so pass FALSE to the function. + esptool_py_flash_target(${partition}-flash "${main_args}" "${sub_args}" ALWAYS_PLAINTEXT) + esptool_py_flash_to_partition(${partition}-flash "${partition}" "${image_file}") + + add_dependencies(${partition}-flash fatfs_${partition}_bin) + + if(arg_FLASH_IN_PROJECT) + esptool_py_flash_to_partition(flash "${partition}" "${image_file}") + add_dependencies(flash fatfs_${partition}_bin) + endif() + else() + set(message "Failed to create FATFS image for partition '${partition}'. " + "Check project configuration if using the correct partition table file.") + fail_at_build_time(fatfs_${partition}_bin "${message}") + endif() +endfunction() diff --git a/components/fatfs/test_fatfsgen/test_fatfsgen.py b/components/fatfs/test_fatfsgen/test_fatfsgen.py new file mode 100755 index 0000000000..b20ad23b17 --- /dev/null +++ b/components/fatfs/test_fatfsgen/test_fatfsgen.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import shutil +import sys +import unittest +from typing import Any, Dict, Union + +sys.path.append(os.path.join(os.path.dirname(__file__), '..')) +import fatfsgen # noqa E402 +from fatfsgen_utils.exceptions import WriteDirectoryException # noqa E402 +from fatfsgen_utils.exceptions import LowerCaseException, NoFreeClusterException, TooLongNameException # noqa E402 + + +class FatFSGen(unittest.TestCase): + CFG = dict( + sector_size=4096, + entry_size=32, + fat_start=0x1000, + data_start=0x7000, + root_start=0x2000, + output_file=os.path.join('output_data', 'tmp_file.img'), + test_dir=os.path.join('output_data', 'test'), + test_dir2=os.path.join('output_data', 'tst_str'), + ) # type: Union[Dict[str, Any]] + + def setUp(self) -> None: + os.makedirs('output_data') + self.generate_test_dir_1() + self.generate_test_dir_2() + + def tearDown(self) -> None: + shutil.rmtree('output_data') + + @staticmethod + def generate_test_dir_1() -> None: + os.makedirs(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test')) + with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test', 'lastfile'), 'w') as file: + file.write('deeptest\n') + with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'testfil2'), 'w') as file: + file.write('thisistest\n') + with open(os.path.join(FatFSGen.CFG['test_dir'], 'testfile'), 'w') as file: + file.write('ahoj\n') + + @staticmethod + def generate_test_dir_2() -> None: + os.makedirs(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test')) + with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test', 'lastfile.txt'), 'w') as file: + file.write('deeptest\n') + with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'testfil2'), 'w') as file: + file.write('thisistest\n') + with open(os.path.join(FatFSGen.CFG['test_dir2'], 'testfile'), 'w') as file: + file.write('ahoj\n') + + def test_empty_file_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_file('TESTFILE') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x2000:0x200c], b'TESTFILE \x20') # check entry name and type + self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat + + def test_directory_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10') # check entry name and type + self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat + self.assertEqual(file_system[0x6000:0x600c], b'. \x10') # reference to itself + self.assertEqual(file_system[0x6020:0x602c], b'.. \x10') # reference to parent + + def test_empty_file_with_extension_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_file('TESTF', extension='TXT') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + + self.assertEqual(file_system[0x2000:0x200c], b'TESTF TXT\x20') # check entry name and type + self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat + + def test_write_to_file_with_extension_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_file('WRITEF', extension='TXT') + fatfs.write_content(path_from_root=['WRITEF.TXT'], content='testcontent') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + + self.assertEqual(file_system[0x2000:0x200c], b'WRITEF TXT\x20') # check entry name and type + self.assertEqual(file_system[0x201a:0x2020], b'\x02\x00\x0b\x00\x00\x00') # check size and cluster ref + self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat + self.assertEqual(file_system[0x6000:0x600f], b'testcontent\x00\x00\x00\x00') # check file content + + def test_write_to_file_in_folder_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD']) + fatfs.write_content(path_from_root=['TESTFOLD', 'WRITEF.TXT'], content='testcontent') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + + self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10') + self.assertEqual( + file_system[0x1000:0x1010], + b'\xf8\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x6040:0x6050], b'WRITEF TXT\x20\x00\x00\x01\x00') + self.assertEqual(file_system[0x605a:0x6060], b'\x03\x00\x0b\x00\x00\x00') + self.assertEqual(file_system[0x7000:0x700b], b'testcontent') # check file content + + def test_cluster_setting_values(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_file('TESTFIL1') + fatfs.create_file('TESTFIL2') + fatfs.create_file('TESTFIL3') + fatfs.create_file('TESTFIL4') + fatfs.create_file('TESTFIL5') + fatfs.fat.clusters[2].set_in_fat(1000) + fatfs.fat.clusters[3].set_in_fat(4) + fatfs.fat.clusters[4].set_in_fat(5) + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual( + file_system[0x1000:0x1010], + b'\xf8\xff\xff\xe8\x43\x00\x05\xf0\xff\xff\x0f\x00\x00\x00\x00\x00') + + def test_full_sector_file(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_file('WRITEF', extension='TXT') + fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\xff\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x6000: 0x7000], FatFSGen.CFG['sector_size'] * b'a') + + def test_file_chaining(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_file('WRITEF', extension='TXT') + fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a' + 'a') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\x03\xf0\xff\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x7000: 0x8000], b'a' + (FatFSGen.CFG['sector_size'] - 1) * b'\x00') + + def test_full_sector_folder(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + + for i in range(FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']): + fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD']) + fatfs.write_content(path_from_root=['TESTFOLD', 'A0'], content='first') + fatfs.write_content(path_from_root=['TESTFOLD', 'A126'], content='later') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x1000: 0x10d0], + b'\xf8\xff\xff\x82\xf0\xff' + 192 * b'\xff' + 10 * b'\x00') + self.assertEqual(file_system[0x85000:0x85005], b'later') + self.assertEqual(file_system[0x86000:0x86010], b'A126 \x00\x00\x01\x00') + self.assertEqual(file_system[0x86020:0x86030], b'A127 \x00\x00\x01\x00') + + def test_write_to_folder_in_folder_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + fatfs.create_directory('TESTFOLL', path_from_root=['TESTFOLD']) + self.assertRaises(WriteDirectoryException, fatfs.write_content, path_from_root=['TESTFOLD', 'TESTFOLL'], + content='testcontent') + + def test_write_non_existing_file_in_folder_sn_fat12(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + self.assertRaises(FileNotFoundError, fatfs.write_content, path_from_root=['TESTFOLD', 'AHOJ'], + content='testcontent') + + @staticmethod + def create_too_many_files() -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']): + fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD']) + + def test_too_many_files(self) -> None: + self.assertRaises(NoFreeClusterException, self.create_too_many_files) + + def test_full_two_sectors_folder(self) -> None: + fatfs = fatfsgen.FATFS(size=2 * 1024 * 1024) + fatfs.create_directory('TESTFOLD') + + for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']): + fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD']) + fatfs.write_content(path_from_root=['TESTFOLD', 'A253'], content='later') + fatfs.write_content(path_from_root=['TESTFOLD', 'A255'], content='last') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x105000:0x105010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x108000:0x108010], b'last\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + + def test_lower_case_dir_short_names(self) -> None: + fatfs = fatfsgen.FATFS() + self.assertRaises(LowerCaseException, fatfs.create_directory, 'testfold') + + def test_lower_case_file_short_names(self) -> None: + fatfs = fatfsgen.FATFS() + self.assertRaises(LowerCaseException, fatfs.create_file, 'newfile') + + def test_too_long_name_dir_short_names(self) -> None: + fatfs = fatfsgen.FATFS() + self.assertRaises(TooLongNameException, fatfs.create_directory, 'TOOLONGNAME') + + def test_fatfs16_detection(self) -> None: + fatfs = fatfsgen.FATFS(size=16 * 1024 * 1024) + self.assertEqual(fatfs.state.fatfs_type, 16) + + def test_fatfs32_detection(self) -> None: + self.assertRaises(NotImplementedError, fatfsgen.FATFS, size=256 * 1024 * 1024) + + def test_deep_structure(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + fatfs.create_directory('TESTFOLL', path_from_root=['TESTFOLD']) + fatfs.create_directory('TESTFOLO', path_from_root=['TESTFOLD', 'TESTFOLL']) + fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO']) + fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO', 'WRITEF.TXT'], content='later') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + + self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + + def test_same_name_deep_structure(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.create_directory('TESTFOLD') + fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD']) + fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD', 'TESTFOLD']) + fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD']) + fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD', 'WRITEF.TXT'], content='later') + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + + self.assertEqual(file_system[0x2000:0x2010], b'TESTFOLD \x10\x00\x00\x01\x00') + self.assertEqual(file_system[0x2010:0x2020], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x02\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x6040:0x6050], b'TESTFOLD \x10\x00\x00\x01\x00') + self.assertEqual(file_system[0x6040:0x6050], b'TESTFOLD \x10\x00\x00\x01\x00') + + self.assertEqual(file_system[0x7040:0x7050], b'TESTFOLD \x10\x00\x00\x01\x00') + self.assertEqual(file_system[0x8040:0x8050], b'WRITEF TXT \x00\x00\x01\x00') + self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + + def test_e2e_deep_folder_into_image(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.generate(FatFSGen.CFG['test_dir']) + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + with open(FatFSGen.CFG['output_file'], 'rb') as fs_file: + file_system = fs_file.read() + self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00') + self.assertEqual(file_system[0x6070:0x6080], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x05\x00\x0b\x00\x00\x00') + self.assertEqual(file_system[0x7040:0x7050], b'LASTFILE \x00\x00\x01\x00') + self.assertEqual(file_system[0x8000:0x8010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x9000:0x9010], b'thisistest\n\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0xa000:0xa010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + + def test_e2e_deep_folder_into_image_ext(self) -> None: + fatfs = fatfsgen.FATFS() + fatfs.generate(FatFSGen.CFG['test_dir2']) + fatfs.write_filesystem(FatFSGen.CFG['output_file']) + file_system = fatfs.read_filesystem(FatFSGen.CFG['output_file']) + + self.assertEqual(file_system[0x2020:0x2030], b'TESTFILE \x00\x00\x01\x00') + self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00') + self.assertEqual(file_system[0x7000:0x7010], b'. \x10\x00\x00\x01\x00') + self.assertEqual(file_system[0x7040:0x7050], b'LASTFILETXT \x00\x00\x01\x00') + self.assertEqual(file_system[0x8000:0x8010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0x9000:0x9010], b'thisistest\n\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0xa000:0xa010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual(file_system[0xb000:0xb009], b'\xff\xff\xff\xff\xff\xff\xff\xff\xff') + + +if __name__ == '__main__': + unittest.main() diff --git a/docs/en/api-reference/storage/fatfs.rst b/docs/en/api-reference/storage/fatfs.rst index 0964a60141..6b9be69f3c 100644 --- a/docs/en/api-reference/storage/fatfs.rst +++ b/docs/en/api-reference/storage/fatfs.rst @@ -84,3 +84,37 @@ They provide implementation of disk I/O functions for SD/MMC cards and can be re .. doxygenfunction:: ff_diskio_register_wl_partition .. doxygenfunction:: ff_diskio_register_raw_partition + +FATFS partition generator +------------------------- + +We provide partition generator for FATFS (:component_file:`fatfsgen.py`) +which is integrated into the build system and could be easily used in the user project. +The tool is used to create filesystem images on a host and populate it with content of the specified host folder. +Current implementation supports short file names, FAT12 and read-only mode +(because the wear levelling is not implemented yet). The WL, long file names, and FAT16 are subjects of future work. + +Build system integration with FATFS partition generator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +It is possible to invoke FATFS generator directly from the CMake build system by calling ``fatfs_create_partition_image``:: + + fatfs_create_partition_image( [FLASH_IN_PROJECT]) + +``fatfs_create_partition_image`` must be called from project's CMakeLists.txt. + +The arguments of the function are as follows: + +1. partition - the name of the partition, you can define in partition table (e.g. :example_file:`storage/fatfsgen/partitions_example.csv`) + +2. base_dir - the directory that will be encoded to FATFS partition and optionally flashed into the device. Beware that you have to specified suitable size of the partition in the partition table. + +3. flag ``FLASH_IN_PROJECT`` - optionally, user can opt to have the image automatically flashed together with the app binaries, partition tables, etc. on ``idf.py flash -p `` by specifying ``FLASH_IN_PROJECT``. + +For example:: + + fatfs_create_partition_image(my_fatfs_partition my_folder FLASH_IN_PROJECT) + +If FLASH_IN_PROJECT is not specified, the image will still be generated, but you will have to flash it manually using ``esptool.py`` or a custom build system target. + +For an example, see :example:`storage/fatfsgen`. diff --git a/examples/storage/fatfsgen/CMakeLists.txt b/examples/storage/fatfsgen/CMakeLists.txt new file mode 100644 index 0000000000..ca865a2f62 --- /dev/null +++ b/examples/storage/fatfsgen/CMakeLists.txt @@ -0,0 +1,6 @@ +# The following lines of boilerplate have to be in your project's CMakeLists +# in this exact order for cmake to work correctly +cmake_minimum_required(VERSION 3.5) + +include($ENV{IDF_PATH}/tools/cmake/project.cmake) +project(fatfsgen) diff --git a/examples/storage/fatfsgen/README.md b/examples/storage/fatfsgen/README.md new file mode 100644 index 0000000000..9d7fbc359c --- /dev/null +++ b/examples/storage/fatfsgen/README.md @@ -0,0 +1,56 @@ +# FATFS partition generation on build example + +(See the README.md file in the upper level 'examples' directory for more information about examples.) + +This example demonstrates how to use the FATFS partition +generation tool [fatfsgen.py](../../../components/fatfs/fatfsgen.py) to automatically create a FATFS +filesystem image (without wear levelling support) +from the contents of a host folder during build, with an option of +automatically flashing the created image on invocation of `idf.py -p PORT flash`. +Beware that the minimal required size of the flash is 4 MB. +The generated partition does not support wear levelling, +so it can be mounted only in read-only mode. + +The following gives an overview of the example: + +1. There is a directory `fatfs_image` from which the FATFS filesystem image will be created. + +2. The function `fatfs_create_partition_image` is used to specify that a FATFS image +should be created during build for the `storage` partition. For CMake, it is called from [the main component's CMakeLists.txt](./main/CMakeLists.txt). +`FLASH_IN_PROJECT` specifies that the created image +should be flashed on invocation of `idf.py -p PORT flash` together with app, bootloader, partition table, etc. +The image is created on the example's build directory with the output filename `storage.bin`. + +3. Upon invocation of `idf.py -p PORT flash monitor`, application loads and +finds there is already a valid FATFS filesystem in the `storage` partition with files same as those in `fatfs_image` directory. The application is then +able to read those files. + +## How to use example + +### Build and flash + +To run the example, type the following command: + +```CMake +# CMake +idf.py -p PORT flash monitor +``` + +(To exit the serial monitor, type ``Ctrl-]``.) + +See the Getting Started Guide for full steps to configure and use ESP-IDF to build projects. + +## Example output + +Here is the example's console output: + +``` +... +I (322) example: Mounting FAT filesystem +I (332) example: Reading file +I (332) example: Read from file: 'this is test' +I (332) example: Unmounting FAT filesystem +I (342) example: Done +``` + +The logic of the example is contained in a [single source file](./main/fatfsgen_example_main.c), and it should be relatively simple to match points in its execution with the log outputs above. diff --git a/examples/storage/fatfsgen/fatfs_image/hello.txt b/examples/storage/fatfsgen/fatfs_image/hello.txt new file mode 100644 index 0000000000..36eccd159a --- /dev/null +++ b/examples/storage/fatfsgen/fatfs_image/hello.txt @@ -0,0 +1 @@ +this file is test as well diff --git a/examples/storage/fatfsgen/fatfs_image/sub/test.txt b/examples/storage/fatfsgen/fatfs_image/sub/test.txt new file mode 100644 index 0000000000..d0141680ee --- /dev/null +++ b/examples/storage/fatfsgen/fatfs_image/sub/test.txt @@ -0,0 +1 @@ +this is test diff --git a/examples/storage/fatfsgen/fatfsgen_example_test.py b/examples/storage/fatfsgen/fatfsgen_example_test.py new file mode 100644 index 0000000000..81c2f26e91 --- /dev/null +++ b/examples/storage/fatfsgen/fatfsgen_example_test.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: CC0 +import ttfw_idf + + +@ttfw_idf.idf_example_test(env_tag='Example_GENERIC') +def test_examples_fatfsgen(env, _): # type: ignore + + dut = env.get_dut('fatfsgen', 'examples/storage/fatfsgen') + dut.start_app() + dut.expect_all('example: Mounting FAT filesystem', + 'example: Reading file', + 'example: Read from file: \'this is test\'', + 'example: Unmounting FAT filesystem', + 'example: Done', + timeout=20) + + +if __name__ == '__main__': + test_examples_fatfsgen() diff --git a/examples/storage/fatfsgen/main/CMakeLists.txt b/examples/storage/fatfsgen/main/CMakeLists.txt new file mode 100644 index 0000000000..ccf3f0f0ed --- /dev/null +++ b/examples/storage/fatfsgen/main/CMakeLists.txt @@ -0,0 +1,8 @@ +idf_component_register(SRCS "fatfsgen_example_main.c" + INCLUDE_DIRS ".") + +# Create a FATFS image from the contents of the 'fatfs_image' directory +# that fits the partition named 'storage'. FLASH_IN_PROJECT indicates that +# the generated image should be flashed when the entire project is flashed to +# the target with 'idf.py -p PORT flash'. +fatfs_create_partition_image(storage ../fatfs_image FLASH_IN_PROJECT) diff --git a/examples/storage/fatfsgen/main/fatfsgen_example_main.c b/examples/storage/fatfsgen/main/fatfsgen_example_main.c new file mode 100644 index 0000000000..b4daa59dae --- /dev/null +++ b/examples/storage/fatfsgen/main/fatfsgen_example_main.c @@ -0,0 +1,58 @@ +/* + * SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: CC0 + */ + +#include +#include +#include +#include "esp_vfs.h" +#include "esp_vfs_fat.h" +#include "esp_system.h" +#include "sdkconfig.h" + +static const char *TAG = "example"; + + +// Mount path for the partition +const char *base_path = "/spiflash"; + +void app_main(void) +{ + ESP_LOGI(TAG, "Mounting FAT filesystem"); + // To mount device we need name of device partition, define base_path + // and allow format partition in case if it is new one and was not formatted before + const esp_vfs_fat_mount_config_t mount_config = { + .max_files = 4, + .format_if_mount_failed = false, + .allocation_unit_size = CONFIG_WL_SECTOR_SIZE + }; + esp_err_t err = esp_vfs_fat_rawflash_mount(base_path, "storage", &mount_config); + if (err != ESP_OK) { + ESP_LOGE(TAG, "Failed to mount FATFS (%s)", esp_err_to_name(err)); + return; + } + // Open file for reading + ESP_LOGI(TAG, "Reading file"); + FILE *f = fopen("/spiflash/sub/test.txt", "rb"); + if (f == NULL) { + ESP_LOGE(TAG, "Failed to open file for reading"); + return; + } + char line[128]; + fgets(line, sizeof(line), f); + fclose(f); + // strip newline + char *pos = strchr(line, '\n'); + if (pos) { + *pos = '\0'; + } + ESP_LOGI(TAG, "Read from file: '%s'", line); + + // Unmount FATFS + ESP_LOGI(TAG, "Unmounting FAT filesystem"); + ESP_ERROR_CHECK( esp_vfs_fat_rawflash_unmount(base_path, "storage")); + + ESP_LOGI(TAG, "Done"); +} diff --git a/examples/storage/fatfsgen/partitions_example.csv b/examples/storage/fatfsgen/partitions_example.csv new file mode 100644 index 0000000000..1c79321a10 --- /dev/null +++ b/examples/storage/fatfsgen/partitions_example.csv @@ -0,0 +1,6 @@ +# Name, Type, SubType, Offset, Size, Flags +# Note: if you have increased the bootloader size, make sure to update the offsets to avoid overlap +nvs, data, nvs, 0x9000, 0x6000, +phy_init, data, phy, 0xf000, 0x1000, +factory, app, factory, 0x10000, 1M, +storage, data, fat, , 1M, diff --git a/examples/storage/fatfsgen/sdkconfig.defaults b/examples/storage/fatfsgen/sdkconfig.defaults new file mode 100644 index 0000000000..47363c32d5 --- /dev/null +++ b/examples/storage/fatfsgen/sdkconfig.defaults @@ -0,0 +1,4 @@ +CONFIG_PARTITION_TABLE_CUSTOM=y +CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions_example.csv" +CONFIG_PARTITION_TABLE_FILENAME="partitions_example.csv" +CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y diff --git a/tools/ci/executable-list.txt b/tools/ci/executable-list.txt index 711fc388aa..15d65cd320 100644 --- a/tools/ci/executable-list.txt +++ b/tools/ci/executable-list.txt @@ -7,6 +7,8 @@ components/espcoredump/espcoredump.py components/espcoredump/test/test_espcoredump.py components/espcoredump/test/test_espcoredump.sh components/espcoredump/test_apps/build_espcoredump.sh +components/fatfs/fatfsgen.py +components/fatfs/test_fatfsgen/test_fatfsgen.py components/heap/test_multi_heap_host/test_all_configs.sh components/mbedtls/esp_crt_bundle/gen_crt_bundle.py components/mbedtls/esp_crt_bundle/test_gen_crt_bundle/test_gen_crt_bundle.py