Switch legacy platform manager to the new

This commit is contained in:
Ivan Kravets
2020-08-15 23:11:01 +03:00
parent bb6fb3fdf8
commit 04694b4126
25 changed files with 470 additions and 1271 deletions

View File

@ -45,10 +45,10 @@ def _dump_includes(env):
# includes from toolchains
p = env.PioPlatform()
includes["toolchain"] = []
for name in p.get_installed_packages():
if p.get_package_type(name) != "toolchain":
for pkg in p.get_installed_packages():
if p.get_package_type(pkg.metadata.name) != "toolchain":
continue
toolchain_dir = glob_escape(p.get_package_dir(name))
toolchain_dir = glob_escape(pkg.path)
toolchain_incglobs = [
os.path.join(toolchain_dir, "*", "include", "c++", "*"),
os.path.join(toolchain_dir, "*", "include", "c++", "*", "*-*-*"),

View File

@ -22,6 +22,7 @@ from SCons.Script import COMMAND_LINE_TARGETS # pylint: disable=import-error
from platformio import fs, util
from platformio.compat import WINDOWS
from platformio.package.meta import PackageItem
from platformio.platform.exception import UnknownBoard
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectOptions
@ -63,32 +64,30 @@ def GetFrameworkScript(env, framework):
def LoadPioPlatform(env):
p = env.PioPlatform()
installed_packages = p.get_installed_packages()
# Ensure real platform name
env["PIOPLATFORM"] = p.name
# Add toolchains and uploaders to $PATH and $*_LIBRARY_PATH
systype = util.get_systype()
for name in installed_packages:
type_ = p.get_package_type(name)
for pkg in p.get_installed_packages():
type_ = p.get_package_type(pkg.metadata.name)
if type_ not in ("toolchain", "uploader", "debugger"):
continue
pkg_dir = p.get_package_dir(name)
env.PrependENVPath(
"PATH",
os.path.join(pkg_dir, "bin")
if os.path.isdir(os.path.join(pkg_dir, "bin"))
else pkg_dir,
os.path.join(pkg.path, "bin")
if os.path.isdir(os.path.join(pkg.path, "bin"))
else pkg.path,
)
if (
not WINDOWS
and os.path.isdir(os.path.join(pkg_dir, "lib"))
and os.path.isdir(os.path.join(pkg.path, "lib"))
and type_ != "toolchain"
):
env.PrependENVPath(
"DYLD_LIBRARY_PATH" if "darwin" in systype else "LD_LIBRARY_PATH",
os.path.join(pkg_dir, "lib"),
os.path.join(pkg.path, "lib"),
)
# Platform specific LD Scripts
@ -133,6 +132,7 @@ def LoadPioPlatform(env):
def PrintConfiguration(env): # pylint: disable=too-many-statements
platform = env.PioPlatform()
pkg_metadata = PackageItem(platform.get_dir()).metadata
board_config = env.BoardConfig() if "BOARD" in env else None
def _get_configuration_data():
@ -147,11 +147,12 @@ def PrintConfiguration(env): # pylint: disable=too-many-statements
)
def _get_plaform_data():
data = ["PLATFORM: %s (%s)" % (platform.title, platform.version)]
if platform.src_version:
data.append("#" + platform.src_version)
if int(ARGUMENTS.get("PIOVERBOSE", 0)) and platform.src_url:
data.append("(%s)" % platform.src_url)
data = [
"PLATFORM: %s (%s)"
% (platform.title, pkg_metadata.version or platform.version)
]
if int(ARGUMENTS.get("PIOVERBOSE", 0)) and pkg_metadata.spec.external:
data.append("(%s)" % pkg_metadata.spec.url)
if board_config:
data.extend([">", board_config.get("name")])
return data

View File

@ -19,7 +19,7 @@ from tabulate import tabulate
from platformio import fs
from platformio.compat import dump_json_to_unicode
from platformio.managers.platform import PlatformManager
from platformio.package.manager.platform import PlatformPackageManager
@click.command("boards", short_help="Embedded Board Explorer")
@ -71,7 +71,7 @@ def print_boards(boards):
def _get_boards(installed=False):
pm = PlatformManager()
pm = PlatformPackageManager()
return pm.get_installed_boards() if installed else pm.get_all_boards()

View File

@ -25,7 +25,7 @@ from platformio.commands.home.rpc.handlers.app import AppRPC
from platformio.commands.home.rpc.handlers.piocore import PIOCoreRPC
from platformio.compat import PY2, get_filesystem_encoding
from platformio.ide.projectgenerator import ProjectGenerator
from platformio.managers.platform import PlatformManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.project.config import ProjectConfig
from platformio.project.exception import ProjectError
from platformio.project.helpers import get_project_dir, is_platformio_project
@ -105,7 +105,7 @@ class ProjectRPC(object):
return (os.path.sep).join(path.split(os.path.sep)[-2:])
result = []
pm = PlatformManager()
pm = PlatformPackageManager()
for project_dir in AppRPC.load_state()["storage"]["recentProjects"]:
if not os.path.isdir(project_dir):
continue
@ -148,8 +148,9 @@ class ProjectRPC(object):
@staticmethod
def get_project_examples():
result = []
for manifest in PlatformManager().get_installed():
examples_dir = os.path.join(manifest["__pkg_dir"], "examples")
pm = PlatformPackageManager()
for pkg in pm.get_installed():
examples_dir = os.path.join(pkg.path, "examples")
if not os.path.isdir(examples_dir):
continue
items = []
@ -172,6 +173,7 @@ class ProjectRPC(object):
"description": project_description,
}
)
manifest = pm.load_manifest(pkg)
result.append(
{
"platform": {

View File

@ -15,7 +15,7 @@
import os
from platformio.compat import ci_strings_are_equal
from platformio.managers.platform import PlatformManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.package.meta import PackageSpec
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
@ -28,9 +28,9 @@ def get_builtin_libs(storage_names=None):
items = []
storage_names = storage_names or []
pm = PlatformManager()
for manifest in pm.get_installed():
p = PlatformFactory.new(manifest["__pkg_dir"])
pm = PlatformPackageManager()
for pkg in pm.get_installed():
p = PlatformFactory.new(pkg)
for storage in p.get_lib_storages():
if storage_names and storage["name"] not in storage_names:
continue

View File

@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from os.path import dirname, isdir
import os
import click
from platformio import app, util
from platformio.commands.boards import print_boards
from platformio.compat import dump_json_to_unicode
from platformio.managers.platform import PlatformManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.package.meta import PackageItem, PackageSpec
from platformio.platform.exception import UnknownPlatform
from platformio.platform.factory import PlatformFactory
@ -48,7 +49,7 @@ def _print_platforms(platforms):
if "version" in platform:
if "__src_url" in platform:
click.echo(
"Version: #%s (%s)" % (platform["version"], platform["__src_url"])
"Version: %s (%s)" % (platform["version"], platform["__src_url"])
)
else:
click.echo("Version: " + platform["version"])
@ -56,11 +57,7 @@ def _print_platforms(platforms):
def _get_registry_platforms():
platforms = util.get_api_result("/platforms", cache_valid="7d")
pm = PlatformManager()
for platform in platforms or []:
platform["versions"] = pm.get_all_repo_versions(platform["name"])
return platforms
return util.get_api_result("/platforms", cache_valid="7d")
def _get_platform_data(*args, **kwargs):
@ -91,7 +88,9 @@ def _get_installed_platform_data(platform, with_boards=True, expose_packages=Tru
# return data
# overwrite VCS version and add extra fields
manifest = PlatformManager().load_manifest(dirname(p.manifest_path))
manifest = PlatformPackageManager().legacy_load_manifest(
os.path.dirname(p.manifest_path)
)
assert manifest
for key in manifest:
if key == "version" or key.startswith("__"):
@ -104,13 +103,15 @@ def _get_installed_platform_data(platform, with_boards=True, expose_packages=Tru
return data
data["packages"] = []
installed_pkgs = p.get_installed_packages()
for name, opts in p.packages.items():
installed_pkgs = {
pkg.metadata.name: p.pm.load_manifest(pkg) for pkg in p.get_installed_packages()
}
for name, options in p.packages.items():
item = dict(
name=name,
type=p.get_package_type(name),
requirements=opts.get("version"),
optional=opts.get("optional") is True,
requirements=options.get("version"),
optional=options.get("optional") is True,
)
if name in installed_pkgs:
for key, value in installed_pkgs[name].items():
@ -147,13 +148,13 @@ def _get_registry_platform_data( # pylint: disable=unused-argument
forDesktop=_data["forDesktop"],
frameworks=_data["frameworks"],
packages=_data["packages"],
versions=_data["versions"],
versions=_data.get("versions"),
)
if with_boards:
data["boards"] = [
board
for board in PlatformManager().get_registered_boards()
for board in PlatformPackageManager().get_registered_boards()
if board["platform"] == _data["name"]
]
@ -213,12 +214,10 @@ def platform_frameworks(query, json_output):
@click.option("--json-output", is_flag=True)
def platform_list(json_output):
platforms = []
pm = PlatformManager()
for manifest in pm.get_installed():
pm = PlatformPackageManager()
for pkg in pm.get_installed():
platforms.append(
_get_installed_platform_data(
manifest["__pkg_dir"], with_boards=False, expose_packages=False
)
_get_installed_platform_data(pkg, with_boards=False, expose_packages=False)
)
platforms = sorted(platforms, key=lambda manifest: manifest["name"])
@ -300,6 +299,7 @@ def platform_show(platform, json_output): # pylint: disable=too-many-branches
@click.option("--without-package", multiple=True)
@click.option("--skip-default-package", is_flag=True)
@click.option("--with-all-packages", is_flag=True)
@click.option("-s", "--silent", is_flag=True, help="Suppress progress reporting")
@click.option(
"-f",
"--force",
@ -312,21 +312,24 @@ def platform_install( # pylint: disable=too-many-arguments
without_package,
skip_default_package,
with_all_packages,
silent,
force,
):
pm = PlatformManager()
pm = PlatformPackageManager()
for platform in platforms:
if pm.install(
name=platform,
pkg = pm.install(
spec=platform,
with_packages=with_package,
without_packages=without_package,
skip_default_package=skip_default_package,
with_all_packages=with_all_packages,
silent=silent,
force=force,
):
)
if pkg and not silent:
click.secho(
"The platform '%s' has been successfully installed!\n"
"The rest of packages will be installed automatically "
"The rest of the packages will be installed later "
"depending on your build environment." % platform,
fg="green",
)
@ -335,11 +338,11 @@ def platform_install( # pylint: disable=too-many-arguments
@cli.command("uninstall", short_help="Uninstall development platform")
@click.argument("platforms", nargs=-1, required=True, metavar="[PLATFORM...]")
def platform_uninstall(platforms):
pm = PlatformManager()
pm = PlatformPackageManager()
for platform in platforms:
if pm.uninstall(platform):
click.secho(
"The platform '%s' has been successfully uninstalled!" % platform,
"The platform '%s' has been successfully removed!" % platform,
fg="green",
)
@ -358,41 +361,40 @@ def platform_uninstall(platforms):
@click.option(
"--dry-run", is_flag=True, help="Do not update, only check for the new versions"
)
@click.option("-s", "--silent", is_flag=True, help="Suppress progress reporting")
@click.option("--json-output", is_flag=True)
def platform_update( # pylint: disable=too-many-locals
platforms, only_packages, only_check, dry_run, json_output
def platform_update( # pylint: disable=too-many-locals, too-many-arguments
platforms, only_packages, only_check, dry_run, silent, json_output
):
pm = PlatformManager()
pkg_dir_to_name = {}
if not platforms:
platforms = []
for manifest in pm.get_installed():
platforms.append(manifest["__pkg_dir"])
pkg_dir_to_name[manifest["__pkg_dir"]] = manifest.get(
"title", manifest["name"]
)
pm = PlatformPackageManager()
platforms = platforms or pm.get_installed()
only_check = dry_run or only_check
if only_check and json_output:
result = []
for platform in platforms:
pkg_dir = platform if isdir(platform) else None
requirements = None
url = None
if not pkg_dir:
name, requirements, url = pm.parse_pkg_uri(platform)
pkg_dir = pm.get_package_dir(name, requirements, url)
if not pkg_dir:
spec = None
pkg = None
if isinstance(platform, PackageItem):
pkg = platform
else:
spec = PackageSpec(platform)
pkg = pm.get_package(spec)
if not pkg:
continue
latest = pm.outdated(pkg_dir, requirements)
if not latest and not PlatformFactory.new(pkg_dir).are_outdated_packages():
outdated = pm.outdated(pkg, spec)
if (
not outdated.is_outdated(allow_incompatible=True)
and not PlatformFactory.new(pkg).are_outdated_packages()
):
continue
data = _get_installed_platform_data(
pkg_dir, with_boards=False, expose_packages=False
pkg, with_boards=False, expose_packages=False
)
if latest:
data["versionLatest"] = latest
if outdated.is_outdated(allow_incompatible=True):
data["versionLatest"] = (
str(outdated.latest) if outdated.latest else None
)
result.append(data)
return click.echo(dump_json_to_unicode(result))
@ -401,10 +403,17 @@ def platform_update( # pylint: disable=too-many-locals
for platform in platforms:
click.echo(
"Platform %s"
% click.style(pkg_dir_to_name.get(platform, platform), fg="cyan")
% click.style(
platform.metadata.name
if isinstance(platform, PackageItem)
else platform,
fg="cyan",
)
)
click.echo("--------")
pm.update(platform, only_packages=only_packages, only_check=only_check)
pm.update(
platform, only_packages=only_packages, only_check=only_check, silent=silent
)
click.echo()
return True

View File

@ -23,7 +23,7 @@ from tabulate import tabulate
from platformio import fs
from platformio.commands.platform import platform_install as cli_platform_install
from platformio.ide.projectgenerator import ProjectGenerator
from platformio.managers.platform import PlatformManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.platform.exception import UnknownBoard
from platformio.project.config import ProjectConfig
from platformio.project.exception import NotPlatformIOProjectError
@ -109,7 +109,7 @@ def project_idedata(project_dir, environment, json_output):
def validate_boards(ctx, param, value): # pylint: disable=W0613
pm = PlatformManager()
pm = PlatformPackageManager()
for id_ in value:
try:
pm.board_config(id_)
@ -367,7 +367,7 @@ def fill_project_envs(
if all(cond):
used_boards.append(config.get(section, "board"))
pm = PlatformManager()
pm = PlatformPackageManager()
used_platforms = []
modified = False
for id_ in board_ids:
@ -404,7 +404,9 @@ def fill_project_envs(
def _install_dependent_platforms(ctx, platforms):
installed_platforms = [p["name"] for p in PlatformManager().get_installed()]
installed_platforms = [
pkg.metadata.name for pkg in PlatformPackageManager().get_installed()
]
if set(platforms) <= set(installed_platforms):
return
ctx.invoke(

View File

@ -26,8 +26,8 @@ from platformio.commands.system.completion import (
install_completion_code,
uninstall_completion_code,
)
from platformio.managers.platform import PlatformManager
from platformio.package.manager.library import LibraryPackageManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.package.manager.tool import ToolPackageManager
from platformio.project.config import ProjectConfig
@ -77,7 +77,7 @@ def system_info(json_output):
}
data["dev_platform_nums"] = {
"title": "Development Platforms",
"value": len(PlatformManager().get_installed()),
"value": len(PlatformPackageManager().get_installed()),
}
data["package_tool_nums"] = {
"title": "Tools & Toolchains",

View File

@ -25,9 +25,9 @@ from platformio.commands.lib.command import CTX_META_STORAGE_DIRS_KEY
from platformio.commands.lib.command import lib_update as cmd_lib_update
from platformio.commands.platform import platform_update as cmd_platform_update
from platformio.commands.upgrade import get_latest_version
from platformio.managers.platform import PlatformManager
from platformio.package.manager.core import update_core_packages
from platformio.package.manager.library import LibraryPackageManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.package.manager.tool import ToolPackageManager
from platformio.package.meta import PackageSpec
from platformio.platform.factory import PlatformFactory
@ -271,24 +271,16 @@ def check_internal_updates(ctx, what): # pylint: disable=too-many-branches
util.internet_on(raise_exception=True)
outdated_items = []
pm = PlatformManager() if what == "platforms" else LibraryPackageManager()
if isinstance(pm, PlatformManager):
for manifest in pm.get_installed():
if manifest["name"] in outdated_items:
continue
conds = [
pm.outdated(manifest["__pkg_dir"]),
what == "platforms"
and PlatformFactory.new(manifest["__pkg_dir"]).are_outdated_packages(),
]
if any(conds):
outdated_items.append(manifest["name"])
else:
for pkg in pm.get_installed():
if pkg.metadata.name in outdated_items:
continue
if pm.outdated(pkg).is_outdated():
outdated_items.append(pkg.metadata.name)
pm = PlatformPackageManager() if what == "platforms" else LibraryPackageManager()
for pkg in pm.get_installed():
if pkg.metadata.name in outdated_items:
continue
conds = [
pm.outdated(pkg).is_outdated(),
what == "platforms" and PlatformFactory.new(pkg).are_outdated_packages(),
]
if any(conds):
outdated_items.append(pkg.metadata.name)
if not outdated_items:
return

View File

@ -1,816 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
import os
import re
import shutil
from os.path import basename, getsize, isdir, isfile, islink, join, realpath
from tempfile import mkdtemp
import click
import requests
import semantic_version
from platformio import __version__, app, exception, fs, util
from platformio.compat import hashlib_encode_data
from platformio.package.download import FileDownloader
from platformio.package.exception import ManifestException
from platformio.package.lockfile import LockFile
from platformio.package.manifest.parser import ManifestParserFactory
from platformio.package.unpack import FileUnpacker
from platformio.package.vcsclient import VCSClientFactory
# pylint: disable=too-many-arguments, too-many-return-statements
class PackageRepoIterator(object):
def __init__(self, package, repositories):
assert isinstance(repositories, list)
self.package = package
self.repositories = iter(repositories)
def __iter__(self):
return self
def __next__(self):
return self.next() # pylint: disable=not-callable
@staticmethod
@util.memoized(expire="60s")
def load_manifest(url):
r = None
try:
r = requests.get(url, headers={"User-Agent": app.get_user_agent()})
r.raise_for_status()
return r.json()
except: # pylint: disable=bare-except
pass
finally:
if r:
r.close()
return None
def next(self):
repo = next(self.repositories)
manifest = repo if isinstance(repo, dict) else self.load_manifest(repo)
if manifest and self.package in manifest:
return manifest[self.package]
return next(self)
class PkgRepoMixin(object):
PIO_VERSION = semantic_version.Version(util.pepver_to_semver(__version__))
@staticmethod
def is_system_compatible(valid_systems):
if not valid_systems or "*" in valid_systems:
return True
if not isinstance(valid_systems, list):
valid_systems = list([valid_systems])
return util.get_systype() in valid_systems
def max_satisfying_repo_version(self, versions, requirements=None):
item = None
reqspec = None
try:
reqspec = (
semantic_version.SimpleSpec(requirements) if requirements else None
)
except ValueError:
pass
for v in versions:
if not self.is_system_compatible(v.get("system")):
continue
# if "platformio" in v.get("engines", {}):
# if PkgRepoMixin.PIO_VERSION not in requirements.SimpleSpec(
# v['engines']['platformio']):
# continue
specver = semantic_version.Version(v["version"])
if reqspec and specver not in reqspec:
continue
if not item or semantic_version.Version(item["version"]) < specver:
item = v
return item
def get_latest_repo_version( # pylint: disable=unused-argument
self, name, requirements, silent=False
):
version = None
for versions in PackageRepoIterator(name, self.repositories):
pkgdata = self.max_satisfying_repo_version(versions, requirements)
if not pkgdata:
continue
if (
not version
or semantic_version.compare(pkgdata["version"], version) == 1
):
version = pkgdata["version"]
return version
def get_all_repo_versions(self, name):
result = []
for versions in PackageRepoIterator(name, self.repositories):
result.extend([semantic_version.Version(v["version"]) for v in versions])
return [str(v) for v in sorted(set(result))]
class PkgInstallerMixin(object):
SRC_MANIFEST_NAME = ".piopkgmanager.json"
TMP_FOLDER_PREFIX = "_tmp_installing-"
FILE_CACHE_VALID = None # for example, 1 week = "7d"
FILE_CACHE_MAX_SIZE = 1024 * 1024 * 50 # 50 Mb
MEMORY_CACHE = {} # cache for package manifests and read dirs
def cache_get(self, key, default=None):
return self.MEMORY_CACHE.get(key, default)
def cache_set(self, key, value):
self.MEMORY_CACHE[key] = value
def cache_reset(self):
self.MEMORY_CACHE.clear()
def read_dirs(self, src_dir):
cache_key = "read_dirs-%s" % src_dir
result = self.cache_get(cache_key)
if result:
return result
result = [
join(src_dir, name)
for name in sorted(os.listdir(src_dir))
if isdir(join(src_dir, name))
]
self.cache_set(cache_key, result)
return result
def download(self, url, dest_dir, sha1=None):
cache_key_fname = app.ContentCache.key_from_args(url, "fname")
cache_key_data = app.ContentCache.key_from_args(url, "data")
if self.FILE_CACHE_VALID:
with app.ContentCache() as cc:
fname = str(cc.get(cache_key_fname))
cache_path = cc.get_cache_path(cache_key_data)
if fname and isfile(cache_path):
dst_path = join(dest_dir, fname)
shutil.copy(cache_path, dst_path)
click.echo("Using cache: %s" % cache_path)
return dst_path
with_progress = not app.is_disabled_progressbar()
try:
fd = FileDownloader(url, dest_dir)
fd.start(with_progress=with_progress)
except IOError as e:
raise_error = not with_progress
if with_progress:
try:
fd = FileDownloader(url, dest_dir)
fd.start(with_progress=False)
except IOError:
raise_error = True
if raise_error:
click.secho(
"Error: Please read http://bit.ly/package-manager-ioerror",
fg="red",
err=True,
)
raise e
if sha1:
fd.verify(sha1)
dst_path = fd.get_filepath()
if (
not self.FILE_CACHE_VALID
or getsize(dst_path) > PkgInstallerMixin.FILE_CACHE_MAX_SIZE
):
return dst_path
with app.ContentCache() as cc:
cc.set(cache_key_fname, basename(dst_path), self.FILE_CACHE_VALID)
cc.set(cache_key_data, "DUMMY", self.FILE_CACHE_VALID)
shutil.copy(dst_path, cc.get_cache_path(cache_key_data))
return dst_path
@staticmethod
def unpack(source_path, dest_dir):
with_progress = not app.is_disabled_progressbar()
try:
with FileUnpacker(source_path) as fu:
return fu.unpack(dest_dir, with_progress=with_progress)
except IOError as e:
if not with_progress:
raise e
with FileUnpacker(source_path) as fu:
return fu.unpack(dest_dir, with_progress=False)
@staticmethod
def parse_semver_version(value, raise_exception=False):
try:
try:
return semantic_version.Version(value)
except ValueError:
if "." not in str(value) and not str(value).isdigit():
raise ValueError("Invalid SemVer version %s" % value)
return semantic_version.Version.coerce(value)
except ValueError as e:
if raise_exception:
raise e
return None
@staticmethod
def parse_pkg_uri(text, requirements=None): # pylint: disable=too-many-branches
text = str(text)
name, url = None, None
# Parse requirements
req_conditions = [
"@" in text,
not requirements,
":" not in text or text.rfind("/") < text.rfind("@"),
]
if all(req_conditions):
text, requirements = text.rsplit("@", 1)
# Handle PIO Library Registry ID
if text.isdigit():
text = "id=" + text
# Parse custom name
elif "=" in text and not text.startswith("id="):
name, text = text.split("=", 1)
# Parse URL
# if valid URL with scheme vcs+protocol://
if "+" in text and text.find("+") < text.find("://"):
url = text
elif "/" in text or "\\" in text:
git_conditions = [
# Handle GitHub URL (https://github.com/user/package)
text.startswith("https://github.com/")
and not text.endswith((".zip", ".tar.gz")),
(text.split("#", 1)[0] if "#" in text else text).endswith(".git"),
]
hg_conditions = [
# Handle Developer Mbed URL
# (https://developer.mbed.org/users/user/code/package/)
# (https://os.mbed.com/users/user/code/package/)
text.startswith("https://developer.mbed.org"),
text.startswith("https://os.mbed.com"),
]
if any(git_conditions):
url = "git+" + text
elif any(hg_conditions):
url = "hg+" + text
elif "://" not in text and (isfile(text) or isdir(text)):
url = "file://" + text
elif "://" in text:
url = text
# Handle short version of GitHub URL
elif text.count("/") == 1:
url = "git+https://github.com/" + text
# Parse name from URL
if url and not name:
_url = url.split("#", 1)[0] if "#" in url else url
if _url.endswith(("\\", "/")):
_url = _url[:-1]
name = basename(_url)
if "." in name and not name.startswith("."):
name = name.rsplit(".", 1)[0]
return (name or text, requirements, url)
@staticmethod
def get_install_dirname(manifest):
name = re.sub(r"[^\da-z\_\-\. ]", "_", manifest["name"], flags=re.I)
if "id" in manifest:
name += "_ID%d" % manifest["id"]
return str(name)
@classmethod
def get_src_manifest_path(cls, pkg_dir):
if not isdir(pkg_dir):
return None
for item in os.listdir(pkg_dir):
if not isdir(join(pkg_dir, item)):
continue
if isfile(join(pkg_dir, item, cls.SRC_MANIFEST_NAME)):
return join(pkg_dir, item, cls.SRC_MANIFEST_NAME)
return None
def get_manifest_path(self, pkg_dir):
if not isdir(pkg_dir):
return None
for name in self.manifest_names:
manifest_path = join(pkg_dir, name)
if isfile(manifest_path):
return manifest_path
return None
def manifest_exists(self, pkg_dir):
return self.get_manifest_path(pkg_dir) or self.get_src_manifest_path(pkg_dir)
def load_manifest(self, pkg_dir): # pylint: disable=too-many-branches
cache_key = "load_manifest-%s" % pkg_dir
result = self.cache_get(cache_key)
if result:
return result
manifest = {}
src_manifest = None
manifest_path = self.get_manifest_path(pkg_dir)
src_manifest_path = self.get_src_manifest_path(pkg_dir)
if src_manifest_path:
src_manifest = fs.load_json(src_manifest_path)
if not manifest_path and not src_manifest_path:
return None
try:
manifest = ManifestParserFactory.new_from_file(manifest_path).as_dict()
except ManifestException:
pass
if src_manifest:
if "version" in src_manifest:
manifest["version"] = src_manifest["version"]
manifest["__src_url"] = src_manifest["url"]
# handle a custom package name
autogen_name = self.parse_pkg_uri(manifest["__src_url"])[0]
if "name" not in manifest or autogen_name != src_manifest["name"]:
manifest["name"] = src_manifest["name"]
if "name" not in manifest:
manifest["name"] = basename(pkg_dir)
if "version" not in manifest:
manifest["version"] = "0.0.0"
manifest["__pkg_dir"] = realpath(pkg_dir)
self.cache_set(cache_key, manifest)
return manifest
def get_installed(self):
items = []
for pkg_dir in self.read_dirs(self.package_dir):
if self.TMP_FOLDER_PREFIX in pkg_dir:
continue
manifest = self.load_manifest(pkg_dir)
if not manifest:
continue
assert "name" in manifest
items.append(manifest)
return items
def get_package(self, name, requirements=None, url=None):
pkg_id = int(name[3:]) if name.startswith("id=") else 0
best = None
for manifest in self.get_installed():
if url:
if manifest.get("__src_url") != url:
continue
elif pkg_id and manifest.get("id") != pkg_id:
continue
elif not pkg_id and manifest["name"] != name:
continue
elif not PkgRepoMixin.is_system_compatible(manifest.get("system")):
continue
# strict version or VCS HASH
if requirements and requirements == manifest["version"]:
return manifest
try:
if requirements and not semantic_version.SimpleSpec(requirements).match(
self.parse_semver_version(manifest["version"], raise_exception=True)
):
continue
if not best or (
self.parse_semver_version(manifest["version"], raise_exception=True)
> self.parse_semver_version(best["version"], raise_exception=True)
):
best = manifest
except ValueError:
pass
return best
def get_package_dir(self, name, requirements=None, url=None):
manifest = self.get_package(name, requirements, url)
return (
manifest.get("__pkg_dir")
if manifest and isdir(manifest.get("__pkg_dir"))
else None
)
def get_package_by_dir(self, pkg_dir):
for manifest in self.get_installed():
if manifest["__pkg_dir"] == realpath(pkg_dir):
return manifest
return None
def find_pkg_root(self, src_dir):
if self.manifest_exists(src_dir):
return src_dir
for root, _, _ in os.walk(src_dir):
if self.manifest_exists(root):
return root
raise exception.MissingPackageManifest(", ".join(self.manifest_names))
def _install_from_piorepo(self, name, requirements):
pkg_dir = None
pkgdata = None
versions = None
last_exc = None
for versions in PackageRepoIterator(name, self.repositories):
pkgdata = self.max_satisfying_repo_version(versions, requirements)
if not pkgdata:
continue
try:
pkg_dir = self._install_from_url(
name, pkgdata["url"], requirements, pkgdata.get("sha1")
)
break
except Exception as e: # pylint: disable=broad-except
last_exc = e
click.secho("Warning! Package Mirror: %s" % e, fg="yellow")
click.secho("Looking for another mirror...", fg="yellow")
if versions is None:
util.internet_on(raise_exception=True)
raise exception.UnknownPackage(
name + (". Error -> %s" % last_exc if last_exc else "")
)
if not pkgdata:
raise exception.UndefinedPackageVersion(
requirements or "latest", util.get_systype()
)
return pkg_dir
def _install_from_url(self, name, url, requirements=None, sha1=None, track=False):
tmp_dir = mkdtemp("-package", self.TMP_FOLDER_PREFIX, self.package_dir)
src_manifest_dir = None
src_manifest = {"name": name, "url": url, "requirements": requirements}
try:
if url.startswith("file://"):
_url = url[7:]
if isfile(_url):
self.unpack(_url, tmp_dir)
else:
fs.rmtree(tmp_dir)
shutil.copytree(_url, tmp_dir, symlinks=True)
elif url.startswith(("http://", "https://")):
dlpath = self.download(url, tmp_dir, sha1)
assert isfile(dlpath)
self.unpack(dlpath, tmp_dir)
os.remove(dlpath)
else:
vcs = VCSClientFactory.new(tmp_dir, url)
assert vcs.export()
src_manifest_dir = vcs.storage_dir
src_manifest["version"] = vcs.get_current_revision()
_tmp_dir = tmp_dir
if not src_manifest_dir:
_tmp_dir = self.find_pkg_root(tmp_dir)
src_manifest_dir = join(_tmp_dir, ".pio")
# write source data to a special manifest
if track:
self._update_src_manifest(src_manifest, src_manifest_dir)
return self._install_from_tmp_dir(_tmp_dir, requirements)
finally:
if isdir(tmp_dir):
fs.rmtree(tmp_dir)
return None
def _update_src_manifest(self, data, src_dir):
if not isdir(src_dir):
os.makedirs(src_dir)
src_manifest_path = join(src_dir, self.SRC_MANIFEST_NAME)
_data = {}
if isfile(src_manifest_path):
_data = fs.load_json(src_manifest_path)
_data.update(data)
with open(src_manifest_path, "w") as fp:
json.dump(_data, fp)
def _install_from_tmp_dir( # pylint: disable=too-many-branches
self, tmp_dir, requirements=None
):
tmp_manifest = self.load_manifest(tmp_dir)
assert set(["name", "version"]) <= set(tmp_manifest)
pkg_dirname = self.get_install_dirname(tmp_manifest)
pkg_dir = join(self.package_dir, pkg_dirname)
cur_manifest = self.load_manifest(pkg_dir)
tmp_semver = self.parse_semver_version(tmp_manifest["version"])
cur_semver = None
if cur_manifest:
cur_semver = self.parse_semver_version(cur_manifest["version"])
# package should satisfy requirements
if requirements:
mismatch_error = "Package version %s doesn't satisfy requirements %s" % (
tmp_manifest["version"],
requirements,
)
try:
assert tmp_semver and tmp_semver in semantic_version.SimpleSpec(
requirements
), mismatch_error
except (AssertionError, ValueError):
assert tmp_manifest["version"] == requirements, mismatch_error
# check if package already exists
if cur_manifest:
# 0-overwrite, 1-rename, 2-fix to a version
action = 0
if "__src_url" in cur_manifest:
if cur_manifest["__src_url"] != tmp_manifest.get("__src_url"):
action = 1
elif "__src_url" in tmp_manifest:
action = 2
else:
if tmp_semver and (not cur_semver or tmp_semver > cur_semver):
action = 1
elif tmp_semver and cur_semver and tmp_semver != cur_semver:
action = 2
# rename
if action == 1:
target_dirname = "%s@%s" % (pkg_dirname, cur_manifest["version"])
if "__src_url" in cur_manifest:
target_dirname = "%s@src-%s" % (
pkg_dirname,
hashlib.md5(
hashlib_encode_data(cur_manifest["__src_url"])
).hexdigest(),
)
shutil.move(pkg_dir, join(self.package_dir, target_dirname))
# fix to a version
elif action == 2:
target_dirname = "%s@%s" % (pkg_dirname, tmp_manifest["version"])
if "__src_url" in tmp_manifest:
target_dirname = "%s@src-%s" % (
pkg_dirname,
hashlib.md5(
hashlib_encode_data(tmp_manifest["__src_url"])
).hexdigest(),
)
pkg_dir = join(self.package_dir, target_dirname)
# remove previous/not-satisfied package
if isdir(pkg_dir):
fs.rmtree(pkg_dir)
shutil.copytree(tmp_dir, pkg_dir, symlinks=True)
try:
shutil.rmtree(tmp_dir)
except: # pylint: disable=bare-except
pass
assert isdir(pkg_dir)
self.cache_reset()
return pkg_dir
class BasePkgManager(PkgRepoMixin, PkgInstallerMixin):
# Handle circle dependencies
INSTALL_HISTORY = None
def __init__(self, package_dir, repositories=None):
self.repositories = repositories
self.package_dir = package_dir
if not isdir(self.package_dir):
os.makedirs(self.package_dir)
assert isdir(self.package_dir)
@property
def manifest_names(self):
raise NotImplementedError()
def print_message(self, message, nl=True):
click.echo("%s: %s" % (self.__class__.__name__, message), nl=nl)
def outdated(self, pkg_dir, requirements=None):
"""
Has 3 different results:
`None` - unknown package, VCS is detached to commit
`False` - package is up-to-date
`String` - a found latest version
"""
if not isdir(pkg_dir):
return None
latest = None
manifest = self.load_manifest(pkg_dir)
# skip detached package to a specific version
if "@" in pkg_dir and "__src_url" not in manifest and not requirements:
return None
if "__src_url" in manifest:
try:
vcs = VCSClientFactory.new(pkg_dir, manifest["__src_url"], silent=True)
except (AttributeError, exception.PlatformioException):
return None
if not vcs.can_be_updated:
return None
latest = vcs.get_latest_revision()
else:
try:
latest = self.get_latest_repo_version(
"id=%d" % manifest["id"] if "id" in manifest else manifest["name"],
requirements,
silent=True,
)
except (exception.PlatformioException, ValueError):
return None
if not latest:
return None
up_to_date = False
try:
assert "__src_url" not in manifest
up_to_date = self.parse_semver_version(
manifest["version"], raise_exception=True
) >= self.parse_semver_version(latest, raise_exception=True)
except (AssertionError, ValueError):
up_to_date = latest == manifest["version"]
return False if up_to_date else latest
def install(
self, name, requirements=None, silent=False, after_update=False, force=False
): # pylint: disable=unused-argument
pkg_dir = None
# interprocess lock
with LockFile(self.package_dir):
self.cache_reset()
name, requirements, url = self.parse_pkg_uri(name, requirements)
package_dir = self.get_package_dir(name, requirements, url)
# avoid circle dependencies
if not self.INSTALL_HISTORY:
self.INSTALL_HISTORY = []
history_key = "%s-%s-%s" % (name, requirements or "", url or "")
if history_key in self.INSTALL_HISTORY:
return package_dir
self.INSTALL_HISTORY.append(history_key)
if package_dir and force:
self.uninstall(package_dir)
package_dir = None
if not package_dir or not silent:
msg = "Installing " + click.style(name, fg="cyan")
if requirements:
msg += " @ " + requirements
self.print_message(msg)
if package_dir:
if not silent:
click.secho(
"{name} @ {version} is already installed".format(
**self.load_manifest(package_dir)
),
fg="yellow",
)
return package_dir
if url:
pkg_dir = self._install_from_url(name, url, requirements, track=True)
else:
pkg_dir = self._install_from_piorepo(name, requirements)
if not pkg_dir or not self.manifest_exists(pkg_dir):
raise exception.PackageInstallError(
name, requirements or "*", util.get_systype()
)
manifest = self.load_manifest(pkg_dir)
assert manifest
click.secho(
"{name} @ {version} has been successfully installed!".format(
**manifest
),
fg="green",
)
return pkg_dir
def uninstall(
self, package, requirements=None, after_update=False
): # pylint: disable=unused-argument
# interprocess lock
with LockFile(self.package_dir):
self.cache_reset()
if isdir(package) and self.get_package_by_dir(package):
pkg_dir = package
else:
name, requirements, url = self.parse_pkg_uri(package, requirements)
pkg_dir = self.get_package_dir(name, requirements, url)
if not pkg_dir:
raise exception.UnknownPackage(
"%s @ %s" % (package, requirements or "*")
)
manifest = self.load_manifest(pkg_dir)
click.echo(
"Uninstalling %s @ %s: \t"
% (click.style(manifest["name"], fg="cyan"), manifest["version"]),
nl=False,
)
if islink(pkg_dir):
os.unlink(pkg_dir)
else:
fs.rmtree(pkg_dir)
self.cache_reset()
# unfix package with the same name
pkg_dir = self.get_package_dir(manifest["name"])
if pkg_dir and "@" in pkg_dir:
shutil.move(
pkg_dir, join(self.package_dir, self.get_install_dirname(manifest))
)
self.cache_reset()
click.echo("[%s]" % click.style("OK", fg="green"))
return True
def update(self, package, requirements=None, only_check=False):
self.cache_reset()
if isdir(package) and self.get_package_by_dir(package):
pkg_dir = package
else:
pkg_dir = self.get_package_dir(*self.parse_pkg_uri(package))
if not pkg_dir:
raise exception.UnknownPackage("%s @ %s" % (package, requirements or "*"))
manifest = self.load_manifest(pkg_dir)
name = manifest["name"]
click.echo(
"{} {:<40} @ {:<15}".format(
"Checking" if only_check else "Updating",
click.style(manifest["name"], fg="cyan"),
manifest["version"],
),
nl=False,
)
if not util.internet_on():
click.echo("[%s]" % (click.style("Off-line", fg="yellow")))
return None
latest = self.outdated(pkg_dir, requirements)
if latest:
click.echo("[%s]" % (click.style(latest, fg="red")))
elif latest is False:
click.echo("[%s]" % (click.style("Up-to-date", fg="green")))
else:
click.echo("[%s]" % (click.style("Detached", fg="yellow")))
if only_check or not latest:
return True
if "__src_url" in manifest:
vcs = VCSClientFactory.new(pkg_dir, manifest["__src_url"])
assert vcs.update()
self._update_src_manifest(
dict(version=vcs.get_current_revision()), vcs.storage_dir
)
else:
self.uninstall(pkg_dir, after_update=True)
self.install(name, latest, after_update=True)
return True
class PackageManager(BasePkgManager):
@property
def manifest_names(self):
return ["package.json"]

View File

@ -12,201 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-public-methods, too-many-instance-attributes
from os.path import isdir, isfile, join
from platformio import app, exception, util
from platformio.managers.package import BasePkgManager, PackageManager
# Backward compatibility with legacy dev-platforms
from platformio.platform.base import PlatformBase # pylint: disable=unused-import
from platformio.platform.exception import UnknownBoard, UnknownPlatform
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
class PlatformManager(BasePkgManager):
def __init__(self, package_dir=None, repositories=None):
if not repositories:
repositories = [
"https://dl.bintray.com/platformio/dl-platforms/manifest.json",
"{0}://dl.platformio.org/platforms/manifest.json".format(
"https" if app.get_setting("strict_ssl") else "http"
),
]
self.config = ProjectConfig.get_instance()
BasePkgManager.__init__(
self, package_dir or self.config.get_optional_dir("platforms"), repositories
)
@property
def manifest_names(self):
return ["platform.json"]
def get_manifest_path(self, pkg_dir):
if not isdir(pkg_dir):
return None
for name in self.manifest_names:
manifest_path = join(pkg_dir, name)
if isfile(manifest_path):
return manifest_path
return None
def install(
self,
name,
requirements=None,
with_packages=None,
without_packages=None,
skip_default_package=False,
with_all_packages=False,
after_update=False,
silent=False,
force=False,
**_
): # pylint: disable=too-many-arguments, arguments-differ
platform_dir = BasePkgManager.install(
self, name, requirements, silent=silent, force=force
)
p = PlatformFactory.new(platform_dir)
if with_all_packages:
with_packages = list(p.packages.keys())
# don't cleanup packages or install them after update
# we check packages for updates in def update()
if after_update:
p.install_python_packages()
p.on_installed()
return True
p.install_packages(
with_packages,
without_packages,
skip_default_package,
silent=silent,
force=force,
)
p.install_python_packages()
p.on_installed()
return self.cleanup_packages(list(p.packages))
def uninstall(self, package, requirements=None, after_update=False):
if isdir(package):
pkg_dir = package
else:
name, requirements, url = self.parse_pkg_uri(package, requirements)
pkg_dir = self.get_package_dir(name, requirements, url)
if not pkg_dir:
raise UnknownPlatform(package)
p = PlatformFactory.new(pkg_dir)
BasePkgManager.uninstall(self, pkg_dir, requirements)
p.uninstall_python_packages()
p.on_uninstalled()
# don't cleanup packages or install them after update
# we check packages for updates in def update()
if after_update:
return True
return self.cleanup_packages(list(p.packages))
def update( # pylint: disable=arguments-differ
self, package, requirements=None, only_check=False, only_packages=False
):
if isdir(package):
pkg_dir = package
else:
name, requirements, url = self.parse_pkg_uri(package, requirements)
pkg_dir = self.get_package_dir(name, requirements, url)
if not pkg_dir:
raise UnknownPlatform(package)
p = PlatformFactory.new(pkg_dir)
pkgs_before = list(p.get_installed_packages())
missed_pkgs = set()
if not only_packages:
BasePkgManager.update(self, pkg_dir, requirements, only_check)
p = PlatformFactory.new(pkg_dir)
missed_pkgs = set(pkgs_before) & set(p.packages)
missed_pkgs -= set(p.get_installed_packages())
p.update_packages(only_check)
self.cleanup_packages(list(p.packages))
if missed_pkgs:
p.install_packages(
with_packages=list(missed_pkgs), skip_default_package=True
)
return True
def cleanup_packages(self, names):
self.cache_reset()
deppkgs = {}
for manifest in PlatformManager().get_installed():
p = PlatformFactory.new(manifest["__pkg_dir"])
for pkgname, pkgmanifest in p.get_installed_packages().items():
if pkgname not in deppkgs:
deppkgs[pkgname] = set()
deppkgs[pkgname].add(pkgmanifest["version"])
pm = PackageManager(self.config.get_optional_dir("packages"))
for manifest in pm.get_installed():
if manifest["name"] not in names:
continue
if (
manifest["name"] not in deppkgs
or manifest["version"] not in deppkgs[manifest["name"]]
):
try:
pm.uninstall(manifest["__pkg_dir"], after_update=True)
except exception.UnknownPackage:
pass
self.cache_reset()
return True
@util.memoized(expire="5s")
def get_installed_boards(self):
boards = []
for manifest in self.get_installed():
p = PlatformFactory.new(manifest["__pkg_dir"])
for config in p.get_boards().values():
board = config.get_brief_data()
if board not in boards:
boards.append(board)
return boards
@staticmethod
def get_registered_boards():
return util.get_api_result("/boards", cache_valid="7d")
def get_all_boards(self):
boards = self.get_installed_boards()
know_boards = ["%s:%s" % (b["platform"], b["id"]) for b in boards]
try:
for board in self.get_registered_boards():
key = "%s:%s" % (board["platform"], board["id"])
if key not in know_boards:
boards.append(board)
except (exception.APIRequestError, exception.InternetIsOffline):
pass
return sorted(boards, key=lambda b: b["name"])
def board_config(self, id_, platform=None):
for manifest in self.get_installed_boards():
if manifest["id"] == id_ and (
not platform or manifest["platform"] == platform
):
return manifest
for manifest in self.get_registered_boards():
if manifest["id"] == id_ and (
not platform or manifest["platform"] == platform
):
return manifest
raise UnknownBoard(id_)

View File

@ -42,17 +42,26 @@ class PackageManagerInstallMixin(object):
with FileUnpacker(src) as fu:
return fu.unpack(dst, with_progress=False)
def install(self, spec, silent=False, force=False):
def install(self, spec, silent=False, skip_dependencies=False, force=False):
try:
self.lock()
pkg = self._install(spec, silent=silent, force=force)
pkg = self._install(
spec, silent=silent, skip_dependencies=skip_dependencies, force=force
)
self.memcache_reset()
self.cleanup_expired_downloads()
return pkg
finally:
self.unlock()
def _install(self, spec, search_filters=None, silent=False, force=False):
def _install( # pylint: disable=too-many-arguments
self,
spec,
search_filters=None,
silent=False,
skip_dependencies=False,
force=False,
):
spec = self.ensure_spec(spec)
# avoid circle dependencies
@ -104,11 +113,12 @@ class PackageManagerInstallMixin(object):
)
self.memcache_reset()
self._install_dependencies(pkg, silent)
if not skip_dependencies:
self.install_dependencies(pkg, silent)
self._INSTALL_HISTORY[spec] = pkg
return pkg
def _install_dependencies(self, pkg, silent=False):
def install_dependencies(self, pkg, silent=False):
assert isinstance(pkg, PackageItem)
manifest = self.load_manifest(pkg)
if not manifest.get("dependencies"):

View File

@ -91,6 +91,9 @@ class PackageManageRegistryMixin(object):
self.print_multi_package_issue(packages, spec)
package, version = self.find_best_registry_version(packages, spec)
if not package or not version:
raise UnknownPackageError(spec.humanize())
pkgfile = self._pick_compatible_pkg_file(version["files"]) if version else None
if not pkgfile:
raise UnknownPackageError(spec.humanize())
@ -189,7 +192,7 @@ class PackageManageRegistryMixin(object):
return (package, version)
if not spec.requirements:
return None
return (None, None)
# if the custom version requirements, check ALL package versions
for package in packages:
@ -206,7 +209,7 @@ class PackageManageRegistryMixin(object):
if version:
return (package, version)
time.sleep(1)
return None
return (None, None)
def pick_best_registry_version(self, versions, spec=None):
assert not spec or isinstance(spec, PackageSpec)

View File

@ -44,7 +44,7 @@ class PackageManagerUninstallMixin(object):
# firstly, remove dependencies
if not skip_dependencies:
self._uninstall_dependencies(pkg, silent)
self.uninstall_dependencies(pkg, silent)
if os.path.islink(pkg.path):
os.unlink(pkg.path)
@ -72,7 +72,7 @@ class PackageManagerUninstallMixin(object):
return pkg
def _uninstall_dependencies(self, pkg, silent=False):
def uninstall_dependencies(self, pkg, silent=False):
assert isinstance(pkg, PackageItem)
manifest = self.load_manifest(pkg)
if not manifest.get("dependencies"):

View File

@ -74,17 +74,24 @@ class PackageManagerUpdateMixin(object):
).version
)
def update(self, from_spec, to_spec=None, only_check=False, silent=False):
def update( # pylint: disable=too-many-arguments
self,
from_spec,
to_spec=None,
only_check=False,
silent=False,
show_incompatible=True,
):
pkg = self.get_package(from_spec)
if not pkg or not pkg.metadata:
raise UnknownPackageError(from_spec)
if not silent:
click.echo(
"{} {:<45} {:<30}".format(
"{} {:<45} {:<35}".format(
"Checking" if only_check else "Updating",
click.style(pkg.metadata.spec.humanize(), fg="cyan"),
"%s (%s)" % (pkg.metadata.version, to_spec.requirements)
"%s @ %s" % (pkg.metadata.version, to_spec.requirements)
if to_spec and to_spec.requirements
else str(pkg.metadata.version),
),
@ -97,17 +104,9 @@ class PackageManagerUpdateMixin(object):
outdated = self.outdated(pkg, to_spec)
if not silent:
self.print_outdated_state(outdated)
self.print_outdated_state(outdated, show_incompatible)
up_to_date = any(
[
outdated.detached,
not outdated.latest,
outdated.latest and outdated.current == outdated.latest,
outdated.wanted and outdated.current == outdated.wanted,
]
)
if only_check or up_to_date:
if only_check or not outdated.is_outdated(allow_incompatible=False):
return pkg
try:
@ -117,18 +116,26 @@ class PackageManagerUpdateMixin(object):
self.unlock()
@staticmethod
def print_outdated_state(outdated):
def print_outdated_state(outdated, show_incompatible=True):
if outdated.detached:
return click.echo("[%s]" % (click.style("Detached", fg="yellow")))
if not outdated.latest or outdated.current == outdated.latest:
if (
not outdated.latest
or outdated.current == outdated.latest
or (not show_incompatible and outdated.current == outdated.wanted)
):
return click.echo("[%s]" % (click.style("Up-to-date", fg="green")))
if outdated.wanted and outdated.current == outdated.wanted:
return click.echo(
"[%s]"
% (click.style("Incompatible (%s)" % outdated.latest, fg="yellow"))
"[%s]" % (click.style("Incompatible %s" % outdated.latest, fg="yellow"))
)
return click.echo(
"[%s]" % (click.style(str(outdated.wanted or outdated.latest), fg="red"))
"[%s]"
% (
click.style(
"Outdated %s" % str(outdated.wanted or outdated.latest), fg="red"
)
)
)
def _update(self, pkg, outdated, silent=False):

View File

@ -190,6 +190,10 @@ class BasePackageManager( # pylint: disable=too-many-public-methods
return metadata
def get_installed(self):
cache_key = "get_installed"
if self.memcache_get(cache_key):
return self.memcache_get(cache_key)
result = []
for name in sorted(os.listdir(self.package_dir)):
pkg_dir = os.path.join(self.package_dir, name)
@ -213,6 +217,8 @@ class BasePackageManager( # pylint: disable=too-many-public-methods
except MissingPackageManifestError:
pass
result.append(pkg)
self.memcache_set(cache_key, result)
return result
def get_package(self, spec):

View File

@ -12,8 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio import util
from platformio.exception import APIRequestError, InternetIsOffline
from platformio.package.exception import UnknownPackageError
from platformio.package.manager.base import BasePackageManager
from platformio.package.manager.tool import ToolPackageManager
from platformio.package.meta import PackageType
from platformio.platform.exception import IncompatiblePlatform, UnknownBoard
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
@ -28,3 +34,161 @@ class PlatformPackageManager(BasePackageManager): # pylint: disable=too-many-an
@property
def manifest_names(self):
return PackageType.get_manifest_map()[PackageType.PLATFORM]
def install( # pylint: disable=arguments-differ, too-many-arguments
self,
spec,
with_packages=None,
without_packages=None,
skip_default_package=False,
with_all_packages=False,
silent=False,
force=False,
):
pkg = super(PlatformPackageManager, self).install(
spec, silent=silent, force=force, skip_dependencies=True
)
try:
p = PlatformFactory.new(pkg)
p.ensure_engine_compatible()
except IncompatiblePlatform as e:
super(PlatformPackageManager, self).uninstall(
pkg, silent=silent, skip_dependencies=True
)
raise e
if with_all_packages:
with_packages = list(p.packages)
p.install_packages(
with_packages,
without_packages,
skip_default_package,
silent=silent,
force=force,
)
p.install_python_packages()
p.on_installed()
self.cleanup_packages(list(p.packages))
return pkg
def uninstall(self, spec, silent=False, skip_dependencies=False):
pkg = self.get_package(spec)
if not pkg or not pkg.metadata:
raise UnknownPackageError(spec)
p = PlatformFactory.new(pkg)
assert super(PlatformPackageManager, self).uninstall(
pkg, silent=silent, skip_dependencies=True
)
if not skip_dependencies:
p.uninstall_python_packages()
p.on_uninstalled()
self.cleanup_packages(list(p.packages))
return pkg
def update( # pylint: disable=arguments-differ, too-many-arguments
self,
from_spec,
to_spec=None,
only_check=False,
silent=False,
show_incompatible=True,
only_packages=False,
):
pkg = self.get_package(from_spec)
if not pkg or not pkg.metadata:
raise UnknownPackageError(from_spec)
p = PlatformFactory.new(pkg)
pkgs_before = [item.metadata.name for item in p.get_installed_packages()]
new_pkg = None
missed_pkgs = set()
if not only_packages:
new_pkg = super(PlatformPackageManager, self).update(
from_spec,
to_spec,
only_check=only_check,
silent=silent,
show_incompatible=show_incompatible,
)
p = PlatformFactory.new(new_pkg)
missed_pkgs = set(pkgs_before) & set(p.packages)
missed_pkgs -= set(
item.metadata.name for item in p.get_installed_packages()
)
p.update_packages(only_check)
self.cleanup_packages(list(p.packages))
if missed_pkgs:
p.install_packages(
with_packages=list(missed_pkgs), skip_default_package=True
)
return new_pkg or pkg
def cleanup_packages(self, names):
self.memcache_reset()
deppkgs = {}
for platform in PlatformPackageManager().get_installed():
p = PlatformFactory.new(platform)
for pkg in p.get_installed_packages():
if pkg.metadata.name not in deppkgs:
deppkgs[pkg.metadata.name] = set()
deppkgs[pkg.metadata.name].add(pkg.metadata.version)
pm = ToolPackageManager()
for pkg in pm.get_installed():
if pkg.metadata.name not in names:
continue
if (
pkg.metadata.name not in deppkgs
or pkg.metadata.version not in deppkgs[pkg.metadata.name]
):
try:
pm.uninstall(pkg.metadata.spec)
except UnknownPackageError:
pass
self.memcache_reset()
return True
@util.memoized(expire="5s")
def get_installed_boards(self):
boards = []
for pkg in self.get_installed():
p = PlatformFactory.new(pkg)
for config in p.get_boards().values():
board = config.get_brief_data()
if board not in boards:
boards.append(board)
return boards
@staticmethod
def get_registered_boards():
return util.get_api_result("/boards", cache_valid="7d")
def get_all_boards(self):
boards = self.get_installed_boards()
know_boards = ["%s:%s" % (b["platform"], b["id"]) for b in boards]
try:
for board in self.get_registered_boards():
key = "%s:%s" % (board["platform"], board["id"])
if key not in know_boards:
boards.append(board)
except (APIRequestError, InternetIsOffline):
pass
return sorted(boards, key=lambda b: b["name"])
def board_config(self, id_, platform=None):
for manifest in self.get_installed_boards():
if manifest["id"] == id_ and (
not platform or manifest["platform"] == platform
):
return manifest
for manifest in self.get_registered_boards():
if manifest["id"] == id_ and (
not platform or manifest["platform"] == platform
):
return manifest
raise UnknownBoard(id_)

View File

@ -19,10 +19,9 @@ from platformio.project.config import ProjectConfig
class ToolPackageManager(BasePackageManager): # pylint: disable=too-many-ancestors
def __init__(self, package_dir=None):
self.config = ProjectConfig.get_instance()
super(ToolPackageManager, self).__init__(
PackageType.TOOL, package_dir or self.config.get_optional_dir("packages"),
)
if not package_dir:
package_dir = ProjectConfig.get_instance().get_optional_dir("packages")
super(ToolPackageManager, self).__init__(PackageType.TOOL, package_dir)
@property
def manifest_names(self):

View File

@ -13,9 +13,62 @@
# limitations under the License.
from platformio.package.exception import UnknownPackageError
from platformio.package.meta import PackageSpec
class PlatformPackagesMixin(object):
def get_package_spec(self, name):
version = self.packages[name].get("version", "")
if any(c in version for c in (":", "/", "@")):
return PackageSpec("%s=%s" % (name, version))
return PackageSpec(
owner=self.packages[name].get("owner"), name=name, requirements=version
)
def get_package(self, name):
if not name:
return None
return self.pm.get_package(self.get_package_spec(name))
def get_package_dir(self, name):
pkg = self.get_package(name)
return pkg.path if pkg else None
def get_package_version(self, name):
pkg = self.get_package(name)
return str(pkg.metadata.version) if pkg else None
def get_installed_packages(self):
result = []
for name in self.packages:
pkg = self.get_package(name)
if pkg:
result.append(pkg)
return result
def dump_used_packages(self):
result = []
for name, options in self.packages.items():
if options.get("optional"):
continue
pkg = self.get_package(name)
if not pkg or not pkg.metadata:
continue
item = {"name": pkg.metadata.name, "version": str(pkg.metadata.version)}
if pkg.metadata.spec.external:
item["src_url"] = pkg.metadata.spec.url
result.append(item)
return result
def autoinstall_runtime_packages(self):
for name, options in self.packages.items():
if options.get("optional", False):
continue
if self.get_package(name):
continue
self.pm.install(self.get_package_spec(name))
return True
def install_packages( # pylint: disable=too-many-arguments
self,
with_packages=None,
@ -24,31 +77,25 @@ class PlatformPackagesMixin(object):
silent=False,
force=False,
):
with_packages = set(self.find_pkg_names(with_packages or []))
without_packages = set(self.find_pkg_names(without_packages or []))
with_packages = set(self._find_pkg_names(with_packages or []))
without_packages = set(self._find_pkg_names(without_packages or []))
upkgs = with_packages | without_packages
ppkgs = set(self.packages)
if not upkgs.issubset(ppkgs):
raise UnknownPackageError(", ".join(upkgs - ppkgs))
for name, opts in self.packages.items():
version = opts.get("version", "")
for name, options in self.packages.items():
if name in without_packages:
continue
if name in with_packages or not (
skip_default_package or opts.get("optional", False)
skip_default_package or options.get("optional", False)
):
if ":" in version:
self.pm.install(
"%s=%s" % (name, version), silent=silent, force=force
)
else:
self.pm.install(name, version, silent=silent, force=force)
self.pm.install(self.get_package_spec(name), silent=silent, force=force)
return True
def find_pkg_names(self, candidates):
def _find_pkg_names(self, candidates):
result = []
for candidate in candidates:
found = False
@ -73,54 +120,18 @@ class PlatformPackagesMixin(object):
return result
def update_packages(self, only_check=False):
for name, manifest in self.get_installed_packages().items():
requirements = self.packages[name].get("version", "")
if ":" in requirements:
_, requirements, __ = self.pm.parse_pkg_uri(requirements)
self.pm.update(manifest["__pkg_dir"], requirements, only_check)
def get_installed_packages(self):
items = {}
for name in self.packages:
pkg_dir = self.get_package_dir(name)
if pkg_dir:
items[name] = self.pm.load_manifest(pkg_dir)
return items
for pkg in self.get_installed_packages():
self.pm.update(
pkg,
to_spec=self.get_package_spec(pkg.metadata.name),
only_check=only_check,
show_incompatible=False,
)
def are_outdated_packages(self):
for name, manifest in self.get_installed_packages().items():
requirements = self.packages[name].get("version", "")
if ":" in requirements:
_, requirements, __ = self.pm.parse_pkg_uri(requirements)
if self.pm.outdated(manifest["__pkg_dir"], requirements):
for pkg in self.get_installed_packages():
if self.pm.outdated(
pkg, self.get_package_spec(pkg.metadata.name)
).is_outdated(allow_incompatible=False):
return True
return False
def get_package_dir(self, name):
version = self.packages[name].get("version", "")
if ":" in version:
return self.pm.get_package_dir(
*self.pm.parse_pkg_uri("%s=%s" % (name, version))
)
return self.pm.get_package_dir(name, version)
def get_package_version(self, name):
pkg_dir = self.get_package_dir(name)
if not pkg_dir:
return None
return self.pm.load_manifest(pkg_dir).get("version")
def dump_used_packages(self):
result = []
for name, options in self.packages.items():
if options.get("optional"):
continue
pkg_dir = self.get_package_dir(name)
if not pkg_dir:
continue
manifest = self.pm.load_manifest(pkg_dir)
item = {"name": manifest["name"], "version": manifest["version"]}
if manifest.get("__src_url"):
item["src_url"] = manifest.get("__src_url")
result.append(item)
return result

View File

@ -50,12 +50,14 @@ class PlatformRunMixin(object):
assert isinstance(variables, dict)
assert isinstance(targets, list)
self.ensure_engine_compatible()
options = self.config.items(env=variables["pioenv"], as_dict=True)
if "framework" in options:
# support PIO Core 3.0 dev/platforms
options["pioframework"] = options["framework"]
self.configure_default_packages(options, targets)
self.install_packages(silent=True)
self.autoinstall_runtime_packages()
self._report_non_sensitive_data(options, targets)
@ -84,8 +86,6 @@ class PlatformRunMixin(object):
for item in self.dump_used_packages()
]
topts["platform"] = {"name": self.name, "version": self.version}
if self.src_version:
topts["platform"]["src_version"] = self.src_version
telemetry.send_run_environment(topts, targets)
def _run_scons(self, variables, targets, jobs):

View File

@ -19,11 +19,11 @@ import click
import semantic_version
from platformio import __version__, fs, proc, util
from platformio.managers.package import PackageManager
from platformio.package.manager.tool import ToolPackageManager
from platformio.platform._packages import PlatformPackagesMixin
from platformio.platform._run import PlatformRunMixin
from platformio.platform.board import PlatformBoardConfig
from platformio.platform.exception import UnknownBoard
from platformio.platform.exception import IncompatiblePlatform, UnknownBoard
from platformio.project.config import ProjectConfig
@ -44,20 +44,7 @@ class PlatformBase( # pylint: disable=too-many-instance-attributes,too-many-pub
self._custom_packages = None
self.config = ProjectConfig.get_instance()
self.pm = PackageManager(
self.config.get_optional_dir("packages"), self.package_repositories
)
self._src_manifest = None
src_manifest_path = self.pm.get_src_manifest_path(self.get_dir())
if src_manifest_path:
self._src_manifest = fs.load_json(src_manifest_path)
# if self.engines and "platformio" in self.engines:
# if self.PIO_VERSION not in semantic_version.SimpleSpec(
# self.engines['platformio']):
# raise exception.IncompatiblePlatform(self.name,
# str(self.PIO_VERSION))
self.pm = ToolPackageManager(self.config.get_optional_dir("packages"))
@property
def name(self):
@ -75,14 +62,6 @@ class PlatformBase( # pylint: disable=too-many-instance-attributes,too-many-pub
def version(self):
return self._manifest["version"]
@property
def src_version(self):
return self._src_manifest.get("version") if self._src_manifest else None
@property
def src_url(self):
return self._src_manifest.get("url") if self._src_manifest else None
@property
def homepage(self):
return self._manifest.get("homepage")
@ -103,10 +82,6 @@ class PlatformBase( # pylint: disable=too-many-instance-attributes,too-many-pub
def engines(self):
return self._manifest.get("engines")
@property
def package_repositories(self):
return self._manifest.get("packageRepositories")
@property
def manifest(self):
return self._manifest
@ -114,21 +89,32 @@ class PlatformBase( # pylint: disable=too-many-instance-attributes,too-many-pub
@property
def packages(self):
packages = self._manifest.get("packages", {})
for item in self._custom_packages or []:
name = item
version = "*"
if "@" in item:
name, version = item.split("@", 2)
name = name.strip()
if name not in packages:
packages[name] = {}
packages[name].update({"version": version.strip(), "optional": False})
for spec in self._custom_packages or []:
spec = self.pm.ensure_spec(spec)
if spec.external:
version = spec.url
else:
version = str(spec.requirements) or "*"
if spec.name not in packages:
packages[spec.name] = {}
packages[spec.name].update(
{"owner": spec.owner, "version": version, "optional": False}
)
return packages
@property
def python_packages(self):
return self._manifest.get("pythonPackages")
def ensure_engine_compatible(self):
if not self.engines or "platformio" not in self.engines:
return True
if self.PIO_VERSION in semantic_version.SimpleSpec(self.engines["platformio"]):
return True
raise IncompatiblePlatform(
self.name, str(self.PIO_VERSION), self.engines["platformio"]
)
def get_dir(self):
return os.path.dirname(self.manifest_path)
@ -218,10 +204,10 @@ class PlatformBase( # pylint: disable=too-many-instance-attributes,too-many-pub
for opts in (self.frameworks or {}).values():
if "package" not in opts:
continue
pkg_dir = self.get_package_dir(opts["package"])
if not pkg_dir or not os.path.isdir(os.path.join(pkg_dir, "libraries")):
pkg = self.get_package(opts["package"])
if not pkg or not os.path.isdir(os.path.join(pkg.path, "libraries")):
continue
libs_dir = os.path.join(pkg_dir, "libraries")
libs_dir = os.path.join(pkg.path, "libraries")
storages[libs_dir] = opts["package"]
libcores_dir = os.path.join(libs_dir, "__cores__")
if not os.path.isdir(libcores_dir):

View File

@ -26,7 +26,10 @@ class UnknownPlatform(PlatformException):
class IncompatiblePlatform(PlatformException):
MESSAGE = "Development platform '{0}' is not compatible with PIO Core v{1}"
MESSAGE = (
"Development platform '{0}' is not compatible with PlatformIO Core v{1} and "
"depends on PlatformIO Core {2}.\n"
)
class UnknownBoard(PlatformException):

View File

@ -16,7 +16,7 @@ import os
import re
from platformio.compat import load_python_module
from platformio.package.manager.platform import PlatformPackageManager
from platformio.package.meta import PackageItem
from platformio.platform.base import PlatformBase
from platformio.platform.exception import UnknownPlatform
@ -36,9 +36,16 @@ class PlatformFactory(object):
@classmethod
def new(cls, pkg_or_spec):
pkg = PlatformPackageManager().get_package(
"file://%s" % pkg_or_spec if os.path.isdir(pkg_or_spec) else pkg_or_spec
)
if isinstance(pkg_or_spec, PackageItem):
pkg = pkg_or_spec
else:
from platformio.package.manager.platform import ( # pylint: disable=import-outside-toplevel
PlatformPackageManager,
)
pkg = PlatformPackageManager().get_package(
"file://%s" % pkg_or_spec if os.path.isdir(pkg_or_spec) else pkg_or_spec
)
if not pkg:
raise UnknownPlatform(pkg_or_spec)

View File

@ -14,8 +14,9 @@
import json
from platformio import exception
from platformio.commands import platform as cli_platform
from platformio.package.exception import UnknownPackageError
from platformio.platform.exception import IncompatiblePlatform
def test_search_json_output(clirunner, validate_cliresult, isolated_pio_core):
@ -39,22 +40,30 @@ def test_search_raw_output(clirunner, validate_cliresult):
def test_install_unknown_version(clirunner):
result = clirunner.invoke(cli_platform.platform_install, ["atmelavr@99.99.99"])
assert result.exit_code != 0
assert isinstance(result.exception, exception.UndefinedPackageVersion)
assert isinstance(result.exception, UnknownPackageError)
def test_install_unknown_from_registry(clirunner):
result = clirunner.invoke(cli_platform.platform_install, ["unknown-platform"])
assert result.exit_code != 0
assert isinstance(result.exception, exception.UnknownPackage)
assert isinstance(result.exception, UnknownPackageError)
def test_install_incompatbile(clirunner, validate_cliresult, isolated_pio_core):
result = clirunner.invoke(
cli_platform.platform_install, ["atmelavr@1.2.0", "--skip-default-package"],
)
assert result.exit_code != 0
assert isinstance(result.exception, IncompatiblePlatform)
def test_install_known_version(clirunner, validate_cliresult, isolated_pio_core):
result = clirunner.invoke(
cli_platform.platform_install,
["atmelavr@1.2.0", "--skip-default-package", "--with-package", "tool-avrdude"],
["atmelavr@2.0.0", "--skip-default-package", "--with-package", "tool-avrdude"],
)
validate_cliresult(result)
assert "atmelavr @ 1.2.0" in result.output
assert "atmelavr @ 2.0.0" in result.output
assert "Installing tool-avrdude @" in result.output
assert len(isolated_pio_core.join("packages").listdir()) == 1
@ -63,7 +72,7 @@ def test_install_from_vcs(clirunner, validate_cliresult, isolated_pio_core):
result = clirunner.invoke(
cli_platform.platform_install,
[
"https://github.com/platformio/" "platform-espressif8266.git",
"https://github.com/platformio/platform-espressif8266.git",
"--skip-default-package",
],
)
@ -90,7 +99,7 @@ def test_list_raw_output(clirunner, validate_cliresult):
def test_update_check(clirunner, validate_cliresult, isolated_pio_core):
result = clirunner.invoke(
cli_platform.platform_update, ["--only-check", "--json-output"]
cli_platform.platform_update, ["--dry-run", "--json-output"]
)
validate_cliresult(result)
output = json.loads(result.output)
@ -102,9 +111,9 @@ def test_update_check(clirunner, validate_cliresult, isolated_pio_core):
def test_update_raw(clirunner, validate_cliresult, isolated_pio_core):
result = clirunner.invoke(cli_platform.platform_update)
validate_cliresult(result)
assert "Uninstalling atmelavr @ 1.2.0:" in result.output
assert "PlatformManager: Installing atmelavr @" in result.output
assert len(isolated_pio_core.join("packages").listdir()) == 1
assert "Removing atmelavr @ 2.0.0:" in result.output
assert "Platform Manager: Installing platformio/atmelavr @" in result.output
assert len(isolated_pio_core.join("packages").listdir()) == 2
def test_uninstall(clirunner, validate_cliresult, isolated_pio_core):

View File

@ -13,13 +13,13 @@
# limitations under the License.
import json
import os
import re
from time import time
from platformio import app, maintenance
from platformio.__main__ import cli as cli_pio
from platformio.commands import upgrade as cmd_upgrade
from platformio.managers.platform import PlatformManager
def test_check_pio_upgrade(clirunner, isolated_pio_core, validate_cliresult):
@ -89,7 +89,8 @@ def test_check_and_update_libraries(clirunner, isolated_pio_core, validate_clire
assert "There are the new updates for libraries (ArduinoJson)" in result.output
assert "Please wait while updating libraries" in result.output
assert re.search(
r"Updating bblanchon/ArduinoJson\s+6\.12\.0\s+\[[\d\.]+\]", result.output
r"Updating bblanchon/ArduinoJson\s+6\.12\.0\s+\[Outdated [\d\.]+\]",
result.output,
)
# check updated version
@ -102,12 +103,11 @@ def test_check_platform_updates(clirunner, isolated_pio_core, validate_cliresult
# install obsolete platform
result = clirunner.invoke(cli_pio, ["platform", "install", "native"])
validate_cliresult(result)
os.remove(str(isolated_pio_core.join("platforms", "native", ".piopm")))
manifest_path = isolated_pio_core.join("platforms", "native", "platform.json")
manifest = json.loads(manifest_path.read())
manifest["version"] = "0.0.0"
manifest_path.write(json.dumps(manifest))
# reset cached manifests
PlatformManager().cache_reset()
# reset check time
interval = int(app.get_setting("check_platforms_interval")) * 3600 * 24
@ -141,7 +141,7 @@ def test_check_and_update_platforms(clirunner, isolated_pio_core, validate_clire
validate_cliresult(result)
assert "There are the new updates for platforms (native)" in result.output
assert "Please wait while updating platforms" in result.output
assert re.search(r"Updating native\s+@ 0.0.0\s+\[[\d\.]+\]", result.output)
assert re.search(r"Updating native\s+0.0.0\s+\[Outdated [\d\.]+\]", result.output)
# check updated version
result = clirunner.invoke(cli_pio, ["platform", "list", "--json-output"])