Refactor base package manager; Full VCS support as external package item

This commit is contained in:
Ivan Kravets
2016-07-18 01:38:35 +03:00
parent 7b3a235bd7
commit f10202c00b
5 changed files with 479 additions and 355 deletions

View File

@ -14,7 +14,7 @@
import sys
VERSION = (3, 0, "0.dev8")
VERSION = (3, 0, "0.dev9")
__version__ = ".".join([str(s) for s in VERSION])
__title__ = "platformio"

View File

@ -13,6 +13,7 @@
# limitations under the License.
import json
from os.path import basename
import click
@ -79,7 +80,10 @@ def platform_install(platforms, with_package, without_package,
for platform in platforms:
_platform = platform
_version = None
if "@" in platform:
if any([s in platform for s in ("\\", "/")]):
_platform = basename(platform)
_version = platform
elif "@" in platform:
_platform, _version = platform.rsplit("@", 1)
if PlatformManager().install(_platform, _version, with_package,
without_package, skip_default_package):

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from os.path import dirname, isdir, isfile, islink, join
from shutil import copyfile, copytree, rmtree
@ -27,327 +28,6 @@ from platformio.unpacker import FileUnpacker
from platformio.vcsclient import VCSClientFactory
class BasePkgManager(object):
_INSTALLED_CACHE = {}
def __init__(self, package_dir, repositories=None):
self._INSTALLED_CACHE = {}
self.repositories = repositories
self.package_dir = package_dir
if not isdir(self.package_dir):
os.makedirs(self.package_dir)
assert isdir(self.package_dir)
@staticmethod
def reset_cache():
BasePkgManager._INSTALLED_CACHE = {}
@property
def manifest_name(self):
raise NotImplementedError()
@staticmethod
def download(url, dest_dir, sha1=None):
fd = FileDownloader(url, dest_dir)
fd.start()
if sha1:
fd.verify(sha1)
return fd.get_filepath()
@staticmethod
def unpack(source_path, dest_dir):
fu = FileUnpacker(source_path, dest_dir)
return fu.start()
def check_structure(self, pkg_dir):
if isfile(join(pkg_dir, self.manifest_name)):
return True
for root, _, files in os.walk(pkg_dir):
if self.manifest_name not in files:
continue
# copy contents to the root of package directory
for item in os.listdir(root):
item_path = join(root, item)
if isfile(item_path):
copyfile(item_path, join(pkg_dir, item))
elif isdir(item_path):
copytree(item_path, join(pkg_dir, item), symlinks=True)
# remove not used contents
while True:
rmtree(root)
root = dirname(root)
if root == pkg_dir:
break
break
if isfile(join(pkg_dir, self.manifest_name)):
return True
raise exception.PlatformioException(
"Could not find '%s' manifest file in the package" %
self.manifest_name)
@staticmethod
def max_satisfying_repo_version(versions, requirements=None):
item = None
systype = util.get_systype()
if requirements is not None:
requirements = str(requirements)
for v in versions:
if isinstance(v['version'], int):
continue
if v['system'] not in ("all", "*") and systype not in v['system']:
continue
if requirements and not semantic_version.match(
requirements, v['version']):
continue
if item is None or semantic_version.compare(
v['version'], item['version']) == 1:
item = v
return item
def get_latest_repo_version(self, name, requirements):
version = None
for versions in PackageRepoIterator(name, self.repositories):
pkgdata = self.max_satisfying_repo_version(versions, requirements)
if not pkgdata:
continue
if (not version or semantic_version.compare(
pkgdata['version'], version) == 1):
version = pkgdata['version']
return version
def max_satisfying_version(self, name, requirements=None):
best = None
for manifest in self.get_installed():
if manifest['name'] != name:
continue
elif requirements and not semantic_version.match(
requirements, manifest['version']):
continue
elif (not best or semantic_version.compare(
manifest['version'], best['version']) == 1):
best = manifest
return best
def get_installed(self):
if self.package_dir in BasePkgManager._INSTALLED_CACHE:
return BasePkgManager._INSTALLED_CACHE[self.package_dir]
items = []
for p in sorted(os.listdir(self.package_dir)):
manifest_path = join(self.package_dir, p, self.manifest_name)
if not isfile(manifest_path):
continue
manifest = util.load_json(manifest_path)
manifest['_manifest_path'] = manifest_path
assert set(["name", "version"]) <= set(manifest.keys())
items.append(manifest)
BasePkgManager._INSTALLED_CACHE[self.package_dir] = items
return items
def is_installed(self, name, requirements=None):
installed = self.get_installed()
if requirements is None:
return any([p['name'] == name for p in installed])
for p in installed:
if p['name'] != name:
continue
elif semantic_version.match(requirements, p['version']):
return True
return None
def install(self, name, requirements, silent=False, trigger_event=True):
installed = self.is_installed(name, requirements)
if not installed or not silent:
click.echo("Installing %s %s @ %s:" % (
self.manifest_name.split(".")[0],
click.style(name, fg="cyan"),
requirements if requirements else "latest"))
if installed:
if not silent:
click.secho("Already installed", fg="yellow")
return self.max_satisfying_version(
name, requirements).get("_manifest_path")
if "://" in name:
pkg_dir = self._install_from_url(name, requirements)
else:
pkg_dir = self._install_from_piorepo(name, requirements)
if not pkg_dir or not isfile(join(pkg_dir, self.manifest_name)):
raise exception.PackageInstallError(
name, requirements or "latest", util.get_systype())
self.reset_cache()
if trigger_event:
telemetry.on_event(
category=self.__class__.__name__,
action="Install", label=name)
return join(pkg_dir, self.manifest_name)
def _install_from_piorepo(self, name, requirements):
pkg_dir = None
pkgdata = None
versions = None
for versions in PackageRepoIterator(name, self.repositories):
pkgdata = self.max_satisfying_repo_version(versions, requirements)
if not pkgdata:
continue
try:
pkg_dir = self._install_from_url(
pkgdata['url'], requirements, pkgdata.get("sha1"))
break
except Exception as e: # pylint: disable=broad-except
click.secho("Warning! Package Mirror: %s" % e, fg="yellow")
click.secho("Looking for another mirror...", fg="yellow")
if versions is None:
raise exception.UnknownPackage(name)
elif not pkgdata:
if "platform" in self.manifest_name:
raise exception.UndefinedPlatformVersion(
name, requirements or "latest")
else:
raise exception.UndefinedPackageVersion(
name, requirements or "latest", util.get_systype())
return pkg_dir
def _install_from_url(self, url, requirements=None, sha1=None):
pkg_dir = None
tmp_dir = mkdtemp("-package", "installing-", self.package_dir)
# Handle GitHub URL (https://github.com/user/repo.git)
if url.endswith(".git") and not url.startswith("git"):
url = "git+" + url
try:
if url.startswith("file://"):
rmtree(tmp_dir)
copytree(url[7:], tmp_dir)
elif url.startswith(("http://", "https://", "ftp://")):
dlpath = self.download(url, tmp_dir, sha1)
assert isfile(dlpath)
self.unpack(dlpath, tmp_dir)
os.remove(dlpath)
else:
repo = VCSClientFactory.newClient(url)
repo.export(tmp_dir)
self.check_structure(tmp_dir)
pkg_dir = self._install_from_tmp_dir(tmp_dir, requirements)
finally:
if isdir(tmp_dir):
rmtree(tmp_dir)
return pkg_dir
def _install_from_tmp_dir(self, tmp_dir, requirements=None):
tmpmanifest = util.load_json(join(tmp_dir, self.manifest_name))
assert set(["name", "version"]) <= set(tmpmanifest.keys())
name = tmpmanifest['name']
# package should satisfy requirements
if requirements:
assert semantic_version.match(requirements, tmpmanifest['version'])
pkg_dir = join(self.package_dir, name)
if isfile(join(pkg_dir, self.manifest_name)):
manifest = util.load_json(join(pkg_dir, self.manifest_name))
cmp_result = semantic_version.compare(
tmpmanifest['version'], manifest['version'])
if cmp_result == 1:
# if main package version < new package, backup it
os.rename(pkg_dir, join(
self.package_dir, "%s@%s" % (name, manifest['version'])))
elif cmp_result == -1:
pkg_dir = join(
self.package_dir, "%s@%s" % (name, tmpmanifest['version']))
# remove previous/not-satisfied package
if isdir(pkg_dir):
rmtree(pkg_dir)
os.rename(tmp_dir, pkg_dir)
assert isdir(pkg_dir)
return pkg_dir
def uninstall(self, name, requirements=None, trigger_event=True):
click.echo("Uninstalling %s %s @ %s: \t" % (
self.manifest_name.split(".")[0],
click.style(name, fg="cyan"),
requirements if requirements else "latest"), nl=False)
found = False
for manifest in self.get_installed():
if manifest['name'] != name:
continue
if (requirements and not semantic_version.match(
requirements, manifest['version'])):
continue
found = True
if isfile(manifest['_manifest_path']):
pkg_dir = dirname(manifest['_manifest_path'])
if islink(pkg_dir):
os.unlink(pkg_dir)
else:
rmtree(pkg_dir)
if not found:
click.secho("Not installed", fg="yellow")
return False
else:
click.echo("[%s]" % click.style("OK", fg="green"))
self.reset_cache()
if trigger_event:
telemetry.on_event(
category=self.__class__.__name__,
action="Uninstall", label=name)
def update(self, name, requirements=None):
click.echo("Updating %s %s @ %s:" % (
self.manifest_name.split(".")[0],
click.style(name, fg="yellow"),
requirements if requirements else "latest"))
latest_version = self.get_latest_repo_version(name, requirements)
if latest_version is None:
click.secho("Ignored! '%s' is not listed in repository" % name,
fg="yellow")
return
current = None
for manifest in self.get_installed():
if manifest['name'] != name:
continue
if (requirements and not semantic_version.match(
requirements, manifest['version'])):
continue
if (not current or semantic_version.compare(
manifest['version'], current['version']) == 1):
current = manifest
if current is None:
return
current_version = current['version']
click.echo("Versions: Current=%s, Latest=%s \t " %
(current_version, latest_version), nl=False)
if current_version == latest_version:
click.echo("[%s]" % (click.style("Up-to-date", fg="green")))
return True
else:
click.echo("[%s]" % (click.style("Out-of-date", fg="red")))
self.install(name, latest_version, trigger_event=False)
telemetry.on_event(
category=self.__class__.__name__,
action="Update", label=name)
return True
class PackageRepoIterator(object):
_MANIFEST_CACHE = {}
@ -389,6 +69,391 @@ class PackageRepoIterator(object):
return self.next()
class PkgRepoMixin(object):
@staticmethod
def max_satisfying_repo_version(versions, requirements=None):
item = None
systype = util.get_systype()
if requirements is not None:
requirements = str(requirements)
for v in versions:
if isinstance(v['version'], int):
continue
if v['system'] not in ("all", "*") and systype not in v['system']:
continue
if requirements and not semantic_version.match(
requirements, v['version']):
continue
if item is None or semantic_version.compare(
v['version'], item['version']) == 1:
item = v
return item
def get_latest_repo_version(self, name, requirements):
version = None
for versions in PackageRepoIterator(name, self.repositories):
pkgdata = self.max_satisfying_repo_version(versions, requirements)
if not pkgdata:
continue
if (not version or semantic_version.compare(
pkgdata['version'], version) == 1):
version = pkgdata['version']
return version
class PkgInstallerMixin(object):
VCS_MANIFEST_NAME = ".piopkgmanager.json"
def get_manifest_path(self, pkg_dir):
if not isdir(pkg_dir):
return None
manifest_path = join(pkg_dir, self.manifest_name)
if isfile(manifest_path):
return manifest_path
for item in os.listdir(pkg_dir):
if not isdir(join(pkg_dir, item)):
continue
if isfile(join(pkg_dir, item, self.VCS_MANIFEST_NAME)):
return join(pkg_dir, item, self.VCS_MANIFEST_NAME)
return None
def manifest_exists(self, pkg_dir):
return self.get_manifest_path(pkg_dir) is not None
def load_manifest(self, pkg_dir):
manifest_path = self.get_manifest_path(pkg_dir)
if manifest_path:
manifest = util.load_json(manifest_path)
manifest['_manifest_path'] = manifest_path
manifest['__pkg_dir'] = pkg_dir
return manifest
return None
def _install_from_piorepo(self, name, requirements):
pkg_dir = None
pkgdata = None
versions = None
for versions in PackageRepoIterator(name, self.repositories):
pkgdata = self.max_satisfying_repo_version(versions, requirements)
if not pkgdata:
continue
try:
pkg_dir = self._install_from_url(
name, pkgdata['url'], requirements, pkgdata.get("sha1"))
break
except Exception as e: # pylint: disable=broad-except
click.secho("Warning! Package Mirror: %s" % e, fg="yellow")
click.secho("Looking for another mirror...", fg="yellow")
if versions is None:
raise exception.UnknownPackage(name)
elif not pkgdata:
if "platform" in self.manifest_name:
raise exception.UndefinedPlatformVersion(
name, requirements or "latest")
else:
raise exception.UndefinedPackageVersion(
name, requirements or "latest", util.get_systype())
return pkg_dir
def _install_from_url(self, name, url, requirements=None, sha1=None):
pkg_dir = None
tmp_dir = mkdtemp("-package", "installing-", self.package_dir)
# Handle GitHub URL (https://github.com/user/repo.git)
if url.endswith(".git") and not url.startswith("git"):
url = "git+" + url
try:
if url.startswith("file://"):
url = url[7:]
if isfile(url):
self.unpack(url, tmp_dir)
else:
rmtree(tmp_dir)
copytree(url, tmp_dir)
elif url.startswith(("http://", "https://", "ftp://")):
dlpath = self.download(url, tmp_dir, sha1)
assert isfile(dlpath)
self.unpack(dlpath, tmp_dir)
os.remove(dlpath)
else:
vcs = VCSClientFactory.newClient(tmp_dir, url)
assert vcs.export()
with open(join(vcs.storage_dir,
self.VCS_MANIFEST_NAME), "w") as fp:
json.dump({
"name": name,
"version": "0.0.0+rev%s" % vcs.get_latest_revision(),
"url": url}, fp)
self._check_pkg_structure(tmp_dir)
pkg_dir = self._install_from_tmp_dir(tmp_dir, requirements)
finally:
if isdir(tmp_dir):
rmtree(tmp_dir)
return pkg_dir
def _check_pkg_structure(self, pkg_dir):
if self.manifest_exists(pkg_dir):
return True
for root, _, _ in os.walk(pkg_dir):
if not self.manifest_exists(root):
continue
# copy contents to the root of package directory
for item in os.listdir(root):
item_path = join(root, item)
if isfile(item_path):
copyfile(item_path, join(pkg_dir, item))
elif isdir(item_path):
copytree(item_path, join(pkg_dir, item), symlinks=True)
# remove not used contents
while True:
rmtree(root)
root = dirname(root)
if root == pkg_dir:
break
break
if self.manifest_exists(pkg_dir):
return True
raise exception.PlatformioException(
"Could not find '%s' manifest file in the package" %
self.manifest_name)
def _install_from_tmp_dir(self, tmp_dir, requirements=None):
tmpmanifest = self.load_manifest(tmp_dir)
assert set(["name", "version"]) <= set(tmpmanifest.keys())
name = tmpmanifest['name']
pkg_dir = join(self.package_dir, name)
# package should satisfy requirements
if requirements:
assert semantic_version.match(
requirements, tmpmanifest['version'])
if self.manifest_exists(pkg_dir):
manifest = self.load_manifest(pkg_dir)
cmp_result = semantic_version.compare(
tmpmanifest['version'], manifest['version'])
if cmp_result == 1:
# if main package version < new package, backup it
os.rename(pkg_dir, join(
self.package_dir, "%s@%s" % (name, manifest['version'])))
elif cmp_result == -1:
pkg_dir = join(
self.package_dir, "%s@%s" % (name, tmpmanifest['version']))
# remove previous/not-satisfied package
if isdir(pkg_dir):
rmtree(pkg_dir)
os.rename(tmp_dir, pkg_dir)
assert isdir(pkg_dir)
return pkg_dir
class BasePkgManager(PkgRepoMixin, PkgInstallerMixin):
_INSTALLED_CACHE = {}
def __init__(self, package_dir, repositories=None):
self._INSTALLED_CACHE = {}
self.repositories = repositories
self.package_dir = package_dir
if not isdir(self.package_dir):
os.makedirs(self.package_dir)
assert isdir(self.package_dir)
@property
def manifest_name(self):
raise NotImplementedError()
@staticmethod
def reset_cache():
BasePkgManager._INSTALLED_CACHE = {}
@staticmethod
def download(url, dest_dir, sha1=None):
fd = FileDownloader(url, dest_dir)
fd.start()
if sha1:
fd.verify(sha1)
return fd.get_filepath()
@staticmethod
def unpack(source_path, dest_dir):
fu = FileUnpacker(source_path, dest_dir)
return fu.start()
def print_message(self, message, nl=True):
click.echo("%s: %s" % (self.__class__.__name__, message), nl=nl)
def get_installed(self):
if self.package_dir in BasePkgManager._INSTALLED_CACHE:
return BasePkgManager._INSTALLED_CACHE[self.package_dir]
items = []
for p in sorted(os.listdir(self.package_dir)):
manifest = self.load_manifest(join(self.package_dir, p))
if not manifest:
continue
assert set(["name", "version"]) <= set(manifest.keys())
items.append(manifest)
BasePkgManager._INSTALLED_CACHE[self.package_dir] = items
return items
def is_installed(self, name, requirements=None):
installed = self.get_installed()
reqspec = None
if requirements:
try:
reqspec = semantic_version.Spec(requirements)
except ValueError:
pass
if not reqspec:
return any([p['name'] == name for p in installed])
for p in installed:
if p['name'] != name:
continue
elif reqspec.match(semantic_version.Version(p['version'])):
return True
return None
def max_installed_version(self, name, requirements=None):
best = None
reqspec = None
if requirements:
try:
reqspec = semantic_version.Spec(requirements)
except ValueError:
pass
for manifest in self.get_installed():
if manifest['name'] != name:
continue
elif reqspec and not reqspec.match(
semantic_version.Version(manifest['version'])):
continue
elif (not best or semantic_version.compare(
manifest['version'], best['version']) == 1):
best = manifest
if best:
return best.get("__pkg_dir")
return None
def install(self, name, requirements, silent=False, trigger_event=True):
installed = self.is_installed(name, requirements)
if not installed or not silent:
self.print_message("Installing %s @ %s:" % (
click.style(name, fg="cyan"),
requirements if requirements else "latest"))
if installed:
if not silent:
click.secho("Already installed", fg="yellow")
return self.max_installed_version(
name, requirements)
if (requirements and any([s in requirements for s in ("\\", "/")]) and
"://" not in requirements and (
isfile(requirements) or isdir(requirements))):
requirements = "file://" + requirements
if requirements and "://" in requirements:
pkg_dir = self._install_from_url(name, requirements)
else:
pkg_dir = self._install_from_piorepo(name, requirements)
if not pkg_dir or not self.manifest_exists(pkg_dir):
raise exception.PackageInstallError(
name, requirements or "latest", util.get_systype())
self.reset_cache()
if trigger_event:
telemetry.on_event(
category=self.__class__.__name__,
action="Install", label=name)
return pkg_dir
def uninstall(self, name, requirements=None, trigger_event=True):
self.print_message("Uninstalling %s @ %s: \t" % (
click.style(name, fg="cyan"),
requirements if requirements else "latest"), nl=False)
found = False
for manifest in self.get_installed():
if manifest['name'] != name:
continue
if (requirements and not semantic_version.match(
requirements, manifest['version'])):
continue
found = True
if isdir(manifest['__pkg_dir']):
if islink(manifest['__pkg_dir']):
os.unlink(manifest['__pkg_dir'])
else:
rmtree(manifest['__pkg_dir'])
if not found:
click.secho("Not installed", fg="yellow")
return False
else:
click.echo("[%s]" % click.style("OK", fg="green"))
self.reset_cache()
if trigger_event:
telemetry.on_event(
category=self.__class__.__name__,
action="Uninstall", label=name)
def update(self, name, requirements=None):
self.print_message("Updating %s @ %s:" % (
click.style(name, fg="yellow"),
requirements if requirements else "latest"))
latest_version = self.get_latest_repo_version(name, requirements)
if latest_version is None:
click.secho(
"Ignored! '%s' is not listed in registry" % name,
fg="yellow")
return
current = None
for manifest in self.get_installed():
if manifest['name'] != name:
continue
if (requirements and not semantic_version.match(
requirements, manifest['version'])):
continue
if (not current or semantic_version.compare(
manifest['version'], current['version']) == 1):
current = manifest
if current is None:
return
current_version = current['version']
click.echo("Versions: Current=%s, Latest=%s \t " %
(current_version, latest_version), nl=False)
if current_version == latest_version:
click.echo("[%s]" % (click.style("Up-to-date", fg="green")))
return True
else:
click.echo("[%s]" % (click.style("Out-of-date", fg="red")))
self.install(name, latest_version, trigger_event=False)
telemetry.on_event(
category=self.__class__.__name__,
action="Update", label=name)
return True
class PackageManager(BasePkgManager):
@property

View File

@ -45,8 +45,8 @@ class PlatformManager(BasePkgManager):
def install(self, # pylint: disable=too-many-arguments,arguments-differ
name, requirements=None, with_packages=None,
without_packages=None, skip_default_packages=False):
manifest_path = BasePkgManager.install(self, name, requirements)
p = PlatformFactory.newPlatform(manifest_path, requirements)
platform_dir = BasePkgManager.install(self, name, requirements)
p = PlatformFactory.newPlatform(self.get_manifest_path(platform_dir))
p.install_packages(
with_packages, without_packages, skip_default_packages)
self.cleanup_packages(p.packages.keys())
@ -99,7 +99,8 @@ class PlatformManager(BasePkgManager):
def get_installed_boards(self):
boards = []
for manifest in self.get_installed():
p = PlatformFactory.newPlatform(manifest['_manifest_path'])
p = PlatformFactory.newPlatform(
self.get_manifest_path(manifest['__pkg_dir']))
for config in p.get_boards().values():
boards.append(config.get_brief_data())
return boards
@ -138,10 +139,8 @@ class PlatformFactory(object):
platform_dir = dirname(name)
name = util.load_json(name)['name']
else:
_manifest = PlatformManager().max_satisfying_version(
platform_dir = PlatformManager().max_installed_version(
name, requirements)
if _manifest:
platform_dir = dirname(_manifest['_manifest_path'])
if not platform_dir:
raise exception.UnknownPlatform(
@ -169,9 +168,17 @@ class PlatformPackagesMixin(object):
installed = self.pm.get_installed()
for name, opts in self.packages.items():
manifest = None
reqspec = None
try:
reqspec = semantic_version.Spec(opts['version'])
except ValueError:
pass
for p in installed:
if (p['name'] != name or not semantic_version.match(
opts['version'], p['version'])):
if p['name'] != name:
continue
if reqspec and not reqspec.match(
semantic_version.Version(p['version'])):
continue
elif (not manifest or semantic_version.compare(
p['version'], manifest['version']) == 1):
@ -413,7 +420,7 @@ class PlatformBase(PlatformPackagesMixin, PlatformRunMixin):
packages = self.get_installed_packages()
if name not in packages:
return None
return dirname(packages[name]['_manifest_path'])
return packages[name]['__pkg_dir']
def get_package_version(self, name):
packages = self.get_installed_packages()

View File

@ -12,25 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from os import listdir
from os.path import isdir, join
from platform import system
from subprocess import check_call
from sys import modules
from urlparse import urlsplit, urlunsplit
from platformio import util
from platformio.exception import PlatformioException
class VCSClientFactory(object):
@staticmethod
def newClient(url):
scheme, netloc, path, query, fragment = urlsplit(url)
type_ = scheme
if "+" in type_:
type_, scheme = type_.split("+", 1)
url = urlunsplit((scheme, netloc, path, query, None))
clsname = "%sClient" % type_.title()
obj = getattr(modules[__name__], clsname)(url, fragment)
def newClient(src_dir, remote_url=None, branch=None):
clsnametpl = "%sClient"
vcscls = None
type_ = None
if remote_url:
scheme, netloc, path, query, branch = urlsplit(remote_url)
type_ = scheme
if "+" in type_:
type_, scheme = type_.split("+", 1)
remote_url = urlunsplit((scheme, netloc, path, query, None))
vcscls = getattr(modules[__name__], clsnametpl % type_.title())
elif isdir(src_dir):
for item in listdir(src_dir):
if not isdir(join(src_dir, item)) or not item.startswith("."):
continue
try:
vcscls = getattr(
modules[__name__], clsnametpl % item[1:].title())
except AttributeError:
pass
assert vcscls
obj = vcscls(src_dir, remote_url, branch)
assert isinstance(obj, VCSClientBase)
return obj
@ -39,59 +56,90 @@ class VCSClientBase(object):
command = None
def __init__(self, url, branch=None):
self.url = url
def __init__(self, src_dir, remote_url=None, branch=None):
self.src_dir = src_dir
self.remote_url = remote_url
self.branch = branch
self.check_client()
def check_client(self):
try:
assert self.command
assert self.run_cmd(["--version"]) == 0
assert self.run_cmd(["--version"])
except (AssertionError, OSError):
raise PlatformioException(
"VCS: `%s` client is not installed in your system" %
self.command)
return True
def export(self, dst_dir):
@property
def storage_dir(self):
return join(self.src_dir, "." + self.command)
def export(self):
raise NotImplementedError
def run_cmd(self, args):
return check_call([self.command] + args, shell=system() == "Windows")
def get_latest_revision(self):
raise NotImplementedError
def run_cmd(self, args, **kwargs):
args = [self.command] + args
kwargs['shell'] = system() == "Windows"
return check_call(args, **kwargs) == 0
def get_cmd_output(self, args, **kwargs):
args = [self.command] + args
result = util.exec_command(args, **kwargs)
if result['returncode'] == 0:
return result['out']
raise PlatformioException(
"VCS: Could not receive an output from `%s` command (%s)" % (
args, result))
class GitClient(VCSClientBase):
command = "git"
def export(self, dst_dir):
def export(self):
args = ["clone", "--recursive", "--depth", "1"]
if self.branch:
args.extend(["--branch", self.branch])
args.extend([self.url, dst_dir])
self.run_cmd(args)
args.extend([self.remote_url, self.src_dir])
return self.run_cmd(args)
def get_latest_revision(self):
return self.get_cmd_output(["rev-parse", "--short", "HEAD"],
cwd=self.src_dir).strip()
class HgClient(VCSClientBase):
command = "hg"
def export(self, dst_dir):
def export(self):
args = ["clone"]
if self.branch:
args.extend(["--updaterev", self.branch])
args.extend([self.url, dst_dir])
self.run_cmd(args)
args.extend([self.remote_url, self.src_dir])
return self.run_cmd(args)
def get_latest_revision(self):
return self.get_cmd_output(["identify", "--id"],
cwd=self.src_dir).strip()
class SvnClient(VCSClientBase):
command = "svn"
def export(self, dst_dir):
def export(self):
args = ["export", "--force"]
if self.branch:
args.extend(["--revision", self.branch])
args.extend([self.url, dst_dir])
self.run_cmd(args)
args.extend([self.remote_url, self.src_dir])
return self.run_cmd(args)
def get_latest_revision(self):
return self.get_cmd_output(["info", "-r", "HEAD"],
cwd=self.src_dir).strip()