diff --git a/tools/idf_tools.py b/tools/idf_tools.py index 8d6658c56f..d32ed4a8cd 100755 --- a/tools/idf_tools.py +++ b/tools/idf_tools.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # coding=utf-8 # -# SPDX-FileCopyrightText: 2019-2023 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2019-2024 Espressif Systems (Shanghai) CO LTD # # SPDX-License-Identifier: Apache-2.0 # @@ -27,7 +27,6 @@ # * To start using the tools, run `eval "$(idf_tools.py export)"` — this will update # the PATH to point to the installed tools and set up other environment variables # needed by the tools. - import argparse import contextlib import copy @@ -47,10 +46,11 @@ import sys import tarfile import tempfile import time -from collections import OrderedDict, namedtuple +from collections import namedtuple +from collections import OrderedDict from json import JSONEncoder -from ssl import SSLContext # noqa: F401 -from tarfile import TarFile # noqa: F401 +from ssl import SSLContext +from tarfile import TarFile from zipfile import ZipFile # Important notice: Please keep the lines above compatible with old Pythons so it won't fail with ImportError but with @@ -64,12 +64,11 @@ except RuntimeError as e: print(e) raise SystemExit(1) -from typing import IO, Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union # noqa: F401 +from typing import IO, Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union from urllib.error import ContentTooShortError from urllib.parse import urljoin, urlparse from urllib.request import urlopen -# the following is only for typing annotation -from urllib.response import addinfourl # noqa: F401 +from urllib.response import addinfourl try: from exceptions import WindowsError @@ -112,6 +111,11 @@ PLATFORM_LINUX_ARM64 = 'linux-arm64' class Platforms: + """ + Mappings from various other names these platforms are known as, to the identifiers above. + This includes strings produced from "platform.system() + '-' + platform.machine()", see PYTHON_PLATFORM + definition above. + """ # Mappings from various other names these platforms are known as, to the identifiers above. # This includes strings produced from "platform.system() + '-' + platform.machine()", see PYTHON_PLATFORM # definition above. @@ -163,7 +167,7 @@ class Platforms: } @staticmethod - def detect_linux_arm_platform(supposed_platform): # type: (Optional[str]) -> Optional[str] + def detect_linux_arm_platform(supposed_platform: Optional[str]) -> Optional[str]: """ We probe the python binary to check exactly what environment the script is running in. @@ -174,7 +178,6 @@ class Platforms: ARM platform may run on aarch64 hardware but having armhf installed packages (it happens if a docker container is running on arm64 hardware, but using an armhf image). - """ if supposed_platform not in (PLATFORM_LINUX_ARM32, PLATFORM_LINUX_ARMHF, PLATFORM_LINUX_ARM64): return supposed_platform @@ -199,8 +202,11 @@ class Platforms: return supposed_platform @staticmethod - def get(platform_alias): # type: (Optional[str]) -> Optional[str] - if platform_alias is None: + def get(platform_alias: Optional[str]) -> Optional[str]: + """ + Get a proper platform name based on PLATFORM_FROM_NAME dict. + """ + if not platform_alias: return None if platform_alias == 'any' and CURRENT_PLATFORM: @@ -210,7 +216,10 @@ class Platforms: return platform_name @staticmethod - def get_by_filename(file_name): # type: (str) -> Optional[str] + def get_by_filename(file_name: str) -> Optional[str]: + """ + Guess the right platform based on the file name. + """ found_alias = '' for platform_alias in Platforms.PLATFORM_FROM_NAME: # Find the longest alias which matches with file name to avoid mismatching @@ -219,7 +228,11 @@ class Platforms: return Platforms.get(found_alias) -def parse_platform_arg(platform_str): # type: (str) -> str +def parse_platform_arg(platform_str: str) -> str: + """ + Parses platform from input string and checks whether it is a valid platform. + If not, raises SystemExit exception. + """ platform = Platforms.get(platform_str) if platform is None: fatal(f'unknown platform: {platform}') @@ -289,31 +302,43 @@ DL_CERT_DICT = {'dl.espressif.com': DIGICERT_ROOT_G2_CERT, 'github.com': DIGICERT_ROOT_CA_CERT} -global_quiet = False -global_non_interactive = False -global_idf_path = None # type: Optional[str] -global_idf_tools_path = None # type: Optional[str] -global_tools_json = None # type: Optional[str] +global_quiet: bool = False +global_non_interactive: bool = False +global_idf_path: Optional[str] = None +global_idf_tools_path: Optional[str] = None +global_tools_json: Optional[str] = None -def fatal(text, *args): # type: (str, str) -> None +def fatal(text: str, *args: str) -> None: + """ + Writes ERROR: + text to sys.stderr. + """ if not global_quiet: sys.stderr.write('ERROR: ' + text + '\n', *args) -def warn(text, *args): # type: (str, str) -> None +def warn(text: str, *args: str) -> None: + """ + Writes WARNING: + text to sys.stderr. + """ if not global_quiet: sys.stderr.write('WARNING: ' + text + '\n', *args) -def info(text, f=None, *args): # type: (str, Optional[IO[str]], str) -> None +def info(text: str, f: Optional[IO[str]]=None, *args: str) -> None: + """ + Writes text to a stream specified by second arg, sys.stdout by default. + """ if not global_quiet: if f is None: f = sys.stdout f.write(text + '\n', *args) -def print_hints_on_download_error(err): # type: (str) -> None +def print_hints_on_download_error(err: str) -> None: + """ + Prints hint on download error. Tries to specify the message depending on the error. + """ info('Please make sure you have a working Internet connection.') if 'CERTIFICATE' in err: @@ -332,8 +357,12 @@ def print_hints_on_download_error(err): # type: (str) -> None 'you might be able to work around this issue.') -def run_cmd_check_output(cmd, input_text=None, extra_paths=None): - # type: (List[str], Optional[str], Optional[List[str]]) -> bytes +def run_cmd_check_output(cmd: List[str], input_text: Optional[str]=None, extra_paths: Optional[List[str]]=None) -> bytes: + """ + Runs command and checks output for exceptions. If AttributeError or TypeError occurs, function re-runs the process. + If return code was not 0, subprocess.CalledProcessError is raised, otherwise, the original error is masked. + Returns both stdout and stderr of the run command. + """ # If extra_paths is given, locate the executable in one of these directories. # Note: it would seem logical to add extra_paths to env[PATH], instead, and let OS do the job of finding the # executable for us. However this does not work on Windows: https://bugs.python.org/issue8557. @@ -369,14 +398,17 @@ def run_cmd_check_output(cmd, input_text=None, extra_paths=None): return stdout + stderr -def to_shell_specific_paths(paths_list): # type: (List[str]) -> List[str] +def to_shell_specific_paths(paths_list: List[str]) -> List[str]: + """ + Converts / (linux) to \\ (Windows) if called under win32 platform. + """ if sys.platform == 'win32': paths_list = [p.replace('/', os.path.sep) if os.path.sep in p else p for p in paths_list] return paths_list -def get_env_for_extra_paths(extra_paths): # type: (List[str]) -> Dict[str, str] +def get_env_for_extra_paths(extra_paths: List[str]) -> Dict[str, str]: """ Return a copy of environment variables dict, prepending paths listed in extra_paths to the PATH environment variable. @@ -390,7 +422,10 @@ def get_env_for_extra_paths(extra_paths): # type: (List[str]) -> Dict[str, str] return env_arg -def get_file_size_sha256(filename, block_size=65536): # type: (str, int) -> Tuple[int, str] +def get_file_size_sha256(filename: str, block_size: int=65536) -> Tuple[int, str]: + """ + Gets file size and its sha256. + """ sha256 = hashlib.sha256() size = 0 with open(filename, 'rb') as f: @@ -400,14 +435,21 @@ def get_file_size_sha256(filename, block_size=65536): # type: (str, int) -> Tup return size, sha256.hexdigest() -def report_progress(count, block_size, total_size): # type: (int, int, int) -> None +def report_progress(count: int, block_size: int, total_size: int) -> None: + """ + Prints progress (count * block_size * 100 / total_size) to stdout. + """ percent = int(count * block_size * 100 / total_size) percent = min(100, percent) sys.stdout.write('\r%d%%' % percent) sys.stdout.flush() -def mkdir_p(path): # type: (str) -> None +def mkdir_p(path: str) -> None: + """ + Makes directory in given path. + Supresses error when directory is already created or path is a path to file. + """ try: os.makedirs(path) except OSError as exc: @@ -415,10 +457,13 @@ def mkdir_p(path): # type: (str) -> None raise -def unpack(filename, destination): # type: (str, str) -> None +def unpack(filename: str, destination: str) -> None: + """ + Extracts file specified by filename into destination depending on its type. + """ info('Extracting {0} to {1}'.format(filename, destination)) if filename.endswith(('.tar.gz', '.tgz')): - archive_obj = tarfile.open(filename, 'r:gz') # type: Union[TarFile, ZipFile] + archive_obj: Union[TarFile, ZipFile] = tarfile.open(filename, 'r:gz') elif filename.endswith(('.tar.xz')): archive_obj = tarfile.open(filename, 'r:xz') elif filename.endswith(('.tar.bz2')): @@ -442,7 +487,10 @@ def unpack(filename, destination): # type: (str, str) -> None os.chmod(extracted_file, extracted_permissions) -def splittype(url): # type: (str) -> Tuple[Optional[str], str] +def splittype(url: str) -> Tuple[Optional[str], str]: + """ + Splits given url into its type (e.g. https, file) and the rest. + """ match = re.match('([^/:]+):(.*)', url, re.DOTALL) if match: scheme, data = match.groups() @@ -450,9 +498,14 @@ def splittype(url): # type: (str) -> Tuple[Optional[str], str] return None, url -# An alternative version of urlretrieve which takes SSL context as an argument -def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None): - # type: (str, str, Optional[Callable[[int, int, int], None]], Optional[bytes], Optional[SSLContext]) -> Tuple[str, addinfourl] +def urlretrieve_ctx(url: str, + filename: str, + reporthook: Optional[Callable[[int, int, int], None]]=None, + data: Optional[bytes]=None, + context: Optional[SSLContext]=None) -> Tuple[str, addinfourl]: + """ + Retrieve data from given URL. An alternative version of urlretrieve which takes SSL context as an argument. + """ url_type, path = splittype(url) # urlopen doesn't have context argument in Python <=2.7.9 @@ -498,7 +551,10 @@ def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None): return result -def download(url, destination): # type: (str, str) -> Optional[Exception] +def download(url: str, destination: str) -> Union[None, Exception]: + """ + Download from given url and save into given destiantion. + """ info(f'Downloading {url}') info(f'Destination: {destination}') try: @@ -522,12 +578,14 @@ def download(url, destination): # type: (str, str) -> Optional[Exception] sys.stdout.flush() -# Sometimes renaming a directory on Windows (randomly?) causes a PermissionError. -# This is confirmed to be a workaround: -# https://github.com/espressif/esp-idf/issues/3819#issuecomment-515167118 -# https://github.com/espressif/esp-idf/issues/4063#issuecomment-531490140 -# https://stackoverflow.com/a/43046729 -def rename_with_retry(path_from, path_to): # type: (str, str) -> None +def rename_with_retry(path_from: str, path_to: str) -> None: + """ + Sometimes renaming a directory on Windows (randomly?) causes a PermissionError. + This is confirmed to be a workaround: + https://github.com/espressif/esp-idf/issues/3819#issuecomment-515167118 + https://github.com/espressif/esp-idf/issues/4063#issuecomment-531490140 + https://stackoverflow.com/a/43046729 + """ retry_count = 20 if sys.platform.startswith('win') else 1 for retry in range(retry_count): try: @@ -543,7 +601,11 @@ def rename_with_retry(path_from, path_to): # type: (str, str) -> None time.sleep(0.5) -def do_strip_container_dirs(path, levels): # type: (str, int) -> None +def do_strip_container_dirs(path: str, levels: int) -> None: + """ + The number of top directory levels specified by levels argument will be removed when extracting. + E.g. if levels=2, archive path a/b/c/d.txt will be extracted as c/d.txt. + """ assert levels > 0 # move the original directory out of the way (add a .tmp suffix) tmp_path = path + '.tmp' @@ -569,20 +631,32 @@ def do_strip_container_dirs(path, levels): # type: (str, int) -> None shutil.rmtree(tmp_path) -class ToolNotFound(RuntimeError): +class ToolNotFoundError(RuntimeError): + """ + Raise when the tool is not found (not present in the paths etc.). + """ pass class ToolExecError(RuntimeError): + """ + Raise when the tool returns with a non-zero exit code. + """ pass class ToolBinaryError(RuntimeError): + """" + Raise when an error occurred when running any version of the tool. + """ pass class IDFToolDownload(object): - def __init__(self, platform_name, url, size, sha256, rename_dist): # type: (str, str, int, str, str) -> None + """ + Structure to store all the relevant information about particular download. + """ + def __init__(self, platform_name: str, url: str, size: int, sha256: str, rename_dist: str) -> None: self.platform_name = platform_name self.url = url self.size = size @@ -592,19 +666,24 @@ class IDFToolDownload(object): @functools.total_ordering class IDFToolVersion(object): + """ + Used for storing information about version; status (recommended, supported, deprecated) + and easy way of comparing different versions. Also allows platform compatibility check + and getting right download for given platform, if available. + """ STATUS_RECOMMENDED = 'recommended' STATUS_SUPPORTED = 'supported' STATUS_DEPRECATED = 'deprecated' STATUS_VALUES = [STATUS_RECOMMENDED, STATUS_SUPPORTED, STATUS_DEPRECATED] - def __init__(self, version, status): # type: (str, str) -> None + def __init__(self, version: str, status: str) -> None: self.version = version self.status = status - self.downloads = OrderedDict() # type: OrderedDict[str, IDFToolDownload] + self.downloads: OrderedDict[str, IDFToolDownload] = OrderedDict() self.latest = False - def __lt__(self, other): # type: (IDFToolVersion) -> bool + def __lt__(self, other: 'IDFToolVersion') -> bool: if self.status != other.status: return self.status > other.status else: @@ -612,15 +691,21 @@ class IDFToolVersion(object): and other.status == IDFToolVersion.STATUS_RECOMMENDED) return self.version < other.version - def __eq__(self, other): # type: (object) -> bool + def __eq__(self, other: object) -> bool: if not isinstance(other, IDFToolVersion): return NotImplemented return self.status == other.status and self.version == other.version - def add_download(self, platform_name, url, size, sha256, rename_dist=''): # type: (str, str, int, str, str) -> None + def add_download(self, platform_name: str, url: str, size: int, sha256: str, rename_dist: str = '') -> None: + """ + Add download entry of type IDFToolDownload into self.downloads. + """ self.downloads[platform_name] = IDFToolDownload(platform_name, url, size, sha256, rename_dist) - def get_download_for_platform(self, platform_name): # type: (Optional[str]) -> Optional[IDFToolDownload] + def get_download_for_platform(self, platform_name: Optional[str]) -> Optional[IDFToolDownload]: + """ + Get download for given platform if usable download already exists. + """ platform_name = Platforms.get(platform_name) if platform_name and platform_name in self.downloads.keys(): return self.downloads[platform_name] @@ -628,11 +713,16 @@ class IDFToolVersion(object): return self.downloads['any'] return None - def compatible_with_platform(self, platform_name=PYTHON_PLATFORM): - # type: (Optional[str]) -> bool + def compatible_with_platform(self, platform_name: Optional[str] = PYTHON_PLATFORM) -> bool: + """ + Check whether this version is compatible with given platform name. + """ return self.get_download_for_platform(platform_name) is not None - def get_supported_platforms(self): # type: () -> set[str] + def get_supported_platforms(self) -> Set[str]: + """ + Get all platforms for which this version has a valid download record. + """ return set(self.downloads.keys()) @@ -651,35 +741,53 @@ IDFToolOptions = namedtuple('IDFToolOptions', [ class IDFTool(object): + """ + Used to store info about IDF tools from tools.json file in a Python-accesible form. + The term "IDF tool" is used for e.g. CMake, ninja, QUEMU and toolchains. + """ # possible values of 'install' field INSTALL_ALWAYS = 'always' INSTALL_ON_REQUEST = 'on_request' INSTALL_NEVER = 'never' - def __init__(self, name, description, install, info_url, license, version_cmd, version_regex, supported_targets, version_regex_replace=None, - strip_container_dirs=0, is_executable=True): - # type: (str, str, str, str, str, List[str], str, List[str], Optional[str], int, bool) -> None + def __init__(self, name: str, + description: str, + install: str, + info_url: str, + license: str, + version_cmd: List[str], + version_regex: str, + supported_targets: List[str], + version_regex_replace: Optional[str] = None, + strip_container_dirs: int = 0, + is_executable: bool = True) -> None: self.name = name self.description = description self.drop_versions() - self.version_in_path = None # type: Optional[str] - self.versions_installed = [] # type: List[str] + self.version_in_path: Optional[str] = None + self.versions_installed: List[str] = [] if version_regex_replace is None: version_regex_replace = VERSION_REGEX_REPLACE_DEFAULT self.options = IDFToolOptions(version_cmd, version_regex, version_regex_replace, is_executable, [], OrderedDict(), install, info_url, license, strip_container_dirs, supported_targets) # type: ignore - self.platform_overrides = [] # type: List[Dict[str, str]] + self.platform_overrides: List[Dict[str, str]] = [] self._platform = CURRENT_PLATFORM self._update_current_options() self.is_executable = is_executable - def copy_for_platform(self, platform): # type: (str) -> IDFTool + def copy_for_platform(self, platform: str) -> 'IDFTool': + """ + Copy the IDFTool record in respect to given platform (e.g. apply platform overrides). + """ result = copy.deepcopy(self) result._platform = platform result._update_current_options() return result - def _update_current_options(self): # type: () -> None + def _update_current_options(self) -> None: + """ + Update current options by platform overrides, if applicable for current platform. + """ self._current_options = IDFToolOptions(*self.options) for override in self.platform_overrides: if self._platform and self._platform not in override['platforms']: @@ -688,29 +796,44 @@ class IDFTool(object): del override_dict['platforms'] self._current_options = self._current_options._replace(**override_dict) # type: ignore - def drop_versions(self): # type: () -> None - self.versions = OrderedDict() # type: Dict[str, IDFToolVersion] + def drop_versions(self) -> None: + """ + Clear self.versions dictionary. + """ + self.versions: Dict[str, IDFToolVersion] = OrderedDict() - def add_version(self, version): # type: (IDFToolVersion) -> None + def add_version(self, version: IDFToolVersion) -> None: + """ + Add new IDFVersion to self.versions. + """ assert type(version) is IDFToolVersion self.versions[version.version] = version - def get_path(self): # type: () -> str + def get_path(self) -> str: + """ + Returns path where the tool is installed. + """ return os.path.join(global_idf_tools_path or '', 'tools', self.name) - def get_path_for_version(self, version): # type: (str) -> str + def get_path_for_version(self, version: str) -> str: + """ + Returns path for the tool of given version. + """ assert version in self.versions return os.path.join(self.get_path(), version) - def get_export_paths(self, version): # type: (str) -> List[str] + def get_export_paths(self, version: str) -> List[str]: + """ + Returns a list of paths that need to be exported. + """ tool_path = self.get_path_for_version(version) return [os.path.join(tool_path, *p) for p in self._current_options.export_paths] # type: ignore - def get_export_vars(self, version): # type: (str) -> Dict[str, str] + def get_export_vars(self, version: str) -> Dict[str, str]: """ Get the dictionary of environment variables to be exported, for the given version. Expands: - - ${TOOL_PATH} => the actual path where the version is installed + - ${TOOL_PATH} => the actual path where the version is installed. """ result = {} for k, v in self._current_options.export_vars.items(): # type: ignore @@ -723,11 +846,11 @@ class IDFTool(object): result[k] = v_repl return result - def get_version(self, extra_paths=None, executable_path=None): # type: (Optional[List[str]], Optional[str]) -> str + def get_version(self, extra_paths: Optional[List[str]] = None, executable_path: Optional[str] = None) -> str: """ Execute the tool, optionally prepending extra_paths to PATH, extract the version string and return it as a result. - Raises ToolNotFound if the tool is not found (not present in the paths). + Raises ToolNotFoundError if the tool is not found (not present in the paths). Raises ToolExecError if the tool returns with a non-zero exit code. Returns 'unknown' if tool returns something from which version string can not be extracted. @@ -742,13 +865,13 @@ class IDFTool(object): # There is no command available, so return early. It seems that # within some very strange context empty [''] may actually execute # something https://github.com/espressif/esp-idf/issues/11880 - raise ToolNotFound('Tool {} not found'.format(self.name)) + raise ToolNotFoundError('Tool {} not found'.format(self.name)) try: version_cmd_result = run_cmd_check_output(cmd, None, extra_paths) except OSError: # tool is not on the path - raise ToolNotFound('Tool {} not found'.format(self.name)) + raise ToolNotFoundError('Tool {} not found'.format(self.name)) except subprocess.CalledProcessError as e: raise ToolExecError('returned non-zero exit code ({}) with error message:\n{}'.format( e.returncode, e.stderr.decode('utf-8',errors='ignore'))) # type: ignore @@ -759,33 +882,51 @@ class IDFTool(object): return UNKNOWN_VERSION return re.sub(self._current_options.version_regex, self._current_options.version_regex_replace, match.group(0)) # type: ignore - def check_version(self, executable_path): # type: (Optional[str]) -> bool + def check_version(self, executable_path: Optional[str]) -> bool: + """ + Check if tool's version from executable path is in self.version dictionary. + """ version = self.get_version(executable_path=executable_path) return version in self.versions - def get_install_type(self): # type: () -> Callable[[str], None] + def get_install_type(self) -> Callable[[str], None]: + """ + Returns whether the tools are installed always, on request or never. + """ return self._current_options.install # type: ignore - def get_supported_targets(self): # type: () -> list[str] + def get_supported_targets(self) -> List[str]: + """ + Returns list of supported targets with current options. + """ return self._current_options.supported_targets # type: ignore - def is_supported_for_any_of_targets(self, targets): # type: (list[str]) -> bool + def is_supported_for_any_of_targets(self, targets: List[str]) -> bool: """ Checks whether the tool is suitable for at least one of the specified targets. """ supported_targets = self.get_supported_targets() return (any(item in targets for item in supported_targets) or supported_targets == ['all']) - def compatible_with_platform(self): # type: () -> bool + def compatible_with_platform(self) -> bool: + """ + Checks whether this tool (any version) is compatible with the platform. + """ return any([v.compatible_with_platform() for v in self.versions.values()]) - def get_supported_platforms(self): # type: () -> Set[str] + def get_supported_platforms(self) -> Set[str]: + """ + Return set of platforms that are supported by at least one version of the tool. + """ result = set() for v in self.versions.values(): result.update(v.get_supported_platforms()) return result - def get_recommended_version(self): # type: () -> Optional[str] + def get_recommended_version(self) -> Optional[str]: + """ + Get all reccomended versions of the tool. If more versions are recommended, highest version is returned. + """ recommended_versions = [k for k, v in self.versions.items() if v.status == IDFToolVersion.STATUS_RECOMMENDED and v.compatible_with_platform(self._platform)] @@ -794,7 +935,10 @@ class IDFTool(object): return recommended_versions[0] return None - def get_preferred_installed_version(self): # type: () -> Optional[str] + def get_preferred_installed_version(self) -> Optional[str]: + """ + Get the preferred installed version of the tool. If more versions installed, return the highest. + """ recommended_versions = [k for k in self.versions_installed if self.versions[k].status == IDFToolVersion.STATUS_RECOMMENDED and self.versions[k].compatible_with_platform(self._platform)] @@ -803,21 +947,20 @@ class IDFTool(object): return recommended_versions[0] return None - def find_installed_versions(self): # type: () -> None + def find_installed_versions(self) -> None: """ Checks whether the tool can be found in PATH and in global_idf_tools_path. Writes results to self.version_in_path and self.versions_installed. - Raises ToolBinaryError if an error occurred when running any version of the tool + Raises ToolBinaryError if an error occurred when running any version of the tool. """ - # this function can not be called for a different platform assert self._platform == CURRENT_PLATFORM tool_error = False # First check if the tool is in system PATH try: ver_str = self.get_version() - except ToolNotFound: + except ToolNotFoundError: # not in PATH pass except ToolExecError as e: @@ -841,7 +984,7 @@ class IDFTool(object): continue try: ver_str = self.get_version(self.get_export_paths(version)) - except ToolNotFound: + except ToolNotFoundError: warn('directory for tool {} version {} is present, but tool was not found'.format( self.name, version)) except ToolExecError as e: @@ -857,7 +1000,7 @@ class IDFTool(object): if tool_error: raise ToolBinaryError - def latest_installed_version(self): # type: () -> Optional[str] + def latest_installed_version(self) -> Optional[str]: """ Get the latest installed tool version by directly checking the tool's version directories. @@ -874,7 +1017,7 @@ class IDFTool(object): paths = [os.path.join(tool_path, version, *p) for p in self._current_options.export_paths] try: ver_str = self.get_version(paths) - except (ToolNotFound, ToolExecError): + except (ToolNotFoundError, ToolExecError): continue if ver_str != version: continue @@ -882,7 +1025,10 @@ class IDFTool(object): return None - def download(self, version): # type: (str) -> None + def download(self, version: str) -> None: + """ + Download archive of the tool for platform given by self._platform. + """ assert version in self.versions download_obj = self.versions[version].get_download_for_platform(self._platform) if not download_obj: @@ -918,7 +1064,11 @@ class IDFTool(object): print_hints_on_download_error(str(err)) raise SystemExit(1) - def install(self, version): # type: (str) -> None + def install(self, version: str) -> None: + """ + Unpack archive to destination directory and remove given number of top-level folder if specified. + Should always be called after IDFTool.download(). + """ # Currently this is called after calling 'download' method, so here are a few asserts # for the conditions which should be true once that method is done. assert version in self.versions @@ -937,7 +1087,10 @@ class IDFTool(object): do_strip_container_dirs(dest_dir, self._current_options.strip_container_dirs) # type: ignore @staticmethod - def check_download_file(download_obj, local_path): # type: (IDFToolDownload, str) -> bool + def check_download_file(download_obj: IDFToolDownload, local_path: str) -> bool: + """ + Compare the computed sha256 to sha256 specified by downloaded archive. + """ expected_sha256 = download_obj.sha256 expected_size = download_obj.size file_size, file_sha256 = get_file_size_sha256(local_path) @@ -950,7 +1103,10 @@ class IDFTool(object): return True @classmethod - def from_json(cls, tool_dict): # type: (Dict[str, Union[str, List[str], Dict[str, str]]]) -> IDFTool + def from_json(cls, tool_dict: Dict[str, Union[str, List[str], Dict[str, str]]]) -> 'IDFTool': + """ + Create IDFTool class instance form its JSON dump. + """ # Validate json fields tool_name = tool_dict.get('name') # type: ignore if not isinstance(tool_name, str): @@ -1013,9 +1169,9 @@ class IDFTool(object): raise RuntimeError('supported_targets for tool %s is not a list of strings' % tool_name) # Create the object - tool_obj = cls(tool_name, description, install, info_url, license, # type: ignore - version_cmd, version_regex, supported_targets, version_regex_replace, # type: ignore - strip_container_dirs, is_executable) # type: ignore + tool_obj: 'IDFTool' = cls(tool_name, description, install, info_url, license, # type: ignore + version_cmd, version_regex, supported_targets, version_regex_replace, # type: ignore + strip_container_dirs, is_executable) # type: ignore for path in export_paths: # type: ignore tool_obj.options.export_paths.append(path) # type: ignore @@ -1056,7 +1212,7 @@ class IDFTool(object): raise RuntimeError('export_vars for override %d of tool %s is not a mapping' % (index, tool_name)) tool_obj.platform_overrides.append(override) # type: ignore - recommended_versions = {} # type: dict[str, list[str]] + recommended_versions:Dict[str, List[str]] = {} for version_dict in versions: # type: ignore version = version_dict.get('name') # type: ignore if not isinstance(version, str): @@ -1097,6 +1253,9 @@ class IDFTool(object): return tool_obj def to_json(self): # type: ignore + """ + Make a JSON dump of self. + """ versions_array = [] for version, version_obj in self.versions.items(): version_json = { @@ -1146,8 +1305,8 @@ class IDFTool(object): class IDFEnvEncoder(JSONEncoder): """ - IDFEnvEncoder is used for encoding IDFEnv, IDFRecord, SelectedIDFRecord classes to JSON in readable format. Not as (__main__.IDFRecord object at '0x7fcxx') - Additionally remove first underscore with private properties when processing + IDFEnvEncoder is used for encoding IDFEnv, IDFRecord, SelectedIDFRecord classes to JSON in readable format. Not as (__main__.IDFRecord object at '0x7fcxx'). + Additionally remove first underscore with private properties when processing. """ def default(self, obj): # type: ignore return {k.lstrip('_'): v for k, v in vars(obj).items()} @@ -1164,10 +1323,10 @@ class IDFRecord: - Default value is [], since user didn't define any targets yet """ def __init__(self) -> None: - self.version = '' # type: str - self.path = '' # type: str - self._features = ['core'] # type: list[str] - self._targets = [] # type: list[str] + self.version: str = '' + self.path: str = '' + self._features: List[str] = ['core'] + self._targets: List[str] = [] def __iter__(self): # type: ignore yield from { @@ -1221,14 +1380,14 @@ class IDFRecord: self._targets = list(set(targets + self._targets)) @classmethod - def get_active_idf_record(cls): # type: () -> IDFRecord + def get_active_idf_record(cls) -> 'IDFRecord': idf_record_obj = cls() idf_record_obj.version = get_idf_version() idf_record_obj.path = global_idf_path or '' return idf_record_obj @classmethod - def get_idf_record_from_dict(cls, record_dict): # type: (Dict[str, Any]) -> IDFRecord + def get_idf_record_from_dict(cls, record_dict: Dict[str, Any]) -> 'IDFRecord': idf_record_obj = cls() try: idf_record_obj.version = record_dict['version'] @@ -1245,15 +1404,14 @@ class IDFRecord: class IDFEnv: """ - IDFEnv represents ESP-IDF Environments installed on system and is responsible for loading and saving structured data - All information is saved and loaded from IDF_ENV_FILE + IDFEnv represents ESP-IDF Environments installed on system and is responsible for loading and saving structured data. + All information is saved and loaded from IDF_ENV_FILE. Contains: - * idf_installed - all installed environments of ESP-IDF on system + * idf_installed - all installed environments of ESP-IDF on system. """ - def __init__(self) -> None: active_idf_id = active_repo_id() - self.idf_installed = {active_idf_id: IDFRecord.get_active_idf_record()} # type: Dict[str, IDFRecord] + self.idf_installed: Dict[str, IDFRecord] = {active_idf_id: IDFRecord.get_active_idf_record()} def __iter__(self): # type: ignore yield from { @@ -1268,7 +1426,7 @@ class IDFEnv: def save(self) -> None: """ - Diff current class instance with instance loaded from IDF_ENV_FILE and save only if are different + Diff current class instance with instance loaded from IDF_ENV_FILE and save only if are different. """ # It is enough to compare just active records because others can't be touched by the running script if self.get_active_idf_record() != self.get_idf_env().get_active_idf_record(): @@ -1289,8 +1447,10 @@ class IDFEnv: return self.idf_installed[active_repo_id()] @classmethod - def get_idf_env(cls): # type: () -> IDFEnv - # IDFEnv class is used to process IDF_ENV_FILE file. The constructor is therefore called only in this method that loads the file and checks its contents + def get_idf_env(cls) -> 'IDFEnv': + """ + IDFEnv class is used to process IDF_ENV_FILE file. The constructor is therefore called only in this method that loads the file and checks its contents. + """ idf_env_obj = cls() try: idf_env_file_path = os.path.join(global_idf_tools_path or '', IDF_ENV_FILE) @@ -1305,7 +1465,7 @@ class IDFEnv: else: # Load and verify ESP-IDF records found in IDF_ENV_FILE idf_installed.pop('sha', None) - idf_installed_verified = {} # type: dict[str, IDFRecord] + idf_installed_verified:Dict[str, IDFRecord] = {} for idf in idf_installed: try: idf_installed_verified[idf] = IDFRecord.get_idf_record_from_dict(idf_installed[idf]) @@ -1324,10 +1484,10 @@ class IDFEnv: class ENVState: """ - ENVState is used to handle IDF global variables that are set in environment and need to be removed when switching between ESP-IDF versions in opened shell - Every opened shell/terminal has it's own temporary file to store these variables - The temporary file's name is generated automatically with suffix 'idf_ + opened shell ID'. Path to this tmp file is stored as env global variable (env_key) - The shell ID is crucial, since in one terminal can be opened more shells + ENVState is used to handle IDF global variables that are set in environment and need to be removed when switching between ESP-IDF versions in opened shell. + Every opened shell/terminal has it's own temporary file to store these variables. + The temporary file's name is generated automatically with suffix 'idf_ + opened shell ID'. Path to this tmp file is stored as env global variable (env_key). + The shell ID is crucial, since in one terminal can be opened more shells. * env_key - global variable name/key * deactivate_file_path - global variable value (generated tmp file name) * idf_variables - loaded IDF variables from file @@ -1336,10 +1496,10 @@ class ENVState: deactivate_file_path = os.environ.get(env_key, '') def __init__(self) -> None: - self.idf_variables = {} # type: Dict[str, Any] + self.idf_variables: Dict[str, Any] = {} @classmethod - def get_env_state(cls): # type: () -> ENVState + def get_env_state(cls) -> 'ENVState': env_state_obj = cls() if cls.deactivate_file_path: @@ -1366,9 +1526,9 @@ class ENVState: return self.deactivate_file_path -def load_tools_info(): # type: () -> dict[str, IDFTool] +def load_tools_info() -> Dict[str, IDFTool]: """ - Load tools metadata from tools.json, return a dictionary: tool name - tool info + Load tools metadata from tools.json, return a dictionary: tool name - tool info. """ tool_versions_file_name = global_tools_json @@ -1396,16 +1556,27 @@ def parse_tools_info_json(tools_info): # type: ignore return tools_dict -def dump_tools_json(tools_info): # type: ignore +def dump_tools_json(tools_info: Optional[Dict[str, IDFTool]]): # type: ignore + """ + Dump all the tools into JSON. + """ tools_array = [] - for tool_name, tool_obj in tools_info.items(): - tool_json = tool_obj.to_json() - tools_array.append(tool_json) - file_json = {'version': TOOLS_FILE_VERSION, 'tools': tools_array} - return json.dumps(file_json, indent=2, separators=(',', ': '), sort_keys=True) + if tools_info: + for _, tool_obj in tools_info.items(): + tool_json = tool_obj.to_json() + tools_array.append(tool_json) + file_json = {'version': TOOLS_FILE_VERSION, 'tools': tools_array} + return json.dumps(file_json, indent=2, separators=(',', ': '), sort_keys=True) + else: + return json.dumps({}, indent=2, separators=(',', ': '), sort_keys=True) def get_python_exe_and_subdir() -> Tuple[str, str]: + """ + Returns: + * python executable name (python on Unix, python.exe on Win) + * subdir in which the executable is (bin for Unix, Scripts for Win) + """ if sys.platform == 'win32': subdir = 'Scripts' python_exe = 'python.exe' @@ -1416,6 +1587,9 @@ def get_python_exe_and_subdir() -> Tuple[str, str]: def get_idf_version() -> str: + """ + Return ESP-IDF version. + """ version_file_path = os.path.join(global_idf_path, 'version.txt') # type: ignore if os.path.exists(version_file_path): with open(version_file_path, 'r') as version_file: @@ -1435,7 +1609,7 @@ def get_idf_version() -> str: pass match = re.match(r'^v([0-9]+\.[0-9]+).*', idf_version_str) if match: - idf_version = match.group(1) # type: Optional[str] + idf_version: Optional[str] = match.group(1) else: idf_version = None # fallback when IDF is a shallow clone @@ -1458,6 +1632,9 @@ def get_idf_version() -> str: def get_python_env_path() -> Tuple[str, str, str, str]: + """ + Returns tuple of Python environment path, Python env. path with subdir and full path from Python (i.e. with executable). + """ python_ver_major_minor = '{}.{}'.format(sys.version_info.major, sys.version_info.minor) idf_version = get_idf_version() @@ -1471,9 +1648,9 @@ def get_python_env_path() -> Tuple[str, str, str, str]: return idf_python_env_path, idf_python_export_path, virtualenv_python, idf_version -def parse_tools_arg(tools_str): # type: (List[str]) -> List[str] +def parse_tools_arg(tools_str: List[str]) -> List[str]: """ - Base parsing "tools" argumets: all, required, etc + Base parsing "tools" argumets: all, required, etc. """ if not tools_str: return ['required'] @@ -1481,8 +1658,9 @@ def parse_tools_arg(tools_str): # type: (List[str]) -> List[str] return tools_str -def expand_tools_arg(tools_spec, overall_tools, targets): # type: (list[str], OrderedDict, list[str]) -> list[str] - """ Expand list of tools 'tools_spec' in according: +def expand_tools_arg(tools_spec: List[str], overall_tools: OrderedDict, targets: List[str]) -> List[str]: + """ + Expand list of tools 'tools_spec' in according: - a tool is in the 'overall_tools' list - consider metapackages like "required" and "all" - process wildcards in tool names @@ -1506,9 +1684,9 @@ def expand_tools_arg(tools_spec, overall_tools, targets): # type: (list[str], O return tools -def parse_targets_arg(targets_str): # type: (str) -> List[str] +def parse_targets_arg(targets_str: str) -> List[str]: """ - Parse and check if targets_str is a valid list of targets and return a target list + Parse and check if targets_str is a valid list of targets and return a target list. """ targets_from_tools_json = get_all_targets_from_tools_json() invalid_targets = [] @@ -1525,20 +1703,26 @@ def parse_targets_arg(targets_str): # type: (str) -> List[str] return targets -def add_and_check_targets(idf_env_obj, targets_str): # type: (IDFEnv, str) -> list[str] +def add_and_check_targets(idf_env_obj: IDFEnv, targets_str: str) -> List[str]: """ - Define targets from targets_str, check that the target names are valid and add them to idf_env_obj + Define targets from targets_str, check that the target names are valid and add them to idf_env_obj. """ targets = parse_targets_arg(targets_str) idf_env_obj.get_active_idf_record().extend_targets(targets) return idf_env_obj.get_active_idf_record().targets -def feature_to_requirements_path(feature): # type: (str) -> str +def feature_to_requirements_path(feature: str) -> str: + """ + Convert feature (ci, core, docs, gdbgui, pytest, ...) to the path to its requirements.txt. + """ return os.path.join(global_idf_path or '', 'tools', 'requirements', 'requirements.{}.txt'.format(feature)) -def process_and_check_features(idf_env_obj, features_str): # type: (IDFEnv, str) -> list[str] +def process_and_check_features(idf_env_obj: IDFEnv, features_str: str) -> List[str]: + """ + Check whether new feature is valid. If yes, update features in active IDF record. + """ new_features = [] remove_features = [] for new_feature_candidate in features_str.split(','): @@ -1553,9 +1737,12 @@ def process_and_check_features(idf_env_obj, features_str): # type: (IDFEnv, str return idf_env_obj.get_active_idf_record().features -def get_all_targets_from_tools_json(): # type: () -> list[str] +def get_all_targets_from_tools_json() -> List[str]: + """ + Returns list of all targets from tools.json file. + """ tools_info = load_tools_info() - targets_from_tools_json = [] # type: list[str] + targets_from_tools_json: List[str] = [] for _, v in tools_info.items(): targets_from_tools_json.extend(v.get_supported_targets()) @@ -1566,7 +1753,12 @@ def get_all_targets_from_tools_json(): # type: () -> list[str] return sorted(targets_from_tools_json) -def filter_tools_info(idf_env_obj, tools_info): # type: (IDFEnv, OrderedDict[str, IDFTool]) -> OrderedDict[str,IDFTool] +def filter_tools_info(idf_env_obj: IDFEnv, tools_info: Dict[str, IDFTool]) -> Dict[str,IDFTool]: + """ + Filter tools info; return only those targets which: + * are installable (install type is INSTALL_ALWAYS or INSTALL_ON_REQUEST) + * support at least one target from active IDF record + """ targets = idf_env_obj.get_active_idf_record().targets if not targets: return tools_info @@ -1577,7 +1769,7 @@ def filter_tools_info(idf_env_obj, tools_info): # type: (IDFEnv, OrderedDict[st return OrderedDict(filtered_tools_spec) -def add_variables_to_deactivate_file(args, new_idf_vars): # type: (list[str], dict[str, Any]) -> str +def add_variables_to_deactivate_file(args: List[str], new_idf_vars:Dict[str, Any]) -> str: """ Add IDF global variables that need to be removed when the active esp-idf environment is deactivated. """ @@ -1601,17 +1793,17 @@ def add_variables_to_deactivate_file(args, new_idf_vars): # type: (list[str], d return deactivate_file_path -def deactivate_statement(args): # type: (list[str]) -> None +def deactivate_statement(args: List[str]) -> None: """ Deactivate statement is sequence of commands, that remove IDF global variables from enviroment, - so the environment gets to the state it was before calling export.{sh/fish} script. + so the environment gets to the state it was before calling export.{sh/fish} script. """ env_state_obj = ENVState.get_env_state() if not env_state_obj.idf_variables: warn('No IDF variables to remove from environment found. Deactivation of previous esp-idf version was not successful.') return unset_vars = env_state_obj.idf_variables - env_path = os.getenv('PATH') # type: Optional[str] + env_path: Optional[str] = os.getenv('PATH') if env_path: cleared_env_path = ':'.join([k for k in env_path.split(':') if k not in unset_vars['PATH']]) @@ -1631,16 +1823,24 @@ def deactivate_statement(args): # type: (list[str]) -> None return -def get_export_format_and_separator(args): # type: (list[str]) -> Tuple[str, str] +def get_export_format_and_separator(args: List[str]) -> Tuple[str, str]: + """ + Returns export pattern (formatted string) either for exporting in shell or as a key-value pair. + """ return {EXPORT_SHELL: ('export {}="{}"', ';'), EXPORT_KEY_VALUE: ('{}={}', '\n')}[args.format] # type: ignore -def get_unset_format_and_separator(args): # type: (list[str]) -> Tuple[str, str] +def get_unset_format_and_separator(args: List[str]) -> Tuple[str, str]: + """ + Returns pattern to unset a variable (formatted string) either for shell or for key-value pair. + """ return {EXPORT_SHELL: ('unset {}', ';'), EXPORT_KEY_VALUE: ('{}', '\n')}[args.format] # type: ignore def different_idf_detected() -> bool: - + """ + Checks if new IDF detected. + """ # If IDF global variable found, test if belong to different ESP-IDF version if 'IDF_TOOLS_EXPORT_CMD' in os.environ: if global_idf_path != os.path.dirname(os.environ['IDF_TOOLS_EXPORT_CMD']): @@ -1658,15 +1858,20 @@ def different_idf_detected() -> bool: return True -# Function returns unique id of running ESP-IDF combining current idfpath with version. -# The id is unique with same version & different path or same path & different version. def active_repo_id() -> str: + """ + Function returns unique id of running ESP-IDF combining current idfpath with version. + The id is unique with same version & different path or same path & different version. + """ if global_idf_path is None: return 'UNKNOWN_PATH' + '-v' + get_idf_version() return global_idf_path + '-v' + get_idf_version() def list_default(args): # type: ignore + """ + Prints currently installed versions of all tools compatible with current platform. + """ tools_info = load_tools_info() tool_error = False for name, tool in tools_info.items(): @@ -1692,6 +1897,9 @@ def list_default(args): # type: ignore def list_outdated(args): # type: ignore + """ + Prints info if currently installed version can be replaced by newer one for every tool. + """ tools_info = load_tools_info() for name, tool in tools_info.items(): if tool.get_install_type() == IDFTool.INSTALL_NEVER: @@ -1708,6 +1916,10 @@ def list_outdated(args): # type: ignore def action_list(args): # type: ignore + """ + If args.outdated flag is set, prints if currently installed tools can be replaced by their newer version. + If not, prints currently installed tools with their version. + """ if args.outdated: list_outdated(args) else: @@ -1715,6 +1927,9 @@ def action_list(args): # type: ignore def action_check(args): # type: ignore + """ + Checks what tools are installed and if some mandatory tool is missing, exits with return code 1. + """ tools_info = load_tools_info() tools_info = filter_tools_info(IDFEnv.get_idf_env(), tools_info) not_found_list = [] @@ -1747,13 +1962,16 @@ def action_check(args): # type: ignore raise SystemExit(1) -# The following functions are used in process_tool which is a part of the action_export. +# The following function is used in process_tool which is a part of the action_export. def handle_recommended_version_to_use( - tool, - tool_name, - version_to_use, - prefer_system_hint, -): # type: (IDFTool, str, str, str) -> Tuple[list, dict] + tool: IDFTool, + tool_name: str, + version_to_use: str, + prefer_system_hint: str, +) -> Tuple[list, dict]: + """ + If there is unsupported tools version in PATH, prints info about that. + """ tool_export_paths = tool.get_export_paths(version_to_use) tool_export_vars = tool.get_export_vars(version_to_use) if tool.version_in_path and tool.version_in_path not in tool.versions: @@ -1762,7 +1980,11 @@ def handle_recommended_version_to_use( return tool_export_paths, tool_export_vars -def handle_supported_or_deprecated_version(tool, tool_name): # type: (IDFTool, str) -> None +# The following function is used in process_tool which is a part of the action_export. +def handle_supported_or_deprecated_version(tool: IDFTool, tool_name: str) -> None: + """ + Prints info if supported, but not recommended or deprecated version of the tool is used. + """ version_obj: IDFToolVersion = tool.versions[tool.version_in_path] # type: ignore if version_obj.status == IDFToolVersion.STATUS_SUPPORTED: info('Using a supported version of tool {} found in PATH: {}.'.format(tool_name, tool.version_in_path), @@ -1773,12 +1995,16 @@ def handle_supported_or_deprecated_version(tool, tool_name): # type: (IDFTool, warn('using a deprecated version of tool {} found in PATH: {}'.format(tool_name, tool.version_in_path)) +# The following function is used in process_tool which is a part of the action_export. def handle_missing_versions( - tool, - tool_name, - install_cmd, - prefer_system_hint -): # type: (IDFTool, str, str, str) -> None + tool: IDFTool, + tool_name: str, + install_cmd: str, + prefer_system_hint: str +) -> None: + """ + Prints the info about missing tool to stderr if tool has no supported versions installed. + """ fatal('tool {} has no installed versions. Please run \'{}\' to install it.'.format( tool.name, install_cmd)) if tool.version_in_path and tool.version_in_path not in tool.versions: @@ -1787,12 +2013,19 @@ def handle_missing_versions( def process_tool( - tool, - tool_name, - args, - install_cmd, - prefer_system_hint -): # type: (IDFTool, str, argparse.Namespace, str, str) -> Tuple[list, dict, bool] + tool: IDFTool, + tool_name: str, + args: argparse.Namespace, + install_cmd: str, + prefer_system_hint: str +) -> Tuple[list, dict, bool]: + """ + Helper function used only in action export. + Returns: + * Paths that need to be exported. + * Dictionary of environment variables that need to be exported for the tool. + * Flag if any tool was found. + """ tool_found: bool = True tool_export_paths: List[str] = [] tool_export_vars: Dict[str, str] = {} @@ -1838,7 +2071,10 @@ def process_tool( return tool_export_paths, tool_export_vars, tool_found -def action_export(args): # type: ignore +def action_export(args: Any) -> None: + """ + Exports all necessary environment variables and paths needed for tools used. + """ if args.deactivate and different_idf_detected(): deactivate_statement(args) return @@ -1846,7 +2082,7 @@ def action_export(args): # type: ignore tools_info = load_tools_info() tools_info = filter_tools_info(IDFEnv.get_idf_env(), tools_info) all_tools_found = True - export_vars = {} + export_vars: Dict[str, str] = {} paths_to_export = [] self_restart_cmd = f'{sys.executable} {__file__}{(" --tools-json " + args.tools_json) if args.tools_json else ""}' @@ -1872,16 +2108,16 @@ def action_export(args): # type: ignore idf_python_env_path = to_shell_specific_paths([idf_python_env_path])[0] if os.getenv('IDF_PYTHON_ENV_PATH') != idf_python_env_path: export_vars['IDF_PYTHON_ENV_PATH'] = to_shell_specific_paths([idf_python_env_path])[0] - if idf_python_export_path not in current_path: + if current_path and idf_python_export_path not in current_path: # getenv can return None paths_to_export.append(idf_python_export_path) idf_version = get_idf_version() if os.getenv('ESP_IDF_VERSION') != idf_version: export_vars['ESP_IDF_VERSION'] = idf_version - idf_tools_dir = os.path.join(global_idf_path, 'tools') + idf_tools_dir = os.path.join(global_idf_path, 'tools') # type: ignore idf_tools_dir = to_shell_specific_paths([idf_tools_dir])[0] - if idf_tools_dir not in current_path: + if current_path and idf_tools_dir not in current_path: paths_to_export.append(idf_tools_dir) if sys.platform == 'win32': @@ -1913,15 +2149,20 @@ def action_export(args): # type: ignore print(export_statements) -def get_idf_download_url_apply_mirrors(args=None, download_url=IDF_DL_URL): # type: (Any, str) -> str +def get_idf_download_url_apply_mirrors(args: Any = None, download_url: str = IDF_DL_URL) -> str: + """ + Returns URL for ESP-IDF download with applied mirrors if available. + If original URL pointed to Github and IDF_GITHUB_ASSETS is set, change the source to Espressif's download servers. + """ url = apply_mirror_prefix_map(args, download_url) url = apply_github_assets_option(url) return url -def apply_mirror_prefix_map(args, idf_download_url): # type: (Any, str) -> str - """Rewrite URL for given idf_download_url. - if --mirror-prefix-map flag or IDF_MIRROR_PREFIX_MAP environment variable is given. +def apply_mirror_prefix_map(args: Any, idf_download_url: str) -> str: + """ + Rewrite URL for given idf_download_url. + If --mirror-prefix-map flag or IDF_MIRROR_PREFIX_MAP environment variable is given. """ new_url = idf_download_url mirror_prefix_map = None @@ -1946,8 +2187,9 @@ def apply_mirror_prefix_map(args, idf_download_url): # type: (Any, str) -> str return new_url -def apply_github_assets_option(idf_download_url): # type: (str) -> str - """ Rewrite URL for given idf_download_url if the download URL is an https://github.com/ URL and the variable +def apply_github_assets_option(idf_download_url: str) -> str: + """ + Rewrite URL for given idf_download_url if the download URL is an https://github.com/ URL and the variable IDF_GITHUB_ASSETS is set. The github.com part of the URL will be replaced. """ new_url = idf_download_url @@ -1970,9 +2212,12 @@ def apply_github_assets_option(idf_download_url): # type: (str) -> str return new_url -def get_tools_spec_and_platform_info(selected_platform, targets, tools_spec, - quiet=False): # type: (str, list[str], list[str], bool) -> Tuple[list[str], Dict[str, IDFTool]] - # If this function is not called from action_download, but is used just for detecting active tools, info about downloading is unwanted. +def get_tools_spec_and_platform_info(selected_platform: str, targets: List[str], tools_spec: List[str], + quiet: bool = False) -> Tuple[List[str], Dict[str, IDFTool]]: + """ + Returns tools_spec list and dict of tools for selected platform in form tool_name : IDFTool object. + NOTE: If this function is not called from action_download, but is used just for detecting active tools, info about downloading is unwanted. + """ global global_quiet try: old_global_quiet = global_quiet @@ -1992,9 +2237,13 @@ def get_tools_spec_and_platform_info(selected_platform, targets, tools_spec, def action_download(args): # type: ignore + """ + Saves current IDF environment and for every tools in tools_spec, downloads the right archive for tools version and target platform, if possible. + If not, prints apropriate message to stderr and raise SystemExit() expception. + """ tools_spec = parse_tools_arg(args.tools) - targets = [] # type: list[str] + targets: List[str] = [] # Saving IDFEnv::targets for selected ESP_targets if all tools have been specified if 'required' in tools_spec or 'all' in tools_spec: idf_env_obj = IDFEnv.get_idf_env() @@ -2040,9 +2289,13 @@ def action_download(args): # type: ignore def action_install(args): # type: ignore + """ + Saves current IDF environment and for every tool in tools_spec, installs the tool from the archive downloaded before, if possible. + If not, raises SystemExit. + """ tools_spec = parse_tools_arg(args.tools) - targets = [] # type: list[str] + targets: List[str] = [] # Saving IDFEnv::targets for selected ESP_targets if all tools have been specified if 'required' in tools_spec or 'all' in tools_spec: idf_env_obj = IDFEnv.get_idf_env() @@ -2102,7 +2355,10 @@ def action_install(args): # type: ignore raise SystemExit(1) -def get_wheels_dir(): # type: () -> Optional[str] +def get_wheels_dir() -> Optional[str]: + """ + Gets path for idf-python-wheels package. + """ tools_info = load_tools_info() wheels_package_name = 'idf-python-wheels' if wheels_package_name not in tools_info: @@ -2117,7 +2373,10 @@ def get_wheels_dir(): # type: () -> Optional[str] return wheels_dir -def get_requirements(new_features): # type: (str) -> list[str] +def get_requirements(new_features: str) -> List[str]: + """ + Returns list of path for requirements.txt for given feature list. + """ idf_env_obj = IDFEnv.get_idf_env() features = process_and_check_features(idf_env_obj, new_features) try: @@ -2129,7 +2388,11 @@ def get_requirements(new_features): # type: (str) -> list[str] return [feature_to_requirements_path(feature) for feature in features] -def get_constraints(idf_version, online=True): # type: (str, bool) -> str +def get_constraints(idf_version: str, online: bool = True) -> str: + """ + Download constraints file for specified IDF vversion if it was not downloaded recently (1 day), + check success and place it in constraints file location. + """ idf_download_url = get_idf_download_url_apply_mirrors() constraint_file = 'espidf.constraints.v{}.txt'.format(idf_version) constraint_path = os.path.join(global_idf_tools_path or '', constraint_file) @@ -2177,7 +2440,11 @@ def get_constraints(idf_version, online=True): # type: (str, bool) -> str raise SystemExit(1) -def install_legacy_python_virtualenv(path): # type: (str) -> None +def install_legacy_python_virtualenv(path: str) -> None: + """ + Checks if pip is installed (and installs it if not), checks whether virtualenv is already installed (and in which version), + and finally creates virtual environment with python -m virtualenv . + """ # Before creating the virtual environment, check if pip is installed. try: subprocess.check_call([sys.executable, '-m', 'pip', '--version']) @@ -2221,6 +2488,11 @@ def install_legacy_python_virtualenv(path): # type: (str) -> None def action_install_python_env(args): # type: ignore + """ + (Re)installs python virtual environment. + If Python virtualenv is already installed, checks for errors (missing/corrupted python interpreter, pip...) + and reinstalls if needed. Removes current virtualenv before reinstalling. + """ use_constraints = not args.no_constraints reinstall = args.reinstall idf_python_env_path, _, virtualenv_python, idf_version = get_python_env_path() @@ -2235,7 +2507,7 @@ def action_install_python_env(args): # type: ignore subprocess.check_call([virtualenv_python, '--version'], stdout=sys.stdout, stderr=sys.stderr) except (OSError, subprocess.CalledProcessError): # At this point we can reinstall the virtual environment if it is non-functional. This can happen at least - # when the Python interpreter was removed which was used to create the virtual environment. + # when the Python interpreter which was used to create the virtual environment was removed. reinstall = True try: @@ -2313,6 +2585,10 @@ def action_install_python_env(args): # type: ignore def action_check_python_dependencies(args): # type: ignore + """ + Checks if all the dependencies (from requirements, constraints...) are installed properly. + Raises SystemExit if not. + """ use_constraints = not args.no_constraints req_paths = get_requirements('') # no new features -> just detect the existing ones @@ -2360,15 +2636,18 @@ class ChecksumCalculator(): """ A class used to get size/checksum/basename of local artifact files. """ - def __init__(self, files): # type: (list[str]) -> None + def __init__(self, files: List[str]) -> None: self.files = files - def __iter__(self): # type: () -> Iterator[Tuple[int, str, str]] + def __iter__(self) -> Iterator[Tuple[int, str, str]]: for f in self.files: yield (*get_file_size_sha256(f), os.path.basename(f)) class ChecksumParsingError(RuntimeError): + """ + Raised when checksum cannot be parsed. + """ pass @@ -2379,7 +2658,7 @@ class ChecksumFileParser(): * ... (2 lines for every artifact) ... """ - def __init__(self, tool_name, url): # type: (str, str) -> None + def __init__(self, tool_name: str, url: str) -> None: self.tool_name = tool_name sha256_file_tmp = os.path.join(global_idf_tools_path or '', 'tools', 'add-version.sha256.tmp') @@ -2397,7 +2676,7 @@ class ChecksumFileParser(): if os.path.isfile(sha256_file_tmp): os.remove(sha256_file_tmp) - def parseLine(self, regex, line): # type: (str, str) -> str + def parseLine(self, regex: str, line: str) -> str: match = re.search(regex, line) if not match: raise ChecksumParsingError(f'Can not parse line "{line}" with regex "{regex}"') @@ -2405,7 +2684,7 @@ class ChecksumFileParser(): # parse checksum file with formatting used by crosstool-ng, gdb, ... releases # e.g. https://github.com/espressif/crosstool-NG/releases/download/esp-2021r2/crosstool-NG-esp-2021r2-checksum.sha256 - def __iter__(self): # type: () -> Iterator[Tuple[int, str, str]] + def __iter__(self) -> Iterator[Tuple[int, str, str]]: try: for bytes_str, hash_str in zip(self.checksum[0::2], self.checksum[1::2]): bytes_filename = self.parseLine(r'^# (\S*):', bytes_str) @@ -2426,7 +2705,10 @@ class ChecksumFileParser(): raise SystemExit(1) -def action_add_version(args): # type: ignore +def action_add_version(args: Any) -> None: + """ + Adds new version of the tool to IDFTool entry together with download entry and updating json dump. + """ tools_info = load_tools_info() tool_name = args.tool tool_obj = tools_info.get(tool_name) @@ -2447,7 +2729,9 @@ def action_add_version(args): # type: ignore version_obj = IDFToolVersion(version, version_status) tool_obj.versions[version] = version_obj url_prefix = args.url_prefix or 'https://%s/' % TODO_MESSAGE - checksum_info = ChecksumFileParser(tool_name, args.checksum_file) if args.checksum_file else ChecksumCalculator(args.artifact_file) + checksum_info: ChecksumFileParser = (ChecksumFileParser(tool_name, args.checksum_file) + if args.checksum_file + else ChecksumCalculator(args.artifact_file)) # type: ignore for file_size, file_sha256, file_name in checksum_info: # Guess which platform this file is for found_platform = Platforms.get_by_filename(file_name) @@ -2462,7 +2746,7 @@ def action_add_version(args): # type: ignore version_obj.add_download(found_platform, url, file_size, file_sha256) json_str = dump_tools_json(tools_info) if not args.output: - args.output = os.path.join(global_idf_path, TOOLS_FILE_NEW) + args.output = os.path.join(global_idf_path, TOOLS_FILE_NEW) # type: ignore with open(args.output, 'w') as f: f.write(json_str) f.write('\n') @@ -2470,6 +2754,9 @@ def action_add_version(args): # type: ignore def action_rewrite(args): # type: ignore + """ + Write JSON dump of all tools from tools_info into file specified by args.output. + """ tools_info = load_tools_info() json_str = dump_tools_json(tools_info) if not args.output: @@ -2480,11 +2767,11 @@ def action_rewrite(args): # type: ignore info('Wrote output to {}'.format(args.output)) -def action_uninstall(args): # type: (Any) -> None - """ Print or remove installed tools versions, that are not used by active ESP-IDF version anymore. +def action_uninstall(args: Any) -> None: + """ + Print or remove installed tools versions, that are not used by active ESP-IDF version anymore. Additionally remove all older versions of previously downloaded archives. """ - tools_info = load_tools_info() tools_path = os.path.join(global_idf_tools_path or '', 'tools') dist_path = os.path.join(global_idf_tools_path or '', 'dist') @@ -2558,6 +2845,9 @@ def action_uninstall(args): # type: (Any) -> None def action_validate(args): # type: ignore + """ + Validate whether everything is installed correctly. + """ try: import jsonschema except ImportError: @@ -2574,10 +2864,13 @@ def action_validate(args): # type: ignore def action_gen_doc(args): # type: ignore + """ + Prints basic documentation and information about every tool from tool_info. + """ f = args.output tools_info = load_tools_info() - def print_out(text): # type: (str) -> None + def print_out(text: str) -> None: f.write(text + '\n') print_out('.. |zwsp| unicode:: U+200B') @@ -2654,10 +2947,10 @@ More info: {info_url} print_out('') -def action_check_tool_supported(args): # type: (Any) -> None +def action_check_tool_supported(args: Any) -> None: """ - Print "True"/"False" to stdout as a result that tool is supported in IDF - Print erorr message to stderr otherwise and set exit code to 1 + Print "True"/"False" to stdout as a result that tool is supported in IDF. + Print error message to stderr otherwise and set exit code to 1. """ try: tools_info = load_tools_info() @@ -2665,16 +2958,16 @@ def action_check_tool_supported(args): # type: (Any) -> None if v.name == args.tool_name: print(v.check_version(args.exec_path)) break - except (RuntimeError, ToolNotFound, ToolExecError) as err: + except (RuntimeError, ToolNotFoundError, ToolExecError) as err: fatal(f'Failed to check tool support: (name: {args.tool_name}, exec: {args.exec_path})') fatal(f'{err}') raise SystemExit(1) -def action_get_tool_supported_versions(args): # type: (Any) -> None +def action_get_tool_supported_versions(args: Any) -> None: """ - Print supported versions of a tool to stdout - Print erorr message to stderr otherwise and set exit code to 1 + Print supported versions of a tool to stdout. + Print error message to stderr otherwise and set exit code to 1. """ try: tools_info = load_tools_info() @@ -2688,7 +2981,7 @@ def action_get_tool_supported_versions(args): # type: (Any) -> None raise SystemExit(1) -def main(argv): # type: (list[str]) -> None +def main(argv: List[str]) -> None: parser = argparse.ArgumentParser() parser.add_argument('--quiet', help='Don\'t output diagnostic messages to stdout/stderr', action='store_true')