# Copyright 2014-present Ivan Kravets # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import os import re import sys from imp import load_source from multiprocessing import cpu_count from os.path import basename, dirname, isdir, isfile, join import click import semantic_version from platformio import exception, util from platformio.managers.package import PackageManager class PlatformManager(PackageManager): def __init__(self): PackageManager.__init__( self, join(util.get_home_dir(), "platforms"), ["http://dl.platformio.org/misc/platforms_manifest.json"] ) @staticmethod def get_manifest_name(): return "platform.json" def find_best_platform(self, name, requirements=None): best = None for manifest in self.get_installed(): if manifest['name'] != name: continue elif requirements and not semantic_version.match( requirements, manifest['version']): continue elif (not best or semantic_version.compare( manifest['version'], best['version']) == 1): best = manifest return best def install(self, # pylint: disable=too-many-arguments,arguments-differ name, requirements=None, with_packages=None, without_packages=None, skip_default_packages=False): manifest_path = PackageManager.install(self, name, requirements) return PlatformFactory.newPlatform( manifest_path, requirements).install_packages( with_packages, without_packages, skip_default_packages) def uninstall(self, # pylint: disable=arguments-differ name, requirements=None): if PlatformFactory.newPlatform( name, requirements).uninstall_packages(): return PackageManager.uninstall(self, name, requirements) return False def update(self, name, version): raise NotImplementedError() def is_outdated(self, name, version): raise NotImplementedError() def get_installed_boards(self): boards = [] for manifest in self.get_installed(): p = PlatformFactory.newPlatform(manifest['_manifest_path']) for id_, config in p.get_boards().items(): manifest = config.get_manifest().copy() manifest['id'] = id_ manifest['platform'] = p.get_name() boards.append(manifest) return boards @staticmethod @util.memoized def get_registered_boards(): boards = util.get_api_result("/boards") for item in boards: # @TODO remove type from API item['id'] = item['type'] del item['type'] return boards class PlatformFactory(object): @staticmethod def get_clsname(name): return "%s%sPlatform" % (name.upper()[0], name.lower()[1:]) @staticmethod def load_module(name, path): module = None try: module = load_source( "platformio.managers.platform.%s" % name, path) except ImportError: raise exception.UnknownPlatform(name) return module @classmethod def newPlatform(cls, name, requirements=None): platform_dir = None if name.endswith("platform.json") and isfile(name): platform_dir = dirname(name) name = util.load_json(name)['name'] else: _manifest = PlatformManager().find_best_platform( name, requirements) if _manifest: platform_dir = dirname(_manifest['_manifest_path']) if not platform_dir: raise exception.UnknownPlatform( name if not requirements else "%s@%s" % (name, requirements)) platform_cls = None if isfile(join(platform_dir, "platform.py")): platform_cls = getattr( cls.load_module(name, join(platform_dir, "platform.py")), cls.get_clsname(name) ) else: platform_cls = type( str(cls.get_clsname(name)), (BasePlatform,), {}) _instance = platform_cls(join(platform_dir, "platform.json")) assert isinstance(_instance, BasePlatform) return _instance class PlatformPackagesMixin(object): def get_installed_packages(self): items = {} installed = self.pm.get_installed() for name, opts in self.get_packages().items(): manifest = None for p in installed: if (p['name'] != name or not semantic_version.match( opts['version'], p['version'])): continue elif (not manifest or semantic_version.compare( p['version'], manifest['version']) == 1): manifest = p if manifest: items[name] = manifest return items def install_packages(self, with_packages=None, without_packages=None, skip_default_packages=False, silent=False): with_packages = set( self.pkg_types_to_names(with_packages or [])) without_packages = set( self.pkg_types_to_names(without_packages or [])) upkgs = with_packages | without_packages ppkgs = set(self.get_packages().keys()) if not upkgs.issubset(ppkgs): raise exception.UnknownPackage(", ".join(upkgs - ppkgs)) for name, opts in self.get_packages().items(): if name in without_packages: continue elif (name in with_packages or not (skip_default_packages or opts.get("optional", False))): self.pm.install(name, opts.get("version"), silent=silent) return True def uninstall_packages(self): deppkgs = set() for manifest in PlatformManager().get_installed(): if manifest['name'] == self.get_name(): continue p = PlatformFactory.newPlatform( manifest['name'], manifest['version']) for pkgname, pkgmanifest in p.get_installed_packages().items(): deppkgs.add((pkgname, pkgmanifest['version'])) for manifest in self.pm.get_installed(): if manifest['name'] not in self.get_packages().keys(): continue if (manifest['name'], manifest['version']) not in deppkgs: self.pm.uninstall(manifest['name'], manifest['version']) return True def update_packages(self): outdated = None for pkgname, pkgmanifest in self.get_installed_packages().items(): requirements = self.get_packages()[pkgname]['version'] latest_version = self.pm.get_latest_version( pkgname, requirements) if (not latest_version or pkgmanifest['version'] == latest_version): continue # check other platforms keep_versions = set([latest_version]) for pfmanifest in PlatformManager().get_installed(): if pfmanifest['name'] == self.get_name(): continue p = PlatformFactory.newPlatform( pfmanifest['name'], pfmanifest['version']) if pkgname not in p.get_packages(): continue keep_versions.add(p.pm.get_latest_version( pkgname, p.get_packages()[pkgname]['version'])) outdated = self.pm.update( pkgname, requirements, keep_versions) return outdated def are_outdated_packages(self): for name, opts in self.get_installed_packages().items(): if (opts['version'] != self.pm.get_latest_version( name, self.get_packages()[name].get("version"))): return True return False class BasePlatform(PlatformPackagesMixin): _BOARDS_CACHE = {} LINE_ERROR_RE = re.compile(r"(\s+error|error[:\s]+)", re.I) def __init__(self, manifest_path): self._BOARDS_CACHE = {} self.manifest_path = manifest_path self.manifest = util.load_json(manifest_path) self.pm = PackageManager( repositories=self.manifest.get("packageRepositories")) self._found_error = False self._last_echo_line = None # 1 = errors # 2 = 1 + warnings # 3 = 2 + others self._verbose_level = 3 def get_name(self): return self.manifest['name'] def get_title(self): return self.manifest['title'] def get_description(self): return self.manifest['description'] def get_version(self): return self.manifest['version'] def get_manifest(self): return self.manifest def get_dir(self): return dirname(self.manifest_path) def get_build_script(self): main_script = join(self.get_dir(), "builder", "main.py") if isfile(main_script): return main_script raise NotImplementedError() def is_embedded(self): for opts in self.get_packages().values(): if opts.get("type") == "uploader": return True return False def get_boards(self, id_=None): if id_ is None: boards_dir = join(self.get_dir(), "boards") if not isdir(boards_dir): return {} for item in sorted(os.listdir(boards_dir)): _id = item[:-5] if _id in self._BOARDS_CACHE: continue self._BOARDS_CACHE[_id] = PlatformBoardConfig( join(self.get_dir(), "boards", item) ) else: if id_ not in self._BOARDS_CACHE: self._BOARDS_CACHE[id_] = PlatformBoardConfig( join(self.get_dir(), "boards", "%s.json" % id_) ) return self._BOARDS_CACHE[id_] if id_ else self._BOARDS_CACHE def board_config(self, id_): return self.get_boards(id_) def get_packages(self): return self.manifest.get("packages", {}) def get_frameworks(self): return self.get_manifest().get("frameworks") def get_package_dir(self, name): packages = self.get_installed_packages() if name not in packages: return None return dirname(packages[name]['_manifest_path']) def get_package_version(self, name): packages = self.get_installed_packages() if name not in packages: return None return packages[name]['version'] def get_package_type(self, name): return self.get_packages()[name].get("type") def pkg_types_to_names(self, types): names = [] for type_ in types: name = type_ # lookup by package types for _name, _opts in self.get_packages().items(): if _opts.get("type") == type_: name = None names.append(_name) # if type is the right name if name: names.append(name) return names def configure_default_packages(self, variables, targets): # enbale used frameworks frameworks = self.get_frameworks() for framework in variables.get("framework", "").split(","): framework = framework.lower().strip() if not framework or framework not in frameworks: continue _pkg_name = frameworks[framework]['package'] self.get_packages()[_pkg_name]['optional'] = False # append SCons tool self.get_packages()['tool-scons'] = { "version": self.manifest.get("engines", {}).get( "scons", ">=2.3.0,<2.5.0"), "optional": False } # enable upload tools for upload targets if any(["upload" in t for t in targets] + ["program" in targets]): for _name, _opts in self.get_packages().iteritems(): if _opts.get("type") == "uploader": self.get_packages()[_name]['optional'] = False elif "uploadlazy" in targets: # skip all packages, allow only upload tools self.get_packages()[_name]['optional'] = True def run(self, variables, targets, verbose): assert isinstance(variables, dict) assert isinstance(targets, list) self.configure_default_packages(variables, targets) self.install_packages(silent=True) self._verbose_level = int(verbose) if "clean" in targets: targets.remove("clean") targets.append("-c") variables['platform_manifest'] = self.manifest_path if "build_script" not in variables: variables['build_script'] = self.get_build_script() if not isfile(variables['build_script']): raise exception.BuildScriptNotFound(variables['build_script']) self._found_error = False result = self._run_scons(variables, targets) assert "returncode" in result # if self._found_error: # result['returncode'] = 1 if self._last_echo_line == ".": click.echo("") return result def _run_scons(self, variables, targets): # pass current PYTHONPATH to SCons if "PYTHONPATH" in os.environ: _PYTHONPATH = os.environ.get("PYTHONPATH").split(os.pathsep) else: _PYTHONPATH = [] for p in os.sys.path: if p not in _PYTHONPATH: _PYTHONPATH.append(p) os.environ['PYTHONPATH'] = os.pathsep.join(_PYTHONPATH) cmd = [ os.path.normpath(sys.executable), join(util.get_home_dir(), "packages", "tool-scons", "script", "scons"), "-Q", "-j %d" % self.get_job_nums(), "--warn=no-no-parallel-support", "-f", join(util.get_source_dir(), "builder", "main.py") ] + targets # encode and append variables for key, value in variables.items(): cmd.append("%s=%s" % (key.upper(), base64.b64encode(value))) result = util.exec_command( cmd, stdout=util.AsyncPipe(self.on_run_out), stderr=util.AsyncPipe(self.on_run_err) ) return result def on_run_out(self, line): self._echo_line(line, level=3) def on_run_err(self, line): is_error = self.LINE_ERROR_RE.search(line) is not None if is_error: self._found_error = True self._echo_line(line, level=1 if is_error else 2) def _echo_line(self, line, level): assert 1 <= level <= 3 fg = ("red", "yellow", None)[level - 1] if level == 3 and "is up to date" in line: fg = "green" if level > self._verbose_level: click.secho(".", fg=fg, err=level < 3, nl=False) self._last_echo_line = "." return if self._last_echo_line == ".": click.echo("") self._last_echo_line = line click.secho(line, fg=fg, err=level < 3) @staticmethod def get_job_nums(): try: return cpu_count() except NotImplementedError: return 1 class PlatformBoardConfig(object): def __init__(self, manifest_path): if not isfile(manifest_path): raise exception.UnknownBoard(basename(manifest_path[:-5])) self.manifest_path = manifest_path self.manifest = util.load_json(manifest_path) def get(self, path, default=None): try: value = self.manifest for k in path.split("."): value = value[k] return value except KeyError: if default is not None: return default else: raise KeyError("Invalid board option '%s'" % path) def __contains__(self, key): try: self.get(key) return True except KeyError: return False def get_manifest(self): return self.manifest