forked from platformio/platformio-core
517 lines
17 KiB
Python
517 lines
17 KiB
Python
# Copyright 2014-present Ivan Kravets <me@ikravets.com>
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import base64
|
|
import os
|
|
import re
|
|
import sys
|
|
from imp import load_source
|
|
from multiprocessing import cpu_count
|
|
from os.path import basename, dirname, isdir, isfile, join
|
|
|
|
import click
|
|
import semantic_version
|
|
|
|
from platformio import app, exception, util
|
|
from platformio.managers.package import BasePkgManager, PackageManager
|
|
|
|
PACKAGE_DIR = join(util.get_home_dir(), "packages")
|
|
|
|
|
|
class PlatformManager(BasePkgManager):
|
|
|
|
def __init__(self):
|
|
BasePkgManager.__init__(
|
|
self,
|
|
join(util.get_home_dir(), "platforms"),
|
|
["http://dl.platformio.org/platforms/manifest.json"]
|
|
)
|
|
|
|
@property
|
|
def manifest_name(self):
|
|
return "platform.json"
|
|
|
|
def install(self, # pylint: disable=too-many-arguments,arguments-differ
|
|
name, requirements=None, with_packages=None,
|
|
without_packages=None, skip_default_packages=False):
|
|
manifest_path = BasePkgManager.install(self, name, requirements)
|
|
p = PlatformFactory.newPlatform(manifest_path, requirements)
|
|
p.install_packages(
|
|
with_packages, without_packages, skip_default_packages)
|
|
self.cleanup_packages(p.packages.keys())
|
|
return True
|
|
|
|
def uninstall(self, # pylint: disable=arguments-differ
|
|
name, requirements=None):
|
|
p = PlatformFactory.newPlatform(name, requirements)
|
|
BasePkgManager.uninstall(self, name, requirements)
|
|
self.cleanup_packages(p.packages.keys())
|
|
return True
|
|
|
|
def update(self, # pylint: disable=arguments-differ
|
|
name, requirements=None, only_packages=False):
|
|
if not only_packages:
|
|
BasePkgManager.update(self, name, requirements)
|
|
p = PlatformFactory.newPlatform(name, requirements)
|
|
p.update_packages()
|
|
self.cleanup_packages(p.packages.keys())
|
|
return True
|
|
|
|
def is_outdated(self, name, requirements=None):
|
|
p = PlatformFactory.newPlatform(name, requirements)
|
|
return (p.are_outdated_packages() or
|
|
p.version != self.get_latest_repo_version(name, requirements))
|
|
|
|
def cleanup_packages(self, names):
|
|
self.reset_cache()
|
|
deppkgs = {}
|
|
for manifest in PlatformManager().get_installed():
|
|
p = PlatformFactory.newPlatform(
|
|
manifest['name'], manifest['version'])
|
|
for pkgname, pkgmanifest in p.get_installed_packages().items():
|
|
if pkgname not in deppkgs:
|
|
deppkgs[pkgname] = set()
|
|
deppkgs[pkgname].add(pkgmanifest['version'])
|
|
|
|
pm = PackageManager(PACKAGE_DIR)
|
|
for manifest in pm.get_installed():
|
|
if manifest['name'] not in names:
|
|
continue
|
|
if (manifest['name'] not in deppkgs or
|
|
manifest['version'] not in deppkgs[manifest['name']]):
|
|
pm.uninstall(
|
|
manifest['name'], manifest['version'], trigger_event=False)
|
|
|
|
self.reset_cache()
|
|
return True
|
|
|
|
def get_installed_boards(self):
|
|
boards = []
|
|
for manifest in self.get_installed():
|
|
p = PlatformFactory.newPlatform(manifest['_manifest_path'])
|
|
for config in p.get_boards().values():
|
|
boards.append(config.get_brief_data())
|
|
return boards
|
|
|
|
@staticmethod
|
|
@util.memoized
|
|
def get_registered_boards():
|
|
boards = util.get_api_result("/boards")
|
|
for item in boards:
|
|
# @TODO remove type from API
|
|
item['id'] = item['type']
|
|
del item['type']
|
|
return boards
|
|
|
|
|
|
class PlatformFactory(object):
|
|
|
|
@staticmethod
|
|
def get_clsname(name):
|
|
return "%s%sPlatform" % (name.upper()[0], name.lower()[1:])
|
|
|
|
@staticmethod
|
|
def load_module(name, path):
|
|
module = None
|
|
try:
|
|
module = load_source(
|
|
"platformio.managers.platform.%s" % name, path)
|
|
except ImportError:
|
|
raise exception.UnknownPlatform(name)
|
|
return module
|
|
|
|
@classmethod
|
|
def newPlatform(cls, name, requirements=None):
|
|
platform_dir = None
|
|
if name.endswith("platform.json") and isfile(name):
|
|
platform_dir = dirname(name)
|
|
name = util.load_json(name)['name']
|
|
else:
|
|
_manifest = PlatformManager().max_satisfying_version(
|
|
name, requirements)
|
|
if _manifest:
|
|
platform_dir = dirname(_manifest['_manifest_path'])
|
|
|
|
if not platform_dir:
|
|
raise exception.UnknownPlatform(
|
|
name if not requirements else "%s@%s" % (name, requirements))
|
|
|
|
platform_cls = None
|
|
if isfile(join(platform_dir, "platform.py")):
|
|
platform_cls = getattr(
|
|
cls.load_module(name, join(platform_dir, "platform.py")),
|
|
cls.get_clsname(name)
|
|
)
|
|
else:
|
|
platform_cls = type(
|
|
str(cls.get_clsname(name)), (PlatformBase,), {})
|
|
|
|
_instance = platform_cls(join(platform_dir, "platform.json"))
|
|
assert isinstance(_instance, PlatformBase)
|
|
return _instance
|
|
|
|
|
|
class PlatformPackagesMixin(object):
|
|
|
|
def get_installed_packages(self):
|
|
items = {}
|
|
installed = self.pm.get_installed()
|
|
for name, opts in self.packages.items():
|
|
manifest = None
|
|
for p in installed:
|
|
if (p['name'] != name or not semantic_version.match(
|
|
opts['version'], p['version'])):
|
|
continue
|
|
elif (not manifest or semantic_version.compare(
|
|
p['version'], manifest['version']) == 1):
|
|
manifest = p
|
|
if manifest:
|
|
items[name] = manifest
|
|
return items
|
|
|
|
def install_packages(self, with_packages=None, without_packages=None,
|
|
skip_default_packages=False, silent=False):
|
|
with_packages = set(
|
|
self.pkg_types_to_names(with_packages or []))
|
|
without_packages = set(
|
|
self.pkg_types_to_names(without_packages or []))
|
|
|
|
upkgs = with_packages | without_packages
|
|
ppkgs = set(self.packages.keys())
|
|
if not upkgs.issubset(ppkgs):
|
|
raise exception.UnknownPackage(", ".join(upkgs - ppkgs))
|
|
|
|
for name, opts in self.packages.items():
|
|
if name in without_packages:
|
|
continue
|
|
elif (name in with_packages or
|
|
not (skip_default_packages or opts.get("optional", False))):
|
|
self.pm.install(name, opts.get("version"), silent=silent)
|
|
|
|
return True
|
|
|
|
def update_packages(self):
|
|
for name in self.get_installed_packages():
|
|
self.pm.update(name, self.packages[name]['version'])
|
|
|
|
def are_outdated_packages(self):
|
|
for name, opts in self.get_installed_packages().items():
|
|
if (opts['version'] != self.pm.get_latest_repo_version(
|
|
name, self.packages[name].get("version"))):
|
|
return True
|
|
return False
|
|
|
|
|
|
class PlatformRunMixin(object):
|
|
|
|
LINE_ERROR_RE = re.compile(r"(\s+error|error[:\s]+)", re.I)
|
|
|
|
def run(self, variables, targets, verbose):
|
|
assert isinstance(variables, dict)
|
|
assert isinstance(targets, list)
|
|
|
|
self.configure_default_packages(variables, targets)
|
|
self.install_packages(silent=True)
|
|
|
|
self._verbose = verbose or app.get_setting("force_verbose")
|
|
|
|
if "clean" in targets:
|
|
targets = ["-c", "."]
|
|
|
|
variables['platform_manifest'] = self.manifest_path
|
|
|
|
if "build_script" not in variables:
|
|
variables['build_script'] = self.get_build_script()
|
|
if not isfile(variables['build_script']):
|
|
raise exception.BuildScriptNotFound(variables['build_script'])
|
|
|
|
result = self._run_scons(variables, targets)
|
|
assert "returncode" in result
|
|
|
|
return result
|
|
|
|
def _run_scons(self, variables, targets):
|
|
# pass current PYTHONPATH to SCons
|
|
if "PYTHONPATH" in os.environ:
|
|
_PYTHONPATH = os.environ.get("PYTHONPATH").split(os.pathsep)
|
|
else:
|
|
_PYTHONPATH = []
|
|
for p in os.sys.path:
|
|
if p not in _PYTHONPATH:
|
|
_PYTHONPATH.append(p)
|
|
os.environ['PYTHONPATH'] = os.pathsep.join(_PYTHONPATH)
|
|
|
|
cmd = [
|
|
os.path.normpath(sys.executable),
|
|
join(self.get_package_dir("tool-scons"), "script", "scons"),
|
|
"-Q",
|
|
"-j %d" % self.get_job_nums(),
|
|
"--warn=no-no-parallel-support",
|
|
"-f", join(util.get_source_dir(), "builder", "main.py")
|
|
]
|
|
if not self._verbose and "-c" not in targets:
|
|
cmd.append("--silent")
|
|
cmd += targets
|
|
|
|
# encode and append variables
|
|
for key, value in variables.items():
|
|
cmd.append("%s=%s" % (key.upper(), base64.b64encode(value)))
|
|
|
|
result = util.exec_command(
|
|
cmd,
|
|
stdout=util.AsyncPipe(self.on_run_out),
|
|
stderr=util.AsyncPipe(self.on_run_err)
|
|
)
|
|
return result
|
|
|
|
def on_run_out(self, line):
|
|
self._echo_line(line, level=1)
|
|
|
|
def on_run_err(self, line):
|
|
is_error = self.LINE_ERROR_RE.search(line) is not None
|
|
self._echo_line(line, level=3 if is_error else 2)
|
|
|
|
@staticmethod
|
|
def _echo_line(line, level):
|
|
assert 1 <= level <= 3
|
|
fg = (None, "yellow", "red")[level - 1]
|
|
if level == 1 and "is up to date" in line:
|
|
fg = "green"
|
|
click.secho(line, fg=fg, err=level > 1)
|
|
|
|
@staticmethod
|
|
def get_job_nums():
|
|
try:
|
|
return cpu_count()
|
|
except NotImplementedError:
|
|
return 1
|
|
|
|
|
|
class PlatformBase(PlatformPackagesMixin, PlatformRunMixin):
|
|
|
|
_BOARDS_CACHE = {}
|
|
|
|
def __init__(self, manifest_path):
|
|
self._BOARDS_CACHE = {}
|
|
self.manifest_path = manifest_path
|
|
self._manifest = util.load_json(manifest_path)
|
|
|
|
self.pm = PackageManager(
|
|
PACKAGE_DIR, self._manifest.get("packageRepositories"))
|
|
|
|
self._verbose = False
|
|
|
|
@property
|
|
def name(self):
|
|
return self._manifest['name']
|
|
|
|
@property
|
|
def title(self):
|
|
return self._manifest['title']
|
|
|
|
@property
|
|
def description(self):
|
|
return self._manifest['description']
|
|
|
|
@property
|
|
def version(self):
|
|
return self._manifest['version']
|
|
|
|
@property
|
|
def homepage(self):
|
|
return self._manifest.get("homepage")
|
|
|
|
@property
|
|
def license(self):
|
|
return self._manifest.get("license")
|
|
|
|
@property
|
|
def frameworks(self):
|
|
return self._manifest.get("frameworks")
|
|
|
|
@property
|
|
def manifest(self):
|
|
return self._manifest
|
|
|
|
@property
|
|
def packages(self):
|
|
packages = self._manifest.get("packages", {})
|
|
if "tool-scons" not in packages:
|
|
packages['tool-scons'] = {
|
|
"version": self._manifest.get("engines", {}).get(
|
|
"scons", ">=2.3.0,<2.6.0"),
|
|
"optional": False
|
|
}
|
|
return packages
|
|
|
|
def get_dir(self):
|
|
return dirname(self.manifest_path)
|
|
|
|
def get_build_script(self):
|
|
main_script = join(self.get_dir(), "builder", "main.py")
|
|
if isfile(main_script):
|
|
return main_script
|
|
raise NotImplementedError()
|
|
|
|
def is_embedded(self):
|
|
for opts in self.packages.values():
|
|
if opts.get("type") == "uploader":
|
|
return True
|
|
return False
|
|
|
|
def get_boards(self, id_=None):
|
|
|
|
def _append_board(board_id, manifest_path):
|
|
config = PlatformBoardConfig(manifest_path)
|
|
if "platform" in config and config.get("platform") != self.name:
|
|
return
|
|
elif ("platforms" in config and
|
|
self.name not in config.get("platforms")):
|
|
return
|
|
config.manifest['platform'] = self.name
|
|
self._BOARDS_CACHE[board_id] = config
|
|
|
|
bdirs = (join(util.get_home_dir(), "boards"),
|
|
join(self.get_dir(), "boards"))
|
|
if id_ is None:
|
|
for boards_dir in bdirs:
|
|
if not isdir(boards_dir):
|
|
continue
|
|
for item in sorted(os.listdir(boards_dir)):
|
|
_id = item[:-5]
|
|
if not item.endswith(".json") or _id in self._BOARDS_CACHE:
|
|
continue
|
|
_append_board(_id, join(boards_dir, item))
|
|
else:
|
|
if id_ not in self._BOARDS_CACHE:
|
|
for boards_dir in bdirs:
|
|
if not isdir(boards_dir):
|
|
continue
|
|
manifest_path = join(boards_dir, "%s.json" % id_)
|
|
if not isfile(manifest_path):
|
|
continue
|
|
_append_board(id_, manifest_path)
|
|
if id_ not in self._BOARDS_CACHE:
|
|
raise exception.UnknownBoard(id_)
|
|
return self._BOARDS_CACHE[id_] if id_ else self._BOARDS_CACHE
|
|
|
|
def board_config(self, id_):
|
|
return self.get_boards(id_)
|
|
|
|
def get_package_dir(self, name):
|
|
packages = self.get_installed_packages()
|
|
if name not in packages:
|
|
return None
|
|
return dirname(packages[name]['_manifest_path'])
|
|
|
|
def get_package_version(self, name):
|
|
packages = self.get_installed_packages()
|
|
if name not in packages:
|
|
return None
|
|
return packages[name]['version']
|
|
|
|
def get_package_type(self, name):
|
|
return self.packages[name].get("type")
|
|
|
|
def pkg_types_to_names(self, types):
|
|
names = []
|
|
for type_ in types:
|
|
name = type_
|
|
# lookup by package types
|
|
for _name, _opts in self.packages.items():
|
|
if _opts.get("type") == type_:
|
|
name = None
|
|
names.append(_name)
|
|
# if type is the right name
|
|
if name:
|
|
names.append(name)
|
|
return names
|
|
|
|
def configure_default_packages(self, variables, targets):
|
|
# enbale used frameworks
|
|
for framework in variables.get("framework", "").split(","):
|
|
if not self.frameworks:
|
|
continue
|
|
framework = framework.lower().strip()
|
|
if not framework or framework not in self.frameworks:
|
|
continue
|
|
_pkg_name = self.frameworks[framework]['package']
|
|
self.packages[_pkg_name]['optional'] = False
|
|
|
|
# enable upload tools for upload targets
|
|
if any(["upload" in t for t in targets] + ["program" in targets]):
|
|
for _name, _opts in self.packages.iteritems():
|
|
if _opts.get("type") == "uploader":
|
|
self.packages[_name]['optional'] = False
|
|
elif "uploadlazy" in targets:
|
|
# skip all packages, allow only upload tools
|
|
self.packages[_name]['optional'] = True
|
|
|
|
if "test" in targets and "tool-unity" not in self.packages:
|
|
self.packages['tool-unity'] = {
|
|
"version": "~1.20302.0",
|
|
"optional": False
|
|
}
|
|
|
|
|
|
class PlatformBoardConfig(object):
|
|
|
|
def __init__(self, manifest_path):
|
|
self._id = basename(manifest_path)[:-5]
|
|
assert isfile(manifest_path)
|
|
self.manifest_path = manifest_path
|
|
self._manifest = util.load_json(manifest_path)
|
|
assert set(["name", "url", "vendor"]) <= set(self._manifest.keys())
|
|
|
|
def get(self, path, default=None):
|
|
try:
|
|
value = self._manifest
|
|
for k in path.split("."):
|
|
value = value[k]
|
|
return value
|
|
except KeyError:
|
|
if default is not None:
|
|
return default
|
|
else:
|
|
raise KeyError("Invalid board option '%s'" % path)
|
|
|
|
def __contains__(self, key):
|
|
try:
|
|
self.get(key)
|
|
return True
|
|
except KeyError:
|
|
return False
|
|
|
|
@property
|
|
def id_(self):
|
|
return self._id
|
|
|
|
@property
|
|
def manifest(self):
|
|
return self._manifest
|
|
|
|
def get_brief_data(self):
|
|
return {
|
|
"id": self.id_,
|
|
"name": self._manifest['name'],
|
|
"platform": self._manifest.get("platform"),
|
|
"mcu": self._manifest.get("build", {}).get("mcu", "").upper(),
|
|
"fcpu": int(self._manifest.get("build", {}).get("f_cpu", "")[:-1]),
|
|
"ram": self._manifest.get("upload", {}).get("maximum_ram_size", 0),
|
|
"rom": self._manifest.get("upload", {}).get("maximum_size", 0),
|
|
"frameworks": self._manifest.get("frameworks"),
|
|
"vendor": self._manifest['vendor'],
|
|
"url": self._manifest['url']
|
|
}
|