tools: Add Python typing annotations for idf_tools.py

This commit is contained in:
Roland Dobai
2021-03-04 09:53:47 +01:00
parent 907ea44ee9
commit 191c4ccc01
2 changed files with 139 additions and 127 deletions

View File

@@ -239,7 +239,6 @@ tools/idf_py_actions/serial_ext.py
tools/idf_py_actions/tools.py tools/idf_py_actions/tools.py
tools/idf_py_actions/uf2_ext.py tools/idf_py_actions/uf2_ext.py
tools/idf_size.py tools/idf_size.py
tools/idf_tools.py
tools/idf.py tools/idf.py
tools/kconfig_new/confgen.py tools/kconfig_new/confgen.py
tools/kconfig_new/confserver.py tools/kconfig_new/confserver.py

View File

@@ -55,24 +55,30 @@ import ssl
import subprocess import subprocess
import sys import sys
import tarfile import tarfile
import zipfile
from collections import OrderedDict, namedtuple from collections import OrderedDict, namedtuple
from ssl import SSLContext # noqa: F401
from tarfile import TarFile # noqa: F401
from zipfile import ZipFile
try: try:
import typing # noqa: F401 from typing import IO, Callable, Optional, Tuple, Union # noqa: F401
except ImportError: except ImportError:
pass pass
try: try:
from urllib.error import ContentTooShortError from urllib.error import ContentTooShortError
from urllib.request import urlopen from urllib.request import urlopen
# the following is only for typing annotation
from urllib.response import addinfourl # noqa: F401
except ImportError: except ImportError:
from urllib import ContentTooShortError, urlopen # Python 2
from urllib import ContentTooShortError, urlopen # type: ignore
try: try:
from exceptions import WindowsError from exceptions import WindowsError
except ImportError: except ImportError:
class WindowsError(OSError): # Unix
class WindowsError(OSError): # type: ignore
pass pass
@@ -181,22 +187,22 @@ emyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc=
global_quiet = False global_quiet = False
global_non_interactive = False global_non_interactive = False
global_idf_path = None # type: typing.Optional[str] global_idf_path = None # type: Optional[str]
global_idf_tools_path = None # type: typing.Optional[str] global_idf_tools_path = None # type: Optional[str]
global_tools_json = None # type: typing.Optional[str] global_tools_json = None # type: Optional[str]
def fatal(text, *args): def fatal(text, *args): # type: (str, str) -> None
if not global_quiet: if not global_quiet:
sys.stderr.write('ERROR: ' + text + '\n', *args) sys.stderr.write('ERROR: ' + text + '\n', *args)
def warn(text, *args): def warn(text, *args): # type: (str, str) -> None
if not global_quiet: if not global_quiet:
sys.stderr.write('WARNING: ' + text + '\n', *args) sys.stderr.write('WARNING: ' + text + '\n', *args)
def info(text, f=None, *args): def info(text, f=None, *args): # type: (str, Optional[IO[str]], str) -> None
if not global_quiet: if not global_quiet:
if f is None: if f is None:
f = sys.stdout f = sys.stdout
@@ -204,6 +210,7 @@ def info(text, f=None, *args):
def run_cmd_check_output(cmd, input_text=None, extra_paths=None): def run_cmd_check_output(cmd, input_text=None, extra_paths=None):
# type: (list[str], Optional[str], Optional[list[str]]) -> bytes
# If extra_paths is given, locate the executable in one of these directories. # If extra_paths is given, locate the executable in one of these directories.
# Note: it would seem logical to add extra_paths to env[PATH], instead, and let OS do the job of finding the # Note: it would seem logical to add extra_paths to env[PATH], instead, and let OS do the job of finding the
# executable for us. However this does not work on Windows: https://bugs.python.org/issue8557. # executable for us. However this does not work on Windows: https://bugs.python.org/issue8557.
@@ -223,13 +230,14 @@ def run_cmd_check_output(cmd, input_text=None, extra_paths=None):
break break
try: try:
input_bytes = None
if input_text: if input_text:
input_text = input_text.encode() input_bytes = input_text.encode()
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, input=input_text) result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, input=input_bytes)
return result.stdout + result.stderr return result.stdout + result.stderr
except (AttributeError, TypeError): except (AttributeError, TypeError):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate(input_text) stdout, stderr = p.communicate(input_bytes)
if p.returncode != 0: if p.returncode != 0:
try: try:
raise subprocess.CalledProcessError(p.returncode, cmd, stdout, stderr) raise subprocess.CalledProcessError(p.returncode, cmd, stdout, stderr)
@@ -238,7 +246,7 @@ def run_cmd_check_output(cmd, input_text=None, extra_paths=None):
return stdout + stderr return stdout + stderr
def to_shell_specific_paths(paths_list): def to_shell_specific_paths(paths_list): # type: (list[str]) -> list[str]
if sys.platform == 'win32': if sys.platform == 'win32':
paths_list = [p.replace('/', os.path.sep) if os.path.sep in p else p for p in paths_list] paths_list = [p.replace('/', os.path.sep) if os.path.sep in p else p for p in paths_list]
@@ -250,7 +258,7 @@ def to_shell_specific_paths(paths_list):
return paths_list return paths_list
def get_env_for_extra_paths(extra_paths): def get_env_for_extra_paths(extra_paths): # type: (list[str]) -> dict[str, str]
""" """
Return a copy of environment variables dict, prepending paths listed in extra_paths Return a copy of environment variables dict, prepending paths listed in extra_paths
to the PATH environment variable. to the PATH environment variable.
@@ -258,13 +266,13 @@ def get_env_for_extra_paths(extra_paths):
env_arg = os.environ.copy() env_arg = os.environ.copy()
new_path = os.pathsep.join(extra_paths) + os.pathsep + env_arg['PATH'] new_path = os.pathsep.join(extra_paths) + os.pathsep + env_arg['PATH']
if sys.version_info.major == 2: if sys.version_info.major == 2:
env_arg['PATH'] = new_path.encode('utf8') env_arg['PATH'] = new_path.encode('utf8') # type: ignore
else: else:
env_arg['PATH'] = new_path env_arg['PATH'] = new_path
return env_arg return env_arg
def get_file_size_sha256(filename, block_size=65536): def get_file_size_sha256(filename, block_size=65536): # type: (str, int) -> Tuple[int, str]
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
size = 0 size = 0
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
@@ -274,14 +282,14 @@ def get_file_size_sha256(filename, block_size=65536):
return size, sha256.hexdigest() return size, sha256.hexdigest()
def report_progress(count, block_size, total_size): def report_progress(count, block_size, total_size): # type: (int, int, int) -> None
percent = int(count * block_size * 100 / total_size) percent = int(count * block_size * 100 / total_size)
percent = min(100, percent) percent = min(100, percent)
sys.stdout.write('\r%d%%' % percent) sys.stdout.write('\r%d%%' % percent)
sys.stdout.flush() sys.stdout.flush()
def mkdir_p(path): def mkdir_p(path): # type: (str) -> None
try: try:
os.makedirs(path) os.makedirs(path)
except OSError as exc: except OSError as exc:
@@ -289,12 +297,12 @@ def mkdir_p(path):
raise raise
def unpack(filename, destination): def unpack(filename, destination): # type: (str, str) -> None
info('Extracting {0} to {1}'.format(filename, destination)) info('Extracting {0} to {1}'.format(filename, destination))
if filename.endswith(('.tar.gz', '.tgz')): if filename.endswith(('.tar.gz', '.tgz')):
archive_obj = tarfile.open(filename, 'r:gz') archive_obj = tarfile.open(filename, 'r:gz') # type: Union[TarFile, ZipFile]
elif filename.endswith('zip'): elif filename.endswith('zip'):
archive_obj = zipfile.ZipFile(filename) archive_obj = ZipFile(filename)
else: else:
raise NotImplementedError('Unsupported archive type') raise NotImplementedError('Unsupported archive type')
if sys.version_info.major == 2: if sys.version_info.major == 2:
@@ -304,7 +312,7 @@ def unpack(filename, destination):
archive_obj.extractall(destination) archive_obj.extractall(destination)
def splittype(url): def splittype(url): # type: (str) -> Tuple[Optional[str], str]
match = re.match('([^/:]+):(.*)', url, re.DOTALL) match = re.match('([^/:]+):(.*)', url, re.DOTALL)
if match: if match:
scheme, data = match.groups() scheme, data = match.groups()
@@ -314,13 +322,14 @@ def splittype(url):
# An alternative version of urlretrieve which takes SSL context as an argument # An alternative version of urlretrieve which takes SSL context as an argument
def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None): def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None):
# type: (str, str, Optional[Callable[[int, int, int], None]], Optional[bytes], Optional[SSLContext]) -> Tuple[str, addinfourl]
url_type, path = splittype(url) url_type, path = splittype(url)
# urlopen doesn't have context argument in Python <=2.7.9 # urlopen doesn't have context argument in Python <=2.7.9
extra_urlopen_args = {} extra_urlopen_args = {}
if context: if context:
extra_urlopen_args['context'] = context extra_urlopen_args['context'] = context
with contextlib.closing(urlopen(url, data, **extra_urlopen_args)) as fp: with contextlib.closing(urlopen(url, data, **extra_urlopen_args)) as fp: # type: ignore
headers = fp.info() headers = fp.info()
# Just return the local path and the "headers" for file:// # Just return the local path and the "headers" for file://
@@ -364,7 +373,7 @@ def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None):
# https://github.com/espressif/esp-idf/issues/3819#issuecomment-515167118 # https://github.com/espressif/esp-idf/issues/3819#issuecomment-515167118
# https://github.com/espressif/esp-idf/issues/4063#issuecomment-531490140 # https://github.com/espressif/esp-idf/issues/4063#issuecomment-531490140
# https://stackoverflow.com/a/43046729 # https://stackoverflow.com/a/43046729
def rename_with_retry(path_from, path_to): def rename_with_retry(path_from, path_to): # type: (str, str) -> None
if sys.platform.startswith('win'): if sys.platform.startswith('win'):
retry_count = 100 retry_count = 100
else: else:
@@ -380,7 +389,7 @@ def rename_with_retry(path_from, path_to):
warn('Rename {} to {} failed, retrying...'.format(path_from, path_to)) warn('Rename {} to {} failed, retrying...'.format(path_from, path_to))
def strip_container_dirs(path, levels): def strip_container_dirs(path, levels): # type: (str, int) -> None
assert levels > 0 assert levels > 0
# move the original directory out of the way (add a .tmp suffix) # move the original directory out of the way (add a .tmp suffix)
tmp_path = path + '.tmp' tmp_path = path + '.tmp'
@@ -419,7 +428,7 @@ class DownloadError(RuntimeError):
class IDFToolDownload(object): class IDFToolDownload(object):
def __init__(self, platform_name, url, size, sha256): def __init__(self, platform_name, url, size, sha256): # type: (str, str, int, str) -> None
self.platform_name = platform_name self.platform_name = platform_name
self.url = url self.url = url
self.size = size self.size = size
@@ -435,13 +444,13 @@ class IDFToolVersion(object):
STATUS_VALUES = [STATUS_RECOMMENDED, STATUS_SUPPORTED, STATUS_DEPRECATED] STATUS_VALUES = [STATUS_RECOMMENDED, STATUS_SUPPORTED, STATUS_DEPRECATED]
def __init__(self, version, status): def __init__(self, version, status): # type: (str, str) -> None
self.version = version self.version = version
self.status = status self.status = status
self.downloads = OrderedDict() self.downloads = OrderedDict() # type: OrderedDict[str, IDFToolDownload]
self.latest = False self.latest = False
def __lt__(self, other): def __lt__(self, other): # type: (IDFToolVersion) -> bool
if self.status != other.status: if self.status != other.status:
return self.status > other.status return self.status > other.status
else: else:
@@ -449,13 +458,15 @@ class IDFToolVersion(object):
and other.status == IDFToolVersion.STATUS_RECOMMENDED) and other.status == IDFToolVersion.STATUS_RECOMMENDED)
return self.version < other.version return self.version < other.version
def __eq__(self, other): def __eq__(self, other): # type: (object) -> bool
if not isinstance(other, IDFToolVersion):
return NotImplemented
return self.status == other.status and self.version == other.version return self.status == other.status and self.version == other.version
def add_download(self, platform_name, url, size, sha256): def add_download(self, platform_name, url, size, sha256): # type: (str, str, int, str) -> None
self.downloads[platform_name] = IDFToolDownload(platform_name, url, size, sha256) self.downloads[platform_name] = IDFToolDownload(platform_name, url, size, sha256)
def get_download_for_platform(self, platform_name): # type: (str) -> IDFToolDownload def get_download_for_platform(self, platform_name): # type: (str) -> Optional[IDFToolDownload]
if platform_name in PLATFORM_FROM_NAME.keys(): if platform_name in PLATFORM_FROM_NAME.keys():
platform_name = PLATFORM_FROM_NAME[platform_name] platform_name = PLATFORM_FROM_NAME[platform_name]
if platform_name in self.downloads.keys(): if platform_name in self.downloads.keys():
@@ -465,9 +476,10 @@ class IDFToolVersion(object):
return None return None
def compatible_with_platform(self, platform_name=PYTHON_PLATFORM): def compatible_with_platform(self, platform_name=PYTHON_PLATFORM):
# type: (str) -> bool
return self.get_download_for_platform(platform_name) is not None return self.get_download_for_platform(platform_name) is not None
def get_supported_platforms(self): # type: () -> typing.Set[str] def get_supported_platforms(self): # type: () -> set[str]
return set(self.downloads.keys()) return set(self.downloads.keys())
@@ -481,7 +493,7 @@ OPTIONS_LIST = ['version_cmd',
'license', 'license',
'strip_container_dirs'] 'strip_container_dirs']
IDFToolOptions = namedtuple('IDFToolOptions', OPTIONS_LIST) IDFToolOptions = namedtuple('IDFToolOptions', OPTIONS_LIST) # type: ignore
class IDFTool(object): class IDFTool(object):
@@ -492,16 +504,17 @@ class IDFTool(object):
def __init__(self, name, description, install, info_url, license, version_cmd, version_regex, version_regex_replace=None, def __init__(self, name, description, install, info_url, license, version_cmd, version_regex, version_regex_replace=None,
strip_container_dirs=0): strip_container_dirs=0):
# type: (str, str, str, str, str, list[str], str, Optional[str], int) -> None
self.name = name self.name = name
self.description = description self.description = description
self.versions = OrderedDict() # type: typing.Dict[str, IDFToolVersion] self.versions = OrderedDict() # type: dict[str, IDFToolVersion]
self.version_in_path = None self.version_in_path = None # type: Optional[str]
self.versions_installed = [] self.versions_installed = [] # type: list[str]
if version_regex_replace is None: if version_regex_replace is None:
version_regex_replace = VERSION_REGEX_REPLACE_DEFAULT version_regex_replace = VERSION_REGEX_REPLACE_DEFAULT
self.options = IDFToolOptions(version_cmd, version_regex, version_regex_replace, self.options = IDFToolOptions(version_cmd, version_regex, version_regex_replace,
[], OrderedDict(), install, info_url, license, strip_container_dirs) [], OrderedDict(), install, info_url, license, strip_container_dirs) # type: ignore
self.platform_overrides = [] self.platform_overrides = [] # type: list[dict[str, str]]
self._platform = CURRENT_PLATFORM self._platform = CURRENT_PLATFORM
self._update_current_options() self._update_current_options()
@@ -511,38 +524,38 @@ class IDFTool(object):
result._update_current_options() result._update_current_options()
return result return result
def _update_current_options(self): def _update_current_options(self): # type: () -> None
self._current_options = IDFToolOptions(*self.options) self._current_options = IDFToolOptions(*self.options)
for override in self.platform_overrides: for override in self.platform_overrides:
if self._platform not in override['platforms']: if self._platform not in override['platforms']:
continue continue
override_dict = override.copy() override_dict = override.copy()
del override_dict['platforms'] del override_dict['platforms']
self._current_options = self._current_options._replace(**override_dict) self._current_options = self._current_options._replace(**override_dict) # type: ignore
def add_version(self, version): def add_version(self, version): # type: (IDFToolVersion) -> None
assert(type(version) is IDFToolVersion) assert(type(version) is IDFToolVersion)
self.versions[version.version] = version self.versions[version.version] = version
def get_path(self): # type: () -> str def get_path(self): # type: () -> str
return os.path.join(global_idf_tools_path, 'tools', self.name) return os.path.join(global_idf_tools_path, 'tools', self.name) # type: ignore
def get_path_for_version(self, version): # type: (str) -> str def get_path_for_version(self, version): # type: (str) -> str
assert(version in self.versions) assert(version in self.versions)
return os.path.join(self.get_path(), version) return os.path.join(self.get_path(), version)
def get_export_paths(self, version): # type: (str) -> typing.List[str] def get_export_paths(self, version): # type: (str) -> list[str]
tool_path = self.get_path_for_version(version) tool_path = self.get_path_for_version(version)
return [os.path.join(tool_path, *p) for p in self._current_options.export_paths] return [os.path.join(tool_path, *p) for p in self._current_options.export_paths] # type: ignore
def get_export_vars(self, version): # type: (str) -> typing.Dict[str] def get_export_vars(self, version): # type: (str) -> dict[str, str]
""" """
Get the dictionary of environment variables to be exported, for the given version. Get the dictionary of environment variables to be exported, for the given version.
Expands: Expands:
- ${TOOL_PATH} => the actual path where the version is installed - ${TOOL_PATH} => the actual path where the version is installed
""" """
result = {} result = {}
for k, v in self._current_options.export_vars.items(): for k, v in self._current_options.export_vars.items(): # type: ignore
replace_path = self.get_path_for_version(version).replace('\\', '\\\\') replace_path = self.get_path_for_version(version).replace('\\', '\\\\')
v_repl = re.sub(SUBST_TOOL_PATH_REGEX, replace_path, v) v_repl = re.sub(SUBST_TOOL_PATH_REGEX, replace_path, v)
if v_repl != v: if v_repl != v:
@@ -550,7 +563,7 @@ class IDFTool(object):
result[k] = v_repl result[k] = v_repl
return result return result
def check_version(self, extra_paths=None): # type: (typing.Optional[typing.List[str]]) -> str def check_version(self, extra_paths=None): # type: (Optional[list[str]]) -> str
""" """
Execute the tool, optionally prepending extra_paths to PATH, Execute the tool, optionally prepending extra_paths to PATH,
extract the version string and return it as a result. extract the version string and return it as a result.
@@ -561,7 +574,7 @@ class IDFTool(object):
""" """
# this function can not be called for a different platform # this function can not be called for a different platform
assert self._platform == CURRENT_PLATFORM assert self._platform == CURRENT_PLATFORM
cmd = self._current_options.version_cmd cmd = self._current_options.version_cmd # type: ignore
try: try:
version_cmd_result = run_cmd_check_output(cmd, None, extra_paths) version_cmd_result = run_cmd_check_output(cmd, None, extra_paths)
except OSError: except OSError:
@@ -569,27 +582,27 @@ class IDFTool(object):
raise ToolNotFound('Tool {} not found'.format(self.name)) raise ToolNotFound('Tool {} not found'.format(self.name))
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise ToolExecError('Command {} has returned non-zero exit code ({})\n'.format( raise ToolExecError('Command {} has returned non-zero exit code ({})\n'.format(
' '.join(self._current_options.version_cmd), e.returncode)) ' '.join(self._current_options.version_cmd), e.returncode)) # type: ignore
in_str = version_cmd_result.decode('utf-8') in_str = version_cmd_result.decode('utf-8')
match = re.search(self._current_options.version_regex, in_str) match = re.search(self._current_options.version_regex, in_str) # type: ignore
if not match: if not match:
return UNKNOWN_VERSION return UNKNOWN_VERSION
return re.sub(self._current_options.version_regex, self._current_options.version_regex_replace, match.group(0)) return re.sub(self._current_options.version_regex, self._current_options.version_regex_replace, match.group(0)) # type: ignore
def get_install_type(self): def get_install_type(self): # type: () -> Callable[[str], None]
return self._current_options.install return self._current_options.install # type: ignore
def compatible_with_platform(self): def compatible_with_platform(self): # type: () -> bool
return any([v.compatible_with_platform() for v in self.versions.values()]) return any([v.compatible_with_platform() for v in self.versions.values()])
def get_supported_platforms(self): # type: () -> typing.Set[str] def get_supported_platforms(self): # type: () -> set[str]
result = set() result = set()
for v in self.versions.values(): for v in self.versions.values():
result.update(v.get_supported_platforms()) result.update(v.get_supported_platforms())
return result return result
def get_recommended_version(self): def get_recommended_version(self): # type: () -> Optional[str]
recommended_versions = [k for k, v in self.versions.items() recommended_versions = [k for k, v in self.versions.items()
if v.status == IDFToolVersion.STATUS_RECOMMENDED if v.status == IDFToolVersion.STATUS_RECOMMENDED
and v.compatible_with_platform(self._platform)] and v.compatible_with_platform(self._platform)]
@@ -598,7 +611,7 @@ class IDFTool(object):
return recommended_versions[0] return recommended_versions[0]
return None return None
def get_preferred_installed_version(self): def get_preferred_installed_version(self): # type: () -> Optional[str]
recommended_versions = [k for k in self.versions_installed recommended_versions = [k for k in self.versions_installed
if self.versions[k].status == IDFToolVersion.STATUS_RECOMMENDED if self.versions[k].status == IDFToolVersion.STATUS_RECOMMENDED
and self.versions[k].compatible_with_platform(self._platform)] and self.versions[k].compatible_with_platform(self._platform)]
@@ -607,7 +620,7 @@ class IDFTool(object):
return recommended_versions[0] return recommended_versions[0]
return None return None
def find_installed_versions(self): def find_installed_versions(self): # type: () -> None
""" """
Checks whether the tool can be found in PATH and in global_idf_tools_path. Checks whether the tool can be found in PATH and in global_idf_tools_path.
Writes results to self.version_in_path and self.versions_installed. Writes results to self.version_in_path and self.versions_installed.
@@ -649,7 +662,7 @@ class IDFTool(object):
else: else:
self.versions_installed.append(version) self.versions_installed.append(version)
def download(self, version): def download(self, version): # type: (str) -> None
assert(version in self.versions) assert(version in self.versions)
download_obj = self.versions[version].get_download_for_platform(self._platform) download_obj = self.versions[version].get_download_for_platform(self._platform)
if not download_obj: if not download_obj:
@@ -658,7 +671,7 @@ class IDFTool(object):
url = download_obj.url url = download_obj.url
archive_name = os.path.basename(url) archive_name = os.path.basename(url)
local_path = os.path.join(global_idf_tools_path, 'dist', archive_name) local_path = os.path.join(global_idf_tools_path, 'dist', archive_name) # type: ignore
mkdir_p(os.path.dirname(local_path)) mkdir_p(os.path.dirname(local_path))
if os.path.isfile(local_path): if os.path.isfile(local_path):
@@ -703,14 +716,14 @@ class IDFTool(object):
fatal('Failed to download, and retry count has expired') fatal('Failed to download, and retry count has expired')
raise DownloadError() raise DownloadError()
def install(self, version): def install(self, version): # type: (str) -> None
# Currently this is called after calling 'download' method, so here are a few asserts # Currently this is called after calling 'download' method, so here are a few asserts
# for the conditions which should be true once that method is done. # for the conditions which should be true once that method is done.
assert (version in self.versions) assert (version in self.versions)
download_obj = self.versions[version].get_download_for_platform(self._platform) download_obj = self.versions[version].get_download_for_platform(self._platform)
assert (download_obj is not None) assert (download_obj is not None)
archive_name = os.path.basename(download_obj.url) archive_name = os.path.basename(download_obj.url)
archive_path = os.path.join(global_idf_tools_path, 'dist', archive_name) archive_path = os.path.join(global_idf_tools_path, 'dist', archive_name) # type: ignore
assert (os.path.isfile(archive_path)) assert (os.path.isfile(archive_path))
dest_dir = self.get_path_for_version(version) dest_dir = self.get_path_for_version(version)
if os.path.exists(dest_dir): if os.path.exists(dest_dir):
@@ -718,11 +731,11 @@ class IDFTool(object):
shutil.rmtree(dest_dir) shutil.rmtree(dest_dir)
mkdir_p(dest_dir) mkdir_p(dest_dir)
unpack(archive_path, dest_dir) unpack(archive_path, dest_dir)
if self._current_options.strip_container_dirs: if self._current_options.strip_container_dirs: # type: ignore
strip_container_dirs(dest_dir, self._current_options.strip_container_dirs) strip_container_dirs(dest_dir, self._current_options.strip_container_dirs) # type: ignore
@staticmethod @staticmethod
def check_download_file(download_obj, local_path): def check_download_file(download_obj, local_path): # type: (IDFToolDownload, str) -> bool
expected_sha256 = download_obj.sha256 expected_sha256 = download_obj.sha256
expected_size = download_obj.size expected_size = download_obj.size
file_size, file_sha256 = get_file_size_sha256(local_path) file_size, file_sha256 = get_file_size_sha256(local_path)
@@ -735,16 +748,16 @@ class IDFTool(object):
return True return True
@classmethod @classmethod
def from_json(cls, tool_dict): def from_json(cls, tool_dict): # type: (dict[str, Union[str, list[str], dict[str, str]]]) -> IDFTool
# json.load will return 'str' types in Python 3 and 'unicode' in Python 2 # json.load will return 'str' types in Python 3 and 'unicode' in Python 2
expected_str_type = type(u'') expected_str_type = type(u'')
# Validate json fields # Validate json fields
tool_name = tool_dict.get('name') tool_name = tool_dict.get('name') # type: ignore
if type(tool_name) is not expected_str_type: if type(tool_name) is not expected_str_type:
raise RuntimeError('tool_name is not a string') raise RuntimeError('tool_name is not a string')
description = tool_dict.get('description') description = tool_dict.get('description') # type: ignore
if type(description) is not expected_str_type: if type(description) is not expected_str_type:
raise RuntimeError('description is not a string') raise RuntimeError('description is not a string')
@@ -764,7 +777,7 @@ class IDFTool(object):
if type(export_paths) is not list: if type(export_paths) is not list:
raise RuntimeError('export_paths for tool %s is not a list' % tool_name) raise RuntimeError('export_paths for tool %s is not a list' % tool_name)
export_vars = tool_dict.get('export_vars', {}) export_vars = tool_dict.get('export_vars', {}) # type: ignore
if type(export_vars) is not dict: if type(export_vars) is not dict:
raise RuntimeError('export_vars for tool %s is not a mapping' % tool_name) raise RuntimeError('export_vars for tool %s is not a mapping' % tool_name)
@@ -772,15 +785,15 @@ class IDFTool(object):
if type(versions) is not list: if type(versions) is not list:
raise RuntimeError('versions for tool %s is not an array' % tool_name) raise RuntimeError('versions for tool %s is not an array' % tool_name)
install = tool_dict.get('install', False) install = tool_dict.get('install', False) # type: ignore
if type(install) is not expected_str_type: if type(install) is not expected_str_type:
raise RuntimeError('install for tool %s is not a string' % tool_name) raise RuntimeError('install for tool %s is not a string' % tool_name)
info_url = tool_dict.get('info_url', False) info_url = tool_dict.get('info_url', False) # type: ignore
if type(info_url) is not expected_str_type: if type(info_url) is not expected_str_type:
raise RuntimeError('info_url for tool %s is not a string' % tool_name) raise RuntimeError('info_url for tool %s is not a string' % tool_name)
license = tool_dict.get('license', False) license = tool_dict.get('license', False) # type: ignore
if type(license) is not expected_str_type: if type(license) is not expected_str_type:
raise RuntimeError('license for tool %s is not a string' % tool_name) raise RuntimeError('license for tool %s is not a string' % tool_name)
@@ -788,67 +801,67 @@ class IDFTool(object):
if strip_container_dirs and type(strip_container_dirs) is not int: if strip_container_dirs and type(strip_container_dirs) is not int:
raise RuntimeError('strip_container_dirs for tool %s is not an int' % tool_name) raise RuntimeError('strip_container_dirs for tool %s is not an int' % tool_name)
overrides_list = tool_dict.get('platform_overrides', []) overrides_list = tool_dict.get('platform_overrides', []) # type: ignore
if type(overrides_list) is not list: if type(overrides_list) is not list:
raise RuntimeError('platform_overrides for tool %s is not a list' % tool_name) raise RuntimeError('platform_overrides for tool %s is not a list' % tool_name)
# Create the object # Create the object
tool_obj = cls(tool_name, description, install, info_url, license, tool_obj = cls(tool_name, description, install, info_url, license, # type: ignore
version_cmd, version_regex, version_regex_replace, version_cmd, version_regex, version_regex_replace, # type: ignore
strip_container_dirs) strip_container_dirs) # type: ignore
for path in export_paths: for path in export_paths: # type: ignore
tool_obj.options.export_paths.append(path) tool_obj.options.export_paths.append(path) # type: ignore
for name, value in export_vars.items(): for name, value in export_vars.items(): # type: ignore
tool_obj.options.export_vars[name] = value tool_obj.options.export_vars[name] = value # type: ignore
for index, override in enumerate(overrides_list): for index, override in enumerate(overrides_list):
platforms_list = override.get('platforms') platforms_list = override.get('platforms') # type: ignore
if type(platforms_list) is not list: if type(platforms_list) is not list:
raise RuntimeError('platforms for override %d of tool %s is not a list' % (index, tool_name)) raise RuntimeError('platforms for override %d of tool %s is not a list' % (index, tool_name))
install = override.get('install') install = override.get('install') # type: ignore
if install is not None and type(install) is not expected_str_type: if install is not None and type(install) is not expected_str_type:
raise RuntimeError('install for override %d of tool %s is not a string' % (index, tool_name)) raise RuntimeError('install for override %d of tool %s is not a string' % (index, tool_name))
version_cmd = override.get('version_cmd') version_cmd = override.get('version_cmd') # type: ignore
if version_cmd is not None and type(version_cmd) is not list: if version_cmd is not None and type(version_cmd) is not list:
raise RuntimeError('version_cmd for override %d of tool %s is not a list of strings' % raise RuntimeError('version_cmd for override %d of tool %s is not a list of strings' %
(index, tool_name)) (index, tool_name))
version_regex = override.get('version_regex') version_regex = override.get('version_regex') # type: ignore
if version_regex is not None and (type(version_regex) is not expected_str_type or not version_regex): if version_regex is not None and (type(version_regex) is not expected_str_type or not version_regex):
raise RuntimeError('version_regex for override %d of tool %s is not a non-empty string' % raise RuntimeError('version_regex for override %d of tool %s is not a non-empty string' %
(index, tool_name)) (index, tool_name))
version_regex_replace = override.get('version_regex_replace') version_regex_replace = override.get('version_regex_replace') # type: ignore
if version_regex_replace is not None and type(version_regex_replace) is not expected_str_type: if version_regex_replace is not None and type(version_regex_replace) is not expected_str_type:
raise RuntimeError('version_regex_replace for override %d of tool %s is not a string' % raise RuntimeError('version_regex_replace for override %d of tool %s is not a string' %
(index, tool_name)) (index, tool_name))
export_paths = override.get('export_paths') export_paths = override.get('export_paths') # type: ignore
if export_paths is not None and type(export_paths) is not list: if export_paths is not None and type(export_paths) is not list:
raise RuntimeError('export_paths for override %d of tool %s is not a list' % (index, tool_name)) raise RuntimeError('export_paths for override %d of tool %s is not a list' % (index, tool_name))
export_vars = override.get('export_vars') export_vars = override.get('export_vars') # type: ignore
if export_vars is not None and type(export_vars) is not dict: if export_vars is not None and type(export_vars) is not dict:
raise RuntimeError('export_vars for override %d of tool %s is not a mapping' % (index, tool_name)) raise RuntimeError('export_vars for override %d of tool %s is not a mapping' % (index, tool_name))
tool_obj.platform_overrides.append(override) tool_obj.platform_overrides.append(override) # type: ignore
recommended_versions = {} recommended_versions = {} # type: dict[str, list[str]]
for version_dict in versions: for version_dict in versions: # type: ignore
version = version_dict.get('name') version = version_dict.get('name') # type: ignore
if type(version) is not expected_str_type: if type(version) is not expected_str_type:
raise RuntimeError('version name for tool {} is not a string'.format(tool_name)) raise RuntimeError('version name for tool {} is not a string'.format(tool_name))
version_status = version_dict.get('status') version_status = version_dict.get('status') # type: ignore
if type(version_status) is not expected_str_type and version_status not in IDFToolVersion.STATUS_VALUES: if type(version_status) is not expected_str_type and version_status not in IDFToolVersion.STATUS_VALUES:
raise RuntimeError('tool {} version {} status is not one of {}', tool_name, version, raise RuntimeError('tool {} version {} status is not one of {}', tool_name, version,
IDFToolVersion.STATUS_VALUES) IDFToolVersion.STATUS_VALUES)
version_obj = IDFToolVersion(version, version_status) version_obj = IDFToolVersion(version, version_status)
for platform_id, platform_dict in version_dict.items(): for platform_id, platform_dict in version_dict.items(): # type: ignore
if platform_id in ['name', 'status']: if platform_id in ['name', 'status']:
continue continue
if platform_id not in PLATFORM_FROM_NAME.keys(): if platform_id not in PLATFORM_FROM_NAME.keys():
@@ -875,7 +888,7 @@ class IDFTool(object):
tool_obj._update_current_options() tool_obj._update_current_options()
return tool_obj return tool_obj
def to_json(self): def to_json(self): # type: ignore
versions_array = [] versions_array = []
for version, version_obj in self.versions.items(): for version, version_obj in self.versions.items():
version_json = { version_json = {
@@ -912,19 +925,19 @@ class IDFTool(object):
return tool_json return tool_json
def load_tools_info(): # type: () -> typing.Dict[str, IDFTool] def load_tools_info(): # type: () -> dict[str, IDFTool]
""" """
Load tools metadata from tools.json, return a dictionary: tool name - tool info Load tools metadata from tools.json, return a dictionary: tool name - tool info
""" """
tool_versions_file_name = global_tools_json tool_versions_file_name = global_tools_json
with open(tool_versions_file_name, 'r') as f: with open(tool_versions_file_name, 'r') as f: # type: ignore
tools_info = json.load(f) tools_info = json.load(f)
return parse_tools_info_json(tools_info) return parse_tools_info_json(tools_info) # type: ignore
def parse_tools_info_json(tools_info): def parse_tools_info_json(tools_info): # type: ignore
""" """
Parse and validate the dictionary obtained by loading the tools.json file. Parse and validate the dictionary obtained by loading the tools.json file.
Returns a dictionary of tools (key: tool name, value: IDFTool object). Returns a dictionary of tools (key: tool name, value: IDFTool object).
@@ -945,7 +958,7 @@ def parse_tools_info_json(tools_info):
return tools_dict return tools_dict
def dump_tools_json(tools_info): def dump_tools_json(tools_info): # type: ignore
tools_array = [] tools_array = []
for tool_name, tool_obj in tools_info.items(): for tool_name, tool_obj in tools_info.items():
tool_json = tool_obj.to_json() tool_json = tool_obj.to_json()
@@ -954,10 +967,10 @@ def dump_tools_json(tools_info):
return json.dumps(file_json, indent=2, separators=(',', ': '), sort_keys=True) return json.dumps(file_json, indent=2, separators=(',', ': '), sort_keys=True)
def get_python_env_path(): def get_python_env_path(): # type: () -> Tuple[str, str, str]
python_ver_major_minor = '{}.{}'.format(sys.version_info.major, sys.version_info.minor) python_ver_major_minor = '{}.{}'.format(sys.version_info.major, sys.version_info.minor)
version_file_path = os.path.join(global_idf_path, 'version.txt') version_file_path = os.path.join(global_idf_path, 'version.txt') # type: ignore
if os.path.exists(version_file_path): if os.path.exists(version_file_path):
with open(version_file_path, 'r') as version_file: with open(version_file_path, 'r') as version_file:
idf_version_str = version_file.read() idf_version_str = version_file.read()
@@ -970,12 +983,12 @@ def get_python_env_path():
idf_version_str = '' idf_version_str = ''
match = re.match(r'^v([0-9]+\.[0-9]+).*', idf_version_str) match = re.match(r'^v([0-9]+\.[0-9]+).*', idf_version_str)
if match: if match:
idf_version = match.group(1) idf_version = match.group(1) # type: Optional[str]
else: else:
idf_version = None idf_version = None
# fallback when IDF is a shallow clone # fallback when IDF is a shallow clone
try: try:
with open(os.path.join(global_idf_path, 'components', 'esp_common', 'include', 'esp_idf_version.h')) as f: with open(os.path.join(global_idf_path, 'components', 'esp_common', 'include', 'esp_idf_version.h')) as f: # type: ignore
m = re.search(r'^#define\s+ESP_IDF_VERSION_MAJOR\s+(\d+).+?^#define\s+ESP_IDF_VERSION_MINOR\s+(\d+)', m = re.search(r'^#define\s+ESP_IDF_VERSION_MAJOR\s+(\d+).+?^#define\s+ESP_IDF_VERSION_MINOR\s+(\d+)',
f.read(), re.DOTALL | re.MULTILINE) f.read(), re.DOTALL | re.MULTILINE)
if m: if m:
@@ -989,7 +1002,7 @@ def get_python_env_path():
fatal('IDF version cannot be determined') fatal('IDF version cannot be determined')
raise SystemExit(1) raise SystemExit(1)
idf_python_env_path = os.path.join(global_idf_tools_path, 'python_env', idf_python_env_path = os.path.join(global_idf_tools_path, 'python_env', # type: ignore
'idf{}_py{}_env'.format(idf_version, python_ver_major_minor)) 'idf{}_py{}_env'.format(idf_version, python_ver_major_minor))
if sys.platform == 'win32': if sys.platform == 'win32':
@@ -1005,7 +1018,7 @@ def get_python_env_path():
return idf_python_env_path, idf_python_export_path, virtualenv_python return idf_python_env_path, idf_python_export_path, virtualenv_python
def action_list(args): def action_list(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
for name, tool in tools_info.items(): for name, tool in tools_info.items():
if tool.get_install_type() == IDFTool.INSTALL_NEVER: if tool.get_install_type() == IDFTool.INSTALL_NEVER:
@@ -1017,14 +1030,14 @@ def action_list(args):
if not versions_for_platform: if not versions_for_platform:
info(' (no versions compatible with platform {})'.format(PYTHON_PLATFORM)) info(' (no versions compatible with platform {})'.format(PYTHON_PLATFORM))
continue continue
versions_sorted = sorted(versions_for_platform.keys(), key=tool.versions.get, reverse=True) versions_sorted = sorted(versions_for_platform.keys(), key=tool.versions.get, reverse=True) # type: ignore
for version in versions_sorted: for version in versions_sorted:
version_obj = tool.versions[version] version_obj = tool.versions[version]
info(' - {} ({}{})'.format(version, version_obj.status, info(' - {} ({}{})'.format(version, version_obj.status,
', installed' if version in tool.versions_installed else '')) ', installed' if version in tool.versions_installed else ''))
def action_check(args): def action_check(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
not_found_list = [] not_found_list = []
info('Checking for installed tools...') info('Checking for installed tools...')
@@ -1050,7 +1063,7 @@ def action_check(args):
raise SystemExit(1) raise SystemExit(1)
def action_export(args): def action_export(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
all_tools_found = True all_tools_found = True
export_vars = {} export_vars = {}
@@ -1063,7 +1076,7 @@ def action_export(args):
if tool.version_in_path: if tool.version_in_path:
if tool.version_in_path not in tool.versions: if tool.version_in_path not in tool.versions:
# unsupported version # unsupported version
if args.prefer_system: if args.prefer_system: # type: ignore
warn('using an unsupported version of tool {} found in PATH: {}'.format( warn('using an unsupported version of tool {} found in PATH: {}'.format(
tool.name, tool.version_in_path)) tool.name, tool.version_in_path))
continue continue
@@ -1172,12 +1185,12 @@ def action_export(args):
raise SystemExit(1) raise SystemExit(1)
def apply_url_mirrors(args, tool_download_obj): def apply_url_mirrors(args, tool_download_obj): # type: ignore
apply_mirror_prefix_map(args, tool_download_obj) apply_mirror_prefix_map(args, tool_download_obj)
apply_github_assets_option(tool_download_obj) apply_github_assets_option(tool_download_obj)
def apply_mirror_prefix_map(args, tool_download_obj): def apply_mirror_prefix_map(args, tool_download_obj): # type: ignore
"""Rewrite URL for given tool_obj, given tool_version, and current platform, """Rewrite URL for given tool_obj, given tool_version, and current platform,
if --mirror-prefix-map flag or IDF_MIRROR_PREFIX_MAP environment variable is given. if --mirror-prefix-map flag or IDF_MIRROR_PREFIX_MAP environment variable is given.
""" """
@@ -1204,7 +1217,7 @@ def apply_mirror_prefix_map(args, tool_download_obj):
break break
def apply_github_assets_option(tool_download_obj): def apply_github_assets_option(tool_download_obj): # type: ignore
""" Rewrite URL for given tool_obj if the download URL is an https://github.com/ URL and the variable """ Rewrite URL for given tool_obj if the download URL is an https://github.com/ URL and the variable
IDF_GITHUB_ASSETS is set. The github.com part of the URL will be replaced. IDF_GITHUB_ASSETS is set. The github.com part of the URL will be replaced.
""" """
@@ -1230,7 +1243,7 @@ def apply_github_assets_option(tool_download_obj):
tool_download_obj.url = new_url tool_download_obj.url = new_url
def action_download(args): def action_download(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
tools_spec = args.tools tools_spec = args.tools
@@ -1277,9 +1290,9 @@ def action_download(args):
tool_obj.download(tool_version) tool_obj.download(tool_version)
def action_install(args): def action_install(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
tools_spec = args.tools tools_spec = args.tools # type: ignore
if not tools_spec or 'required' in tools_spec: if not tools_spec or 'required' in tools_spec:
tools_spec = [k for k, v in tools_info.items() if v.get_install_type() == IDFTool.INSTALL_ALWAYS] tools_spec = [k for k, v in tools_info.items() if v.get_install_type() == IDFTool.INSTALL_ALWAYS]
info('Installing tools: {}'.format(', '.join(tools_spec))) info('Installing tools: {}'.format(', '.join(tools_spec)))
@@ -1319,7 +1332,7 @@ def action_install(args):
tool_obj.install(tool_version) tool_obj.install(tool_version)
def get_wheels_dir(): def get_wheels_dir(): # type: () -> Optional[str]
tools_info = load_tools_info() tools_info = load_tools_info()
wheels_package_name = 'idf-python-wheels' wheels_package_name = 'idf-python-wheels'
if wheels_package_name not in tools_info: if wheels_package_name not in tools_info:
@@ -1334,7 +1347,7 @@ def get_wheels_dir():
return wheels_dir return wheels_dir
def action_install_python_env(args): def action_install_python_env(args): # type: ignore
idf_python_env_path, _, virtualenv_python = get_python_env_path() idf_python_env_path, _, virtualenv_python = get_python_env_path()
is_virtualenv = hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) is_virtualenv = hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix)
@@ -1376,7 +1389,7 @@ def action_install_python_env(args):
subprocess.check_call(run_args, stdout=sys.stdout, stderr=sys.stderr) subprocess.check_call(run_args, stdout=sys.stdout, stderr=sys.stderr)
def action_add_version(args): def action_add_version(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
tool_name = args.tool tool_name = args.tool
tool_obj = tools_info.get(tool_name) tool_obj = tools_info.get(tool_name)
@@ -1420,7 +1433,7 @@ def action_add_version(args):
info('Wrote output to {}'.format(args.output)) info('Wrote output to {}'.format(args.output))
def action_rewrite(args): def action_rewrite(args): # type: ignore
tools_info = load_tools_info() tools_info = load_tools_info()
json_str = dump_tools_json(tools_info) json_str = dump_tools_json(tools_info)
if not args.output: if not args.output:
@@ -1431,7 +1444,7 @@ def action_rewrite(args):
info('Wrote output to {}'.format(args.output)) info('Wrote output to {}'.format(args.output))
def action_validate(args): def action_validate(args): # type: ignore
try: try:
import jsonschema import jsonschema
except ImportError: except ImportError:
@@ -1447,11 +1460,11 @@ def action_validate(args):
# on failure, this will raise an exception with a fairly verbose diagnostic message # on failure, this will raise an exception with a fairly verbose diagnostic message
def action_gen_doc(args): def action_gen_doc(args): # type: ignore
f = args.output f = args.output
tools_info = load_tools_info() tools_info = load_tools_info()
def print_out(text): def print_out(text): # type: (str) -> None
f.write(text + '\n') f.write(text + '\n')
print_out('.. |zwsp| unicode:: U+200B') print_out('.. |zwsp| unicode:: U+200B')
@@ -1528,7 +1541,7 @@ More info: {info_url}
print_out('') print_out('')
def main(argv): def main(argv): # type: (list[str]) -> None
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--quiet', help='Don\'t output diagnostic messages to stdout/stderr', action='store_true') parser.add_argument('--quiet', help='Don\'t output diagnostic messages to stdout/stderr', action='store_true')
@@ -1628,7 +1641,7 @@ def main(argv):
if sys.version_info.major == 2: if sys.version_info.major == 2:
try: try:
global_idf_tools_path.decode('ascii') global_idf_tools_path.decode('ascii') # type: ignore
except UnicodeDecodeError: except UnicodeDecodeError:
fatal('IDF_TOOLS_PATH contains non-ASCII characters: {}'.format(global_idf_tools_path) + fatal('IDF_TOOLS_PATH contains non-ASCII characters: {}'.format(global_idf_tools_path) +
'\nThis is not supported yet with Python 2. ' + '\nThis is not supported yet with Python 2. ' +