Initial version of a new package manager

This commit is contained in:
Ivan Kravets
2020-07-31 15:42:26 +03:00
parent abc0489ac6
commit d329aef876
27 changed files with 2074 additions and 407 deletions

View File

@ -25,8 +25,9 @@ from time import time
import requests
from platformio import __version__, exception, fs, lockfile, proc
from platformio import __version__, exception, fs, proc
from platformio.compat import WINDOWS, dump_json_to_unicode, hashlib_encode_data
from platformio.package.lockfile import LockFile
from platformio.project.helpers import (
get_default_projects_dir,
get_project_cache_dir,
@ -125,7 +126,7 @@ class State(object):
def _lock_state_file(self):
if not self.lock:
return
self._lockfile = lockfile.LockFile(self.path)
self._lockfile = LockFile(self.path)
try:
self._lockfile.acquire()
except IOError:
@ -143,6 +144,9 @@ class State(object):
def as_dict(self):
return self._storage
def keys(self):
return self._storage.keys()
def get(self, key, default=True):
return self._storage.get(key, default)
@ -187,7 +191,7 @@ class ContentCache(object):
def _lock_dbindex(self):
if not self.cache_dir:
os.makedirs(self.cache_dir)
self._lockfile = lockfile.LockFile(self.cache_dir)
self._lockfile = LockFile(self.cache_dir)
try:
self._lockfile.acquire()
except: # pylint: disable=bare-except

View File

@ -16,7 +16,7 @@ import os
import time
from platformio import __accounts_api__, app
from platformio.clients.rest import RESTClient
from platformio.clients.http import HTTPClient
from platformio.exception import PlatformioException
@ -35,7 +35,7 @@ class AccountAlreadyAuthorized(AccountError):
MESSAGE = "You are already authorized with {0} account."
class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
class AccountClient(HTTPClient): # pylint:disable=too-many-public-methods
SUMMARY_CACHE_TTL = 60 * 60 * 24 * 7
@ -67,7 +67,7 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
token = self.fetch_authentication_token()
headers["Authorization"] = "Bearer %s" % token
kwargs["headers"] = headers
return self.send_request(*args, **kwargs)
return self.request_json_data(*args, **kwargs)
def login(self, username, password):
try:
@ -79,11 +79,11 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
app.get_state_item("account", {}).get("email", "")
)
result = self.send_request(
data = self.request_json_data(
"post", "/v1/login", data={"username": username, "password": password},
)
app.set_state_item("account", result)
return result
app.set_state_item("account", data)
return data
def login_with_code(self, client_id, code, redirect_uri):
try:
@ -95,7 +95,7 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
app.get_state_item("account", {}).get("email", "")
)
result = self.send_request(
result = self.request_json_data(
"post",
"/v1/login/code",
data={"client_id": client_id, "code": code, "redirect_uri": redirect_uri},
@ -107,7 +107,7 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
refresh_token = self.get_refresh_token()
self.delete_local_session()
try:
self.send_request(
self.request_json_data(
"post", "/v1/logout", data={"refresh_token": refresh_token},
)
except AccountError:
@ -133,7 +133,7 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
app.get_state_item("account", {}).get("email", "")
)
return self.send_request(
return self.request_json_data(
"post",
"/v1/registration",
data={
@ -153,7 +153,9 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
).get("auth_token")
def forgot_password(self, username):
return self.send_request("post", "/v1/forgot", data={"username": username},)
return self.request_json_data(
"post", "/v1/forgot", data={"username": username},
)
def get_profile(self):
return self.send_auth_request("get", "/v1/profile",)
@ -276,15 +278,15 @@ class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
return auth.get("access_token")
if auth.get("refresh_token"):
try:
result = self.send_request(
data = self.request_json_data(
"post",
"/v1/login",
headers={
"Authorization": "Bearer %s" % auth.get("refresh_token")
},
)
app.set_state_item("account", result)
return result.get("auth").get("access_token")
app.set_state_item("account", data)
return data.get("auth").get("access_token")
except AccountError:
self.delete_local_session()
raise AccountNotAuthorized()

View File

@ -19,11 +19,11 @@ from platformio import app, util
from platformio.exception import PlatformioException
class RESTClientError(PlatformioException):
class HTTPClientError(PlatformioException):
pass
class RESTClient(object):
class HTTPClient(object):
def __init__(self, base_url):
if base_url.endswith("/"):
base_url = base_url[:-1]
@ -33,19 +33,29 @@ class RESTClient(object):
retry = Retry(
total=5,
backoff_factor=1,
method_whitelist=list(Retry.DEFAULT_METHOD_WHITELIST) + ["POST"],
status_forcelist=[500, 502, 503, 504],
# method_whitelist=list(Retry.DEFAULT_METHOD_WHITELIST) + ["POST"],
status_forcelist=[413, 429, 500, 502, 503, 504],
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
self._session.mount(base_url, adapter)
def __del__(self):
if not self._session:
return
self._session.close()
self._session = None
def send_request(self, method, path, **kwargs):
# check internet before and resolve issue with 60 seconds timeout
# check Internet before and resolve issue with 60 seconds timeout
# print(self, method, path, kwargs)
util.internet_on(raise_exception=True)
try:
response = getattr(self._session, method)(self.base_url + path, **kwargs)
return getattr(self._session, method)(self.base_url + path, **kwargs)
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
raise RESTClientError(e)
raise HTTPClientError(e)
def request_json_data(self, *args, **kwargs):
response = self.send_request(*args, **kwargs)
return self.raise_error_from_response(response)
@staticmethod
@ -59,4 +69,4 @@ class RESTClient(object):
message = response.json()["message"]
except (KeyError, ValueError):
message = response.text
raise RESTClientError(message)
raise HTTPClientError(message)

View File

@ -14,13 +14,18 @@
from platformio import __registry_api__, fs
from platformio.clients.account import AccountClient
from platformio.clients.rest import RESTClient
from platformio.package.spec import PackageType
from platformio.clients.http import HTTPClient
from platformio.package.meta import PackageType
try:
from urllib.parse import quote
except ImportError:
from urllib import quote
# pylint: disable=too-many-arguments
class RegistryClient(RESTClient):
class RegistryClient(HTTPClient):
def __init__(self):
super(RegistryClient, self).__init__(base_url=__registry_api__)
@ -30,7 +35,7 @@ class RegistryClient(RESTClient):
token = AccountClient().fetch_authentication_token()
headers["Authorization"] = "Bearer %s" % token
kwargs["headers"] = headers
return self.send_request(*args, **kwargs)
return self.request_json_data(*args, **kwargs)
def publish_package(
self, archive_path, owner=None, released_at=None, private=False, notify=True
@ -41,7 +46,7 @@ class RegistryClient(RESTClient):
account.get_account_info(offline=True).get("profile").get("username")
)
with open(archive_path, "rb") as fp:
response = self.send_auth_request(
return self.send_auth_request(
"post",
"/v3/packages/%s/%s" % (owner, PackageType.from_archive(archive_path)),
params={
@ -57,7 +62,6 @@ class RegistryClient(RESTClient):
},
data=fp,
)
return response
def unpublish_package( # pylint: disable=redefined-builtin
self, type, name, owner=None, version=None, undo=False
@ -70,10 +74,9 @@ class RegistryClient(RESTClient):
path = "/v3/packages/%s/%s/%s" % (owner, type, name)
if version:
path += "/" + version
response = self.send_auth_request(
return self.send_auth_request(
"delete", path, params={"undo": 1 if undo else 0},
)
return response
def update_resource(self, urn, private):
return self.send_auth_request(
@ -96,3 +99,40 @@ class RegistryClient(RESTClient):
return self.send_auth_request(
"get", "/v3/resources", params={"owner": owner} if owner else None
)
def list_packages(self, query=None, filters=None, page=None):
assert query or filters
search_query = []
if filters:
valid_filters = (
"authors",
"keywords",
"frameworks",
"platforms",
"headers",
"ids",
"names",
"owners",
"types",
)
assert set(filters.keys()) <= set(valid_filters)
for name, values in filters.items():
for value in set(
values if isinstance(values, (list, tuple)) else [values]
):
search_query.append("%s:%s" % (name[:-1], value))
if query:
search_query.append(query)
params = dict(query=quote(" ".join(search_query)))
if page:
params["page"] = int(page)
return self.request_json_data("get", "/v3/packages", params=params)
def get_package(self, type_, owner, name, version=None):
return self.request_json_data(
"get",
"/v3/packages/{owner}/{type}/{name}".format(
type=type_, owner=owner, name=quote(name)
),
params=dict(version=version) if version else None,
)

View File

@ -18,8 +18,8 @@ from datetime import datetime
import click
from platformio.clients.registry import RegistryClient
from platformio.package.meta import PackageSpec, PackageType
from platformio.package.pack import PackagePacker
from platformio.package.spec import PackageSpec, PackageType
def validate_datetime(ctx, param, value): # pylint: disable=unused-argument
@ -106,7 +106,7 @@ def package_unpublish(package, type, undo): # pylint: disable=redefined-builtin
response = RegistryClient().unpublish_package(
type=type,
name=spec.name,
owner=spec.ownername,
owner=spec.owner,
version=spec.requirements,
undo=undo,
)

View File

@ -119,39 +119,6 @@ class PackageInstallError(PlatformIOPackageException):
)
class ExtractArchiveItemError(PlatformIOPackageException):
MESSAGE = (
"Could not extract `{0}` to `{1}`. Try to disable antivirus "
"tool or check this solution -> http://bit.ly/faq-package-manager"
)
class UnsupportedArchiveType(PlatformIOPackageException):
MESSAGE = "Can not unpack file '{0}'"
class FDUnrecognizedStatusCode(PlatformIOPackageException):
MESSAGE = "Got an unrecognized status code '{0}' when downloaded {1}"
class FDSizeMismatch(PlatformIOPackageException):
MESSAGE = (
"The size ({0:d} bytes) of downloaded file '{1}' "
"is not equal to remote size ({2:d} bytes)"
)
class FDSHASumMismatch(PlatformIOPackageException):
MESSAGE = (
"The 'sha1' sum '{0}' of downloaded file '{1}' is not equal to remote '{2}'"
)
#
# Library
#

View File

@ -26,12 +26,12 @@ import semantic_version
from platformio import __version__, app, exception, fs, util
from platformio.compat import hashlib_encode_data
from platformio.downloader import FileDownloader
from platformio.lockfile import LockFile
from platformio.package.download import FileDownloader
from platformio.package.exception import ManifestException
from platformio.package.lockfile import LockFile
from platformio.package.manifest.parser import ManifestParserFactory
from platformio.unpacker import FileUnpacker
from platformio.vcsclient import VCSClientFactory
from platformio.package.unpack import FileUnpacker
from platformio.package.vcsclient import VCSClientFactory
# pylint: disable=too-many-arguments, too-many-return-statements

View File

@ -23,11 +23,7 @@ import click
import requests
from platformio import app, fs, util
from platformio.exception import (
FDSHASumMismatch,
FDSizeMismatch,
FDUnrecognizedStatusCode,
)
from platformio.package.exception import PackageException
class FileDownloader(object):
@ -41,7 +37,11 @@ class FileDownloader(object):
verify=sys.version_info >= (2, 7, 9),
)
if self._request.status_code != 200:
raise FDUnrecognizedStatusCode(self._request.status_code, url)
raise PackageException(
"Got the unrecognized status code '{0}' when downloaded {1}".format(
self._request.status_code, url
)
)
disposition = self._request.headers.get("content-disposition")
if disposition and "filename=" in disposition:
@ -74,21 +74,21 @@ class FileDownloader(object):
def start(self, with_progress=True, silent=False):
label = "Downloading"
itercontent = self._request.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE)
f = open(self._destination, "wb")
fp = open(self._destination, "wb")
try:
if not with_progress or self.get_size() == -1:
if not silent:
click.echo("%s..." % label)
for chunk in itercontent:
if chunk:
f.write(chunk)
fp.write(chunk)
else:
chunks = int(math.ceil(self.get_size() / float(io.DEFAULT_BUFFER_SIZE)))
with click.progressbar(length=chunks, label=label) as pb:
for _ in pb:
f.write(next(itercontent))
fp.write(next(itercontent))
finally:
f.close()
fp.close()
self._request.close()
if self.get_lmtime():
@ -96,15 +96,40 @@ class FileDownloader(object):
return True
def verify(self, sha1=None):
def verify(self, checksum=None):
_dlsize = getsize(self._destination)
if self.get_size() != -1 and _dlsize != self.get_size():
raise FDSizeMismatch(_dlsize, self._fname, self.get_size())
if not sha1:
return None
checksum = fs.calculate_file_hashsum("sha1", self._destination)
if sha1.lower() != checksum.lower():
raise FDSHASumMismatch(checksum, self._fname, sha1)
raise PackageException(
(
"The size ({0:d} bytes) of downloaded file '{1}' "
"is not equal to remote size ({2:d} bytes)"
).format(_dlsize, self._fname, self.get_size())
)
if not checksum:
return True
checksum_len = len(checksum)
hash_algo = None
if checksum_len == 32:
hash_algo = "md5"
elif checksum_len == 40:
hash_algo = "sha1"
elif checksum_len == 64:
hash_algo = "sha256"
if not hash_algo:
raise PackageException(
"Could not determine checksum algorithm by %s" % checksum
)
dl_checksum = fs.calculate_file_hashsum(hash_algo, self._destination)
if checksum.lower() != dl_checksum.lower():
raise PackageException(
"The checksum '{0}' of the downloaded file '{1}' "
"does not match to the remote '{2}'".format(
dl_checksum, self._fname, checksum
)
)
return True
def _preserve_filemtime(self, lmdate):

View File

@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.exception import PlatformioException
from platformio import util
from platformio.exception import PlatformioException, UserSideException
class PackageException(PlatformioException):
@ -44,3 +45,16 @@ class ManifestValidationError(ManifestException):
"https://docs.platformio.org/page/librarymanager/config.html"
% self.messages
)
class MissingPackageManifestError(ManifestException):
MESSAGE = "Could not find one of '{0}' manifest files in the package"
class UnknownPackageError(UserSideException):
MESSAGE = (
"Could not find a package with '{0}' requirements for your system '%s'"
% util.get_systype()
)

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,95 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import os
import tempfile
import time
import click
from platformio import app, compat
from platformio.package.download import FileDownloader
from platformio.package.lockfile import LockFile
class PackageManagerDownloadMixin(object):
DOWNLOAD_CACHE_EXPIRE = 86400 * 30 # keep package in a local cache for 1 month
def compute_download_path(self, *args):
request_hash = hashlib.new("sha256")
for arg in args:
request_hash.update(compat.hashlib_encode_data(arg))
dl_path = os.path.join(self.get_download_dir(), request_hash.hexdigest())
return dl_path
def get_download_usagedb_path(self):
return os.path.join(self.get_download_dir(), "usage.db")
def set_download_utime(self, path, utime=None):
with app.State(self.get_download_usagedb_path(), lock=True) as state:
state[os.path.basename(path)] = int(time.time() if not utime else utime)
def cleanup_expired_downloads(self):
with app.State(self.get_download_usagedb_path(), lock=True) as state:
# remove outdated
for fname in list(state.keys()):
if state[fname] > (time.time() - self.DOWNLOAD_CACHE_EXPIRE):
continue
del state[fname]
dl_path = os.path.join(self.get_download_dir(), fname)
if os.path.isfile(dl_path):
os.remove(dl_path)
def download(self, url, checksum=None, silent=False):
dl_path = self.compute_download_path(url, checksum or "")
if os.path.isfile(dl_path):
self.set_download_utime(dl_path)
return dl_path
with_progress = not silent and not app.is_disabled_progressbar()
tmp_path = tempfile.mkstemp(dir=self.get_download_dir())[1]
try:
with LockFile(dl_path):
try:
fd = FileDownloader(url)
fd.set_destination(tmp_path)
fd.start(with_progress=with_progress, silent=silent)
except IOError as e:
raise_error = not with_progress
if with_progress:
try:
fd = FileDownloader(url)
fd.set_destination(tmp_path)
fd.start(with_progress=False, silent=silent)
except IOError:
raise_error = True
if raise_error:
click.secho(
"Error: Please read http://bit.ly/package-manager-ioerror",
fg="red",
err=True,
)
raise e
if checksum:
fd.verify(checksum)
os.rename(tmp_path, dl_path)
finally:
if os.path.isfile(tmp_path):
os.remove(tmp_path)
assert os.path.isfile(dl_path)
self.set_download_utime(dl_path)
return dl_path

View File

@ -0,0 +1,282 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import os
import shutil
import tempfile
import click
from platformio import app, compat, fs, util
from platformio.package.exception import PackageException, UnknownPackageError
from platformio.package.lockfile import LockFile
from platformio.package.meta import PackageSourceItem, PackageSpec
from platformio.package.unpack import FileUnpacker
from platformio.package.vcsclient import VCSClientFactory
class PackageManagerInstallMixin(object):
INSTALL_HISTORY = None # avoid circle dependencies
@staticmethod
def unpack(src, dst):
with_progress = not app.is_disabled_progressbar()
try:
with FileUnpacker(src) as fu:
return fu.unpack(dst, with_progress=with_progress)
except IOError as e:
if not with_progress:
raise e
with FileUnpacker(src) as fu:
return fu.unpack(dst, with_progress=False)
def install(self, spec, silent=False):
with LockFile(self.package_dir):
pkg = self._install(spec, silent=silent)
self.memcache_reset()
self.cleanup_expired_downloads()
return pkg
def _install(self, spec, search_filters=None, silent=False):
spec = self.ensure_spec(spec)
# avoid circle dependencies
if not self.INSTALL_HISTORY:
self.INSTALL_HISTORY = []
if spec in self.INSTALL_HISTORY:
return None
self.INSTALL_HISTORY.append(spec)
# check if package is already installed
pkg = self.get_package(spec)
if pkg:
if not silent:
click.secho(
"{name} @ {version} is already installed".format(
**pkg.metadata.as_dict()
),
fg="yellow",
)
return pkg
if not silent:
msg = "Installing %s" % click.style(spec.humanize(), fg="cyan")
self.print_message(msg)
if spec.url:
pkg = self.install_from_url(spec.url, spec, silent=silent)
else:
pkg = self.install_from_registry(spec, search_filters, silent=silent)
if not pkg or not pkg.metadata:
raise PackageException(
"Could not install package '%s' for '%s' system"
% (spec.humanize(), util.get_systype())
)
if not silent:
self.print_message(
click.style(
"{name} @ {version} has been successfully installed!".format(
**pkg.metadata.as_dict()
),
fg="green",
)
)
self.memcache_reset()
self.install_dependencies(pkg, silent)
return pkg
def install_dependencies(self, pkg, silent=False):
assert isinstance(pkg, PackageSourceItem)
manifest = self.load_manifest(pkg)
if not manifest.get("dependencies"):
return
if not silent:
self.print_message(click.style("Installing dependencies...", fg="yellow"))
for dependency in manifest.get("dependencies"):
if not self.install_dependency(dependency, silent) and not silent:
click.secho(
"Warning! Could not install dependency %s for package '%s'"
% (dependency, pkg.metadata.name),
fg="yellow",
)
def install_dependency(self, dependency, silent=False):
spec = PackageSpec(
name=dependency.get("name"), requirements=dependency.get("version")
)
search_filters = {
key: value
for key, value in dependency.items()
if key in ("authors", "platforms", "frameworks")
}
return self._install(spec, search_filters=search_filters or None, silent=silent)
def install_from_url(self, url, spec, checksum=None, silent=False):
spec = self.ensure_spec(spec)
tmp_dir = tempfile.mkdtemp(prefix="pkg-installing-", dir=self.get_tmp_dir())
vcs = None
try:
if url.startswith("file://"):
_url = url[7:]
if os.path.isfile(_url):
self.unpack(_url, tmp_dir)
else:
fs.rmtree(tmp_dir)
shutil.copytree(_url, tmp_dir, symlinks=True)
elif url.startswith(("http://", "https://")):
dl_path = self.download(url, checksum, silent=silent)
assert os.path.isfile(dl_path)
self.unpack(dl_path, tmp_dir)
else:
vcs = VCSClientFactory.newClient(tmp_dir, url)
assert vcs.export()
root_dir = self.find_pkg_root(tmp_dir, spec)
pkg_item = PackageSourceItem(
root_dir,
self.build_metadata(
root_dir, spec, vcs.get_current_revision() if vcs else None
),
)
pkg_item.dump_meta()
return self._install_tmp_pkg(pkg_item)
finally:
if os.path.isdir(tmp_dir):
fs.rmtree(tmp_dir)
def _install_tmp_pkg(self, tmp_pkg):
assert isinstance(tmp_pkg, PackageSourceItem)
# validate package version and declared requirements
if (
tmp_pkg.metadata.spec.requirements
and tmp_pkg.metadata.version not in tmp_pkg.metadata.spec.requirements
):
raise PackageException(
"Package version %s doesn't satisfy requirements %s based on %s"
% (
tmp_pkg.metadata.version,
tmp_pkg.metadata.spec.requirements,
tmp_pkg.metadata,
)
)
dst_pkg = PackageSourceItem(
os.path.join(self.package_dir, tmp_pkg.get_safe_dirname())
)
# what to do with existing package?
action = "overwrite"
if dst_pkg.metadata and dst_pkg.metadata.spec.url:
if dst_pkg.metadata.spec.url != tmp_pkg.metadata.spec.url:
action = "detach-existing"
elif tmp_pkg.metadata.spec.url:
action = "detach-new"
elif dst_pkg.metadata and dst_pkg.metadata.version != tmp_pkg.metadata.version:
action = (
"detach-existing"
if tmp_pkg.metadata.version > dst_pkg.metadata.version
else "detach-new"
)
def _cleanup_dir(path):
if os.path.isdir(path):
fs.rmtree(path)
if action == "detach-existing":
target_dirname = "%s@%s" % (
tmp_pkg.get_safe_dirname(),
dst_pkg.metadata.version,
)
if dst_pkg.metadata.spec.url:
target_dirname = "%s@src-%s" % (
tmp_pkg.get_safe_dirname(),
hashlib.md5(
compat.hashlib_encode_data(dst_pkg.metadata.spec.url)
).hexdigest(),
)
# move existing into the new place
pkg_dir = os.path.join(self.package_dir, target_dirname)
_cleanup_dir(pkg_dir)
shutil.move(dst_pkg.path, pkg_dir)
# move new source to the destination location
_cleanup_dir(dst_pkg.path)
shutil.move(tmp_pkg.path, dst_pkg.path)
return PackageSourceItem(dst_pkg.path)
if action == "detach-new":
target_dirname = "%s@%s" % (
tmp_pkg.get_safe_dirname(),
tmp_pkg.metadata.version,
)
if tmp_pkg.metadata.spec.url:
target_dirname = "%s@src-%s" % (
tmp_pkg.get_safe_dirname(),
hashlib.md5(
compat.hashlib_encode_data(tmp_pkg.metadata.spec.url)
).hexdigest(),
)
pkg_dir = os.path.join(self.package_dir, target_dirname)
_cleanup_dir(pkg_dir)
shutil.move(tmp_pkg.path, pkg_dir)
return PackageSourceItem(pkg_dir)
# otherwise, overwrite existing
_cleanup_dir(dst_pkg.path)
shutil.move(tmp_pkg.path, dst_pkg.path)
return PackageSourceItem(dst_pkg.path)
def uninstall(self, path_or_spec, silent=False):
with LockFile(self.package_dir):
pkg = (
PackageSourceItem(path_or_spec)
if os.path.isdir(path_or_spec)
else self.get_package(path_or_spec)
)
if not pkg or not pkg.metadata:
raise UnknownPackageError(path_or_spec)
if not silent:
self.print_message(
"Uninstalling %s @ %s: \t"
% (click.style(pkg.metadata.name, fg="cyan"), pkg.metadata.version),
nl=False,
)
if os.path.islink(pkg.path):
os.unlink(pkg.path)
else:
fs.rmtree(pkg.path)
self.memcache_reset()
# unfix detached-package with the same name
detached_pkg = self.get_package(PackageSpec(name=pkg.metadata.name))
if (
detached_pkg
and "@" in detached_pkg.path
and not os.path.isdir(
os.path.join(self.package_dir, detached_pkg.get_safe_dirname())
)
):
shutil.move(
detached_pkg.path,
os.path.join(self.package_dir, detached_pkg.get_safe_dirname()),
)
self.memcache_reset()
if not silent:
click.echo("[%s]" % click.style("OK", fg="green"))
return True

View File

@ -0,0 +1,190 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import click
from platformio.clients.http import HTTPClient
from platformio.clients.registry import RegistryClient
from platformio.package.exception import UnknownPackageError
from platformio.package.meta import PackageMetaData, PackageSpec
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
class RegistryFileMirrorsIterator(object):
HTTP_CLIENT_INSTANCES = {}
def __init__(self, download_url):
self.download_url = download_url
self._url_parts = urlparse(download_url)
self._base_url = "%s://%s" % (self._url_parts.scheme, self._url_parts.netloc)
self._visited_mirrors = []
def __iter__(self):
return self
def __next__(self):
http = self.get_http_client()
response = http.send_request(
"head",
self._url_parts.path,
allow_redirects=False,
params=dict(bypass=",".join(self._visited_mirrors))
if self._visited_mirrors
else None,
)
stop_conditions = [
response.status_code not in (302, 307),
not response.headers.get("Location"),
not response.headers.get("X-PIO-Mirror"),
response.headers.get("X-PIO-Mirror") in self._visited_mirrors,
]
if any(stop_conditions):
raise StopIteration
self._visited_mirrors.append(response.headers.get("X-PIO-Mirror"))
return (
response.headers.get("Location"),
response.headers.get("X-PIO-Content-SHA256"),
)
def get_http_client(self):
if self._base_url not in RegistryFileMirrorsIterator.HTTP_CLIENT_INSTANCES:
RegistryFileMirrorsIterator.HTTP_CLIENT_INSTANCES[
self._base_url
] = HTTPClient(self._base_url)
return RegistryFileMirrorsIterator.HTTP_CLIENT_INSTANCES[self._base_url]
class PackageManageRegistryMixin(object):
def install_from_registry(self, spec, search_filters=None, silent=False):
packages = self.search_registry_packages(spec, search_filters)
if not packages:
raise UnknownPackageError(spec.humanize())
if len(packages) > 1 and not silent:
self.print_multi_package_issue(packages, spec)
package, version = self.find_best_registry_version(packages, spec)
pkgfile = self._pick_compatible_pkg_file(version["files"]) if version else None
if not pkgfile:
raise UnknownPackageError(spec.humanize())
for url, checksum in RegistryFileMirrorsIterator(pkgfile["download_url"]):
try:
return self.install_from_url(
url,
PackageSpec(
owner=package["owner"]["username"],
id=package["id"],
name=package["name"],
),
checksum or pkgfile["checksum"]["sha256"],
silent=silent,
)
except Exception as e: # pylint: disable=broad-except
click.secho("Warning! Package Mirror: %s" % e, fg="yellow")
click.secho("Looking for another mirror...", fg="yellow")
return None
def get_registry_client_instance(self):
if not self._registry_client:
self._registry_client = RegistryClient()
return self._registry_client
def search_registry_packages(self, spec, filters=None):
filters = filters or {}
if spec.id:
filters["ids"] = str(spec.id)
else:
filters["types"] = self.pkg_type
filters["names"] = '"%s"' % spec.name.lower()
if spec.owner:
filters["owners"] = spec.owner.lower()
return self.get_registry_client_instance().list_packages(filters=filters)[
"items"
]
def fetch_registry_package_versions(self, owner, name):
return self.get_registry_client_instance().get_package(
self.pkg_type, owner, name
)["versions"]
@staticmethod
def print_multi_package_issue(packages, spec):
click.secho(
"Warning! More than one package has been found by ", fg="yellow", nl=False
)
click.secho(spec.humanize(), fg="cyan", nl=False)
click.secho(" requirements:", fg="yellow")
for item in packages:
click.echo(
" - {owner}/{name} @ {version}".format(
owner=click.style(item["owner"]["username"], fg="cyan"),
name=item["name"],
version=item["version"]["name"],
)
)
click.secho(
"Please specify detailed REQUIREMENTS using package owner and version "
"(showed above) to avoid project compatibility issues.",
fg="yellow",
)
def find_best_registry_version(self, packages, spec):
# find compatible version within the latest package versions
for package in packages:
version = self._pick_best_pkg_version([package["version"]], spec)
if version:
return (package, version)
if not spec.requirements:
return None
# if the custom version requirements, check ALL package versions
for package in packages:
version = self._pick_best_pkg_version(
self.fetch_registry_package_versions(
package["owner"]["username"], package["name"]
),
spec,
)
if version:
return (package, version)
time.sleep(1)
return None
def _pick_best_pkg_version(self, versions, spec):
best = None
for version in versions:
semver = PackageMetaData.to_semver(version["name"])
if spec.requirements and semver not in spec.requirements:
continue
if not any(
self.is_system_compatible(f.get("system")) for f in version["files"]
):
continue
if not best or (semver > PackageMetaData.to_semver(best["name"])):
best = version
return best
def _pick_compatible_pkg_file(self, version_files):
for item in version_files:
if self.is_system_compatible(item.get("system")):
return item
return None

View File

@ -0,0 +1,233 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import datetime
import click
import semantic_version
from platformio import fs, util
from platformio.commands import PlatformioCLI
from platformio.package.exception import ManifestException, MissingPackageManifestError
from platformio.package.manager._download import PackageManagerDownloadMixin
from platformio.package.manager._install import PackageManagerInstallMixin
from platformio.package.manager._registry import PackageManageRegistryMixin
from platformio.package.manifest.parser import ManifestParserFactory
from platformio.package.meta import (
PackageMetaData,
PackageSourceItem,
PackageSpec,
PackageType,
)
from platformio.project.helpers import get_project_cache_dir
class BasePackageManager(
PackageManagerDownloadMixin, PackageManageRegistryMixin, PackageManagerInstallMixin
):
MEMORY_CACHE = {}
def __init__(self, pkg_type, package_dir):
self.pkg_type = pkg_type
self.package_dir = self.ensure_dir_exists(package_dir)
self.MEMORY_CACHE = {}
self._download_dir = None
self._tmp_dir = None
self._registry_client = None
def memcache_get(self, key, default=None):
return self.MEMORY_CACHE.get(key, default)
def memcache_set(self, key, value):
self.MEMORY_CACHE[key] = value
def memcache_reset(self):
self.MEMORY_CACHE.clear()
@staticmethod
def is_system_compatible(value):
if not value or "*" in value:
return True
return util.items_in_list(value, util.get_systype())
@staticmethod
def generate_rand_version():
return datetime.now().strftime("0.0.0+%Y%m%d%H%M%S")
@staticmethod
def ensure_dir_exists(path):
if not os.path.isdir(path):
os.makedirs(path)
assert os.path.isdir(path)
return path
@staticmethod
def ensure_spec(spec):
return spec if isinstance(spec, PackageSpec) else PackageSpec(spec)
@property
def manifest_names(self):
raise NotImplementedError
def print_message(self, message, nl=True):
click.echo("%s: %s" % (self.__class__.__name__, message), nl=nl)
def get_download_dir(self):
if not self._download_dir:
self._download_dir = self.ensure_dir_exists(
os.path.join(get_project_cache_dir(), "downloads")
)
return self._download_dir
def get_tmp_dir(self):
if not self._tmp_dir:
self._tmp_dir = self.ensure_dir_exists(
os.path.join(get_project_cache_dir(), "tmp")
)
return self._tmp_dir
def find_pkg_root(self, path, spec): # pylint: disable=unused-argument
if self.manifest_exists(path):
return path
for root, _, _ in os.walk(path):
if self.manifest_exists(root):
return root
raise MissingPackageManifestError(", ".join(self.manifest_names))
def get_manifest_path(self, pkg_dir):
if not os.path.isdir(pkg_dir):
return None
for name in self.manifest_names:
manifest_path = os.path.join(pkg_dir, name)
if os.path.isfile(manifest_path):
return manifest_path
return None
def manifest_exists(self, pkg_dir):
return self.get_manifest_path(pkg_dir)
def load_manifest(self, src):
path = src.path if isinstance(src, PackageSourceItem) else src
cache_key = "load_manifest-%s" % path
result = self.memcache_get(cache_key)
if result:
return result
candidates = (
[os.path.join(path, name) for name in self.manifest_names]
if os.path.isdir(path)
else [path]
)
for item in candidates:
if not os.path.isfile(item):
continue
try:
result = ManifestParserFactory.new_from_file(item).as_dict()
self.memcache_set(cache_key, result)
return result
except ManifestException as e:
if not PlatformioCLI.in_silence():
click.secho(str(e), fg="yellow")
raise MissingPackageManifestError(", ".join(self.manifest_names))
def build_legacy_spec(self, pkg_dir):
# find src manifest
src_manifest_name = ".piopkgmanager.json"
src_manifest_path = None
for name in os.listdir(pkg_dir):
if not os.path.isfile(os.path.join(pkg_dir, name, src_manifest_name)):
continue
src_manifest_path = os.path.join(pkg_dir, name, src_manifest_name)
break
if src_manifest_path:
src_manifest = fs.load_json(src_manifest_path)
return PackageSpec(
name=src_manifest.get("name"),
url=src_manifest.get("url"),
requirements=src_manifest.get("requirements"),
)
# fall back to a package manifest
manifest = self.load_manifest(pkg_dir)
return PackageSpec(name=manifest.get("name"))
def build_metadata(self, pkg_dir, spec, vcs_revision=None):
manifest = self.load_manifest(pkg_dir)
metadata = PackageMetaData(
type=self.pkg_type,
name=manifest.get("name"),
version=manifest.get("version"),
spec=spec,
)
if not metadata.name or spec.is_custom_name():
metadata.name = spec.name
if vcs_revision:
metadata.version = "%s+sha.%s" % (
metadata.version if metadata.version else "0.0.0",
vcs_revision,
)
if not metadata.version:
metadata.version = self.generate_rand_version()
return metadata
def get_installed(self):
result = []
for name in os.listdir(self.package_dir):
pkg_dir = os.path.join(self.package_dir, name)
if not os.path.isdir(pkg_dir):
continue
pkg = PackageSourceItem(pkg_dir)
if not pkg.metadata:
try:
spec = self.build_legacy_spec(pkg_dir)
pkg.metadata = self.build_metadata(pkg_dir, spec)
except MissingPackageManifestError:
pass
if pkg.metadata:
result.append(pkg)
return result
def get_package(self, spec):
def _ci_strings_are_equal(a, b):
if a == b:
return True
if not a or not b:
return False
return a.strip().lower() == b.strip().lower()
spec = self.ensure_spec(spec)
best = None
for pkg in self.get_installed():
skip_conditions = [
spec.owner
and not _ci_strings_are_equal(spec.owner, pkg.metadata.spec.owner),
spec.url and spec.url != pkg.metadata.spec.url,
spec.id and spec.id != pkg.metadata.spec.id,
not spec.id
and not spec.url
and not _ci_strings_are_equal(spec.name, pkg.metadata.name),
]
if any(skip_conditions):
continue
if self.pkg_type == PackageType.TOOL:
# TODO: check "system" for pkg
pass
assert isinstance(pkg.metadata.version, semantic_version.Version)
if spec.requirements and pkg.metadata.version not in spec.requirements:
continue
if not best or (pkg.metadata.version > best.metadata.version):
best = pkg
return best

View File

@ -0,0 +1,64 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from platformio.package.exception import MissingPackageManifestError
from platformio.package.manager.base import BasePackageManager
from platformio.package.meta import PackageSpec, PackageType
from platformio.project.helpers import get_project_global_lib_dir
class LibraryPackageManager(BasePackageManager):
def __init__(self, package_dir=None):
super(LibraryPackageManager, self).__init__(
PackageType.LIBRARY, package_dir or get_project_global_lib_dir()
)
@property
def manifest_names(self):
return PackageType.get_manifest_map()[PackageType.LIBRARY]
def find_pkg_root(self, path, spec):
try:
return super(LibraryPackageManager, self).find_pkg_root(path, spec)
except MissingPackageManifestError:
pass
assert isinstance(spec, PackageSpec)
root_dir = self.find_library_root(path)
# automatically generate library manifest
with open(os.path.join(root_dir, "library.json"), "w") as fp:
json.dump(
dict(name=spec.name, version=self.generate_rand_version(),),
fp,
indent=2,
)
return root_dir
@staticmethod
def find_library_root(path):
for root, dirs, files in os.walk(path):
if not files and len(dirs) == 1:
continue
for fname in files:
if not fname.endswith((".c", ".cpp", ".h", ".S")):
continue
if os.path.isdir(os.path.join(os.path.dirname(root), "src")):
return os.path.dirname(root)
return root
return path

View File

@ -0,0 +1,30 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.package.manager.base import BasePackageManager
from platformio.package.meta import PackageType
from platformio.project.config import ProjectConfig
class PlatformPackageManager(BasePackageManager):
def __init__(self, package_dir=None):
self.config = ProjectConfig.get_instance()
super(PlatformPackageManager, self).__init__(
PackageType.PLATFORM,
package_dir or self.config.get_optional_dir("platforms"),
)
@property
def manifest_names(self):
return PackageType.get_manifest_map()[PackageType.PLATFORM]

View File

@ -0,0 +1,29 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.package.manager.base import BasePackageManager
from platformio.package.meta import PackageType
from platformio.project.config import ProjectConfig
class ToolPackageManager(BasePackageManager):
def __init__(self, package_dir=None):
self.config = ProjectConfig.get_instance()
super(ToolPackageManager, self).__init__(
PackageType.TOOL, package_dir or self.config.get_optional_dir("packages"),
)
@property
def manifest_names(self):
return PackageType.get_manifest_map()[PackageType.TOOL]

382
platformio/package/meta.py Normal file
View File

@ -0,0 +1,382 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
import tarfile
import semantic_version
from platformio.compat import get_object_members, string_types
from platformio.package.manifest.parser import ManifestFileType
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
class PackageType(object):
LIBRARY = "library"
PLATFORM = "platform"
TOOL = "tool"
@classmethod
def items(cls):
return get_object_members(cls)
@classmethod
def get_manifest_map(cls):
return {
cls.PLATFORM: (ManifestFileType.PLATFORM_JSON,),
cls.LIBRARY: (
ManifestFileType.LIBRARY_JSON,
ManifestFileType.LIBRARY_PROPERTIES,
ManifestFileType.MODULE_JSON,
),
cls.TOOL: (ManifestFileType.PACKAGE_JSON,),
}
@classmethod
def from_archive(cls, path):
assert path.endswith("tar.gz")
manifest_map = cls.get_manifest_map()
with tarfile.open(path, mode="r:gz") as tf:
for t in sorted(cls.items().values()):
for manifest in manifest_map[t]:
try:
if tf.getmember(manifest):
return t
except KeyError:
pass
return None
class PackageSpec(object):
def __init__( # pylint: disable=redefined-builtin,too-many-arguments
self, raw=None, owner=None, id=None, name=None, requirements=None, url=None
):
self.owner = owner
self.id = id
self.name = name
self._requirements = None
self.url = url
if requirements:
self.requirements = requirements
self._name_is_custom = False
self._parse(raw)
def __eq__(self, other):
return all(
[
self.owner == other.owner,
self.id == other.id,
self.name == other.name,
self.requirements == other.requirements,
self.url == other.url,
]
)
def __repr__(self):
return (
"PackageSpec <owner={owner} id={id} name={name} "
"requirements={requirements} url={url}>".format(**self.as_dict())
)
@property
def requirements(self):
return self._requirements
@requirements.setter
def requirements(self, value):
if not value:
self._requirements = None
return
self._requirements = (
value
if isinstance(value, semantic_version.SimpleSpec)
else semantic_version.SimpleSpec(value)
)
def humanize(self):
if self.url:
result = self.url
elif self.id:
result = "id:%d" % self.id
else:
result = ""
if self.owner:
result = self.owner + "/"
result += self.name
if self.requirements:
result += " @ " + str(self.requirements)
return result
def is_custom_name(self):
return self._name_is_custom
def as_dict(self):
return dict(
owner=self.owner,
id=self.id,
name=self.name,
requirements=str(self.requirements) if self.requirements else None,
url=self.url,
)
def _parse(self, raw):
if raw is None:
return
if not isinstance(raw, string_types):
raw = str(raw)
raw = raw.strip()
parsers = (
self._parse_requirements,
self._parse_custom_name,
self._parse_id,
self._parse_owner,
self._parse_url,
)
for parser in parsers:
if raw is None:
break
raw = parser(raw)
# if name is not custom, parse it from URL
if not self.name and self.url:
self.name = self._parse_name_from_url(self.url)
elif raw:
# the leftover is a package name
self.name = raw
def _parse_requirements(self, raw):
if "@" not in raw:
return raw
tokens = raw.rsplit("@", 1)
if any(s in tokens[1] for s in (":", "/")):
return raw
self.requirements = tokens[1].strip()
return tokens[0].strip()
def _parse_custom_name(self, raw):
if "=" not in raw or raw.startswith("id="):
return raw
tokens = raw.split("=", 1)
if "/" in tokens[0]:
return raw
self.name = tokens[0].strip()
self._name_is_custom = True
return tokens[1].strip()
def _parse_id(self, raw):
if raw.isdigit():
self.id = int(raw)
return None
if raw.startswith("id="):
return self._parse_id(raw[3:])
return raw
def _parse_owner(self, raw):
if raw.count("/") != 1 or "@" in raw:
return raw
tokens = raw.split("/", 1)
self.owner = tokens[0].strip()
self.name = tokens[1].strip()
return None
def _parse_url(self, raw):
if not any(s in raw for s in ("@", ":", "/")):
return raw
self.url = raw.strip()
parts = urlparse(self.url)
# if local file or valid URL with scheme vcs+protocol://
if parts.scheme == "file" or "+" in parts.scheme or self.url.startswith("git+"):
return None
# parse VCS
git_conditions = [
parts.path.endswith(".git"),
# Handle GitHub URL (https://github.com/user/package)
parts.netloc in ("github.com", "gitlab.com", "bitbucket.com")
and not parts.path.endswith((".zip", ".tar.gz")),
]
hg_conditions = [
# Handle Developer Mbed URL
# (https://developer.mbed.org/users/user/code/package/)
# (https://os.mbed.com/users/user/code/package/)
parts.netloc
in ("mbed.com", "os.mbed.com", "developer.mbed.org")
]
if any(git_conditions):
self.url = "git+" + self.url
elif any(hg_conditions):
self.url = "hg+" + self.url
return None
@staticmethod
def _parse_name_from_url(url):
if url.endswith("/"):
url = url[:-1]
for c in ("#", "?"):
if c in url:
url = url[: url.index(c)]
# parse real repository name from Github
parts = urlparse(url)
if parts.netloc == "github.com" and parts.path.count("/") > 2:
return parts.path.split("/")[2]
name = os.path.basename(url)
if "." in name:
return name.split(".", 1)[0].strip()
return name
class PackageMetaData(object):
def __init__( # pylint: disable=redefined-builtin
self, type, name, version, spec=None
):
assert type in PackageType.items().values()
if spec:
assert isinstance(spec, PackageSpec)
self.type = type
self.name = name
self._version = None
self.version = version
self.spec = spec
def __repr__(self):
return (
"PackageMetaData <type={type} name={name} version={version} "
"spec={spec}".format(**self.as_dict())
)
def __eq__(self, other):
return all(
[
self.type == other.type,
self.name == other.name,
self.version == other.version,
self.spec == other.spec,
]
)
@property
def version(self):
return self._version
@version.setter
def version(self, value):
if not value:
self._version = None
return
self._version = (
value
if isinstance(value, semantic_version.Version)
else self.to_semver(value)
)
@staticmethod
def to_semver(value, force=True, raise_exception=False):
assert value
try:
return semantic_version.Version(value)
except ValueError:
pass
if force:
try:
return semantic_version.Version.coerce(value)
except ValueError:
pass
if raise_exception:
raise ValueError("Invalid SemVer version %s" % value)
# parse commit hash
if re.match(r"^[\da-f]+$", value, flags=re.I):
return semantic_version.Version("0.0.0+sha." + value)
return semantic_version.Version("0.0.0+" + value)
def as_dict(self):
return dict(
type=self.type,
name=self.name,
version=str(self.version),
spec=self.spec.as_dict() if self.spec else None,
)
def dump(self, path):
with open(path, "w") as fp:
return json.dump(self.as_dict(), fp)
@staticmethod
def load(path):
with open(path) as fp:
data = json.load(fp)
if data["spec"]:
data["spec"] = PackageSpec(**data["spec"])
return PackageMetaData(**data)
class PackageSourceItem(object):
METAFILE_NAME = ".piopm"
def __init__(self, path, metadata=None):
self.path = path
self.metadata = metadata
if not self.metadata and self.exists():
self.metadata = self.load_meta()
def __repr__(self):
return "PackageSourceItem <path={path} metadata={metadata}".format(
path=self.path, metadata=self.metadata
)
def __eq__(self, other):
return all([self.path == other.path, self.metadata == other.metadata])
def exists(self):
return os.path.isdir(self.path)
def get_safe_dirname(self):
assert self.metadata
return re.sub(r"[^\da-z\_\-\. ]", "_", self.metadata.name, flags=re.I)
def get_metafile_locations(self):
return [
os.path.join(self.path, ".git"),
os.path.join(self.path, ".hg"),
os.path.join(self.path, ".svn"),
self.path,
]
def load_meta(self):
assert self.exists()
for location in self.get_metafile_locations():
manifest_path = os.path.join(location, self.METAFILE_NAME)
if os.path.isfile(manifest_path):
return PackageMetaData.load(manifest_path)
return None
def dump_meta(self):
assert self.exists()
location = None
for location in self.get_metafile_locations():
if os.path.isdir(location):
break
assert location
return self.metadata.dump(os.path.join(location, self.METAFILE_NAME))

View File

@ -23,7 +23,8 @@ from platformio import fs
from platformio.package.exception import PackageException
from platformio.package.manifest.parser import ManifestFileType, ManifestParserFactory
from platformio.package.manifest.schema import ManifestSchema
from platformio.unpacker import FileUnpacker
from platformio.package.meta import PackageSourceItem
from platformio.package.unpack import FileUnpacker
class PackagePacker(object):
@ -36,6 +37,7 @@ class PackagePacker(object):
".svn/",
".pio/",
"**/.pio/",
PackageSourceItem.METAFILE_NAME,
]
INCLUDE_DEFAULT = ManifestFileType.items().values()

View File

@ -1,169 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tarfile
from platformio.compat import get_object_members, string_types
from platformio.package.manifest.parser import ManifestFileType
class PackageType(object):
LIBRARY = "library"
PLATFORM = "platform"
TOOL = "tool"
@classmethod
def items(cls):
return get_object_members(cls)
@classmethod
def get_manifest_map(cls):
return {
cls.PLATFORM: (ManifestFileType.PLATFORM_JSON,),
cls.LIBRARY: (
ManifestFileType.LIBRARY_JSON,
ManifestFileType.LIBRARY_PROPERTIES,
ManifestFileType.MODULE_JSON,
),
cls.TOOL: (ManifestFileType.PACKAGE_JSON,),
}
@classmethod
def from_archive(cls, path):
assert path.endswith("tar.gz")
manifest_map = cls.get_manifest_map()
with tarfile.open(path, mode="r:gz") as tf:
for t in sorted(cls.items().values()):
for manifest in manifest_map[t]:
try:
if tf.getmember(manifest):
return t
except KeyError:
pass
return None
class PackageSpec(object):
def __init__( # pylint: disable=redefined-builtin,too-many-arguments
self, raw=None, ownername=None, id=None, name=None, requirements=None, url=None
):
self.ownername = ownername
self.id = id
self.name = name
self.requirements = requirements
self.url = url
self._parse(raw)
def __repr__(self):
return (
"PackageSpec <ownername={ownername} id={id} name={name} "
"requirements={requirements} url={url}>".format(
ownername=self.ownername,
id=self.id,
name=self.name,
requirements=self.requirements,
url=self.url,
)
)
def __eq__(self, other):
return all(
[
self.ownername == other.ownername,
self.id == other.id,
self.name == other.name,
self.requirements == other.requirements,
self.url == other.url,
]
)
def _parse(self, raw):
if raw is None:
return
if not isinstance(raw, string_types):
raw = str(raw)
raw = raw.strip()
parsers = (
self._parse_requirements,
self._parse_fixed_name,
self._parse_id,
self._parse_ownername,
self._parse_url,
)
for parser in parsers:
if raw is None:
break
raw = parser(raw)
# if name is not fixed, parse it from URL
if not self.name and self.url:
self.name = self._parse_name_from_url(self.url)
elif raw:
# the leftover is a package name
self.name = raw
def _parse_requirements(self, raw):
if "@" not in raw:
return raw
tokens = raw.rsplit("@", 1)
if any(s in tokens[1] for s in (":", "/")):
return raw
self.requirements = tokens[1].strip()
return tokens[0].strip()
def _parse_fixed_name(self, raw):
if "=" not in raw or raw.startswith("id="):
return raw
tokens = raw.split("=", 1)
if "/" in tokens[0]:
return raw
self.name = tokens[0].strip()
return tokens[1].strip()
def _parse_id(self, raw):
if raw.isdigit():
self.id = int(raw)
return None
if raw.startswith("id="):
return self._parse_id(raw[3:])
return raw
def _parse_ownername(self, raw):
if raw.count("/") != 1 or "@" in raw:
return raw
tokens = raw.split("/", 1)
self.ownername = tokens[0].strip()
self.name = tokens[1].strip()
return None
def _parse_url(self, raw):
if not any(s in raw for s in ("@", ":", "/")):
return raw
self.url = raw.strip()
return None
@staticmethod
def _parse_name_from_url(url):
if url.endswith("/"):
url = url[:-1]
for c in ("#", "?"):
if c in url:
url = url[: url.index(c)]
name = os.path.basename(url)
if "." in name:
return name.split(".", 1)[0].strip()
return name

View File

@ -19,10 +19,19 @@ from zipfile import ZipFile
import click
from platformio import exception, util
from platformio import util
from platformio.package.exception import PackageException
class ArchiveBase(object):
class ExtractArchiveItemError(PackageException):
MESSAGE = (
"Could not extract `{0}` to `{1}`. Try to disable antivirus "
"tool or check this solution -> http://bit.ly/faq-package-manager"
)
class BaseArchiver(object):
def __init__(self, arhfileobj):
self._afo = arhfileobj
@ -46,9 +55,9 @@ class ArchiveBase(object):
self._afo.close()
class TARArchive(ArchiveBase):
class TARArchiver(BaseArchiver):
def __init__(self, archpath):
super(TARArchive, self).__init__(tarfile_open(archpath))
super(TARArchiver, self).__init__(tarfile_open(archpath))
def get_items(self):
return self._afo.getmembers()
@ -79,7 +88,7 @@ class TARArchive(ArchiveBase):
self.is_link(item) and self.is_bad_link(item, dest_dir),
]
if not any(bad_conds):
super(TARArchive, self).extract_item(item, dest_dir)
super(TARArchiver, self).extract_item(item, dest_dir)
else:
click.secho(
"Blocked insecure item `%s` from TAR archive" % item.name,
@ -88,9 +97,9 @@ class TARArchive(ArchiveBase):
)
class ZIPArchive(ArchiveBase):
class ZIPArchiver(BaseArchiver):
def __init__(self, archpath):
super(ZIPArchive, self).__init__(ZipFile(archpath))
super(ZIPArchiver, self).__init__(ZipFile(archpath))
@staticmethod
def preserve_permissions(item, dest_dir):
@ -121,48 +130,59 @@ class ZIPArchive(ArchiveBase):
class FileUnpacker(object):
def __init__(self, archpath):
self.archpath = archpath
self._unpacker = None
def __init__(self, path):
self.path = path
self._archiver = None
def _init_archiver(self):
magic_map = {
b"\x1f\x8b\x08": TARArchiver,
b"\x42\x5a\x68": TARArchiver,
b"\x50\x4b\x03\x04": ZIPArchiver,
}
magic_len = max(len(k) for k in magic_map)
with open(self.path, "rb") as fp:
data = fp.read(magic_len)
for magic, archiver in magic_map.items():
if data.startswith(magic):
return archiver(self.path)
raise PackageException("Unknown archive type '%s'" % self.path)
def __enter__(self):
if self.archpath.lower().endswith((".gz", ".bz2", ".tar")):
self._unpacker = TARArchive(self.archpath)
elif self.archpath.lower().endswith(".zip"):
self._unpacker = ZIPArchive(self.archpath)
if not self._unpacker:
raise exception.UnsupportedArchiveType(self.archpath)
self._archiver = self._init_archiver()
return self
def __exit__(self, *args):
if self._unpacker:
self._unpacker.close()
if self._archiver:
self._archiver.close()
def unpack(
self, dest_dir=".", with_progress=True, check_unpacked=True, silent=False
self, dest_dir=None, with_progress=True, check_unpacked=True, silent=False
):
assert self._unpacker
assert self._archiver
if not dest_dir:
dest_dir = os.getcwd()
if not with_progress or silent:
if not silent:
click.echo("Unpacking...")
for item in self._unpacker.get_items():
self._unpacker.extract_item(item, dest_dir)
for item in self._archiver.get_items():
self._archiver.extract_item(item, dest_dir)
else:
items = self._unpacker.get_items()
items = self._archiver.get_items()
with click.progressbar(items, label="Unpacking") as pb:
for item in pb:
self._unpacker.extract_item(item, dest_dir)
self._archiver.extract_item(item, dest_dir)
if not check_unpacked:
return True
# check on disk
for item in self._unpacker.get_items():
filename = self._unpacker.get_item_filename(item)
for item in self._archiver.get_items():
filename = self._archiver.get_item_filename(item)
item_path = os.path.join(dest_dir, filename)
try:
if not self._unpacker.is_link(item) and not os.path.exists(item_path):
raise exception.ExtractArchiveItemError(filename, dest_dir)
if not self._archiver.is_link(item) and not os.path.exists(item_path):
raise ExtractArchiveItemError(filename, dest_dir)
except NotImplementedError:
pass
return True

View File

@ -371,7 +371,7 @@ PING_REMOTE_HOSTS = [
]
@memoized(expire="5s")
@memoized(expire="10s")
def _internet_on():
timeout = 2
socket.setdefaulttimeout(timeout)

View File

@ -0,0 +1,303 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import pytest
from platformio import fs, util
from platformio.package.exception import MissingPackageManifestError
from platformio.package.manager.library import LibraryPackageManager
from platformio.package.manager.platform import PlatformPackageManager
from platformio.package.manager.tool import ToolPackageManager
from platformio.package.meta import PackageSpec
from platformio.package.pack import PackagePacker
def test_download(isolated_pio_core):
url = "https://github.com/platformio/platformio-core/archive/v4.3.4.zip"
checksum = "69d59642cb91e64344f2cdc1d3b98c5cd57679b5f6db7accc7707bd4c5d9664a"
lm = LibraryPackageManager()
archive_path = lm.download(url, checksum, silent=True)
assert fs.calculate_file_hashsum("sha256", archive_path) == checksum
lm.cleanup_expired_downloads()
assert os.path.isfile(archive_path)
# test outdated downloads
lm.set_download_utime(archive_path, time.time() - lm.DOWNLOAD_CACHE_EXPIRE - 1)
lm.cleanup_expired_downloads()
assert not os.path.isfile(archive_path)
# check that key is deleted from DB
with open(lm.get_download_usagedb_path()) as fp:
assert os.path.basename(archive_path) not in fp.read()
def test_find_pkg_root(isolated_pio_core, tmpdir_factory):
# has manifest
pkg_dir = tmpdir_factory.mktemp("package-has-manifest")
root_dir = pkg_dir.join("nested").mkdir().join("folder").mkdir()
root_dir.join("platform.json").write("")
pm = PlatformPackageManager()
found_dir = pm.find_pkg_root(str(pkg_dir), spec=None)
assert os.path.realpath(str(root_dir)) == os.path.realpath(found_dir)
# does not have manifest
pkg_dir = tmpdir_factory.mktemp("package-does-not-have-manifest")
pkg_dir.join("nested").mkdir().join("folder").mkdir().join("readme.txt").write("")
pm = PlatformPackageManager()
with pytest.raises(MissingPackageManifestError):
pm.find_pkg_root(str(pkg_dir), spec=None)
# library package without manifest, should find source root
pkg_dir = tmpdir_factory.mktemp("library-package-without-manifest")
root_dir = pkg_dir.join("nested").mkdir().join("folder").mkdir()
root_dir.join("src").mkdir().join("main.cpp").write("")
root_dir.join("include").mkdir().join("main.h").write("")
assert os.path.realpath(str(root_dir)) == os.path.realpath(
LibraryPackageManager.find_library_root(str(pkg_dir))
)
# library manager should create "library.json"
lm = LibraryPackageManager()
spec = PackageSpec("custom-name@1.0.0")
pkg_root = lm.find_pkg_root(pkg_dir, spec)
manifest_path = os.path.join(pkg_root, "library.json")
assert os.path.realpath(str(root_dir)) == os.path.realpath(pkg_root)
assert os.path.isfile(manifest_path)
manifest = lm.load_manifest(pkg_root)
assert manifest["name"] == "custom-name"
assert "0.0.0" in str(manifest["version"])
def test_build_legacy_spec(isolated_pio_core, tmpdir_factory):
storage_dir = tmpdir_factory.mktemp("storage")
pm = PlatformPackageManager(str(storage_dir))
# test src manifest
pkg1_dir = storage_dir.join("pkg-1").mkdir()
pkg1_dir.join(".pio").mkdir().join(".piopkgmanager.json").write(
"""
{
"name": "StreamSpy-0.0.1.tar",
"url": "https://dl.platformio.org/e8936b7/StreamSpy-0.0.1.tar.gz",
"requirements": null
}
"""
)
assert pm.build_legacy_spec(str(pkg1_dir)) == PackageSpec(
name="StreamSpy-0.0.1.tar",
url="https://dl.platformio.org/e8936b7/StreamSpy-0.0.1.tar.gz",
)
# without src manifest
pkg2_dir = storage_dir.join("pkg-2").mkdir()
pkg2_dir.join("main.cpp").write("")
with pytest.raises(MissingPackageManifestError):
pm.build_legacy_spec(str(pkg2_dir))
# with package manifest
pkg3_dir = storage_dir.join("pkg-3").mkdir()
pkg3_dir.join("platform.json").write('{"name": "pkg3", "version": "1.2.0"}')
assert pm.build_legacy_spec(str(pkg3_dir)) == PackageSpec(name="pkg3")
def test_build_metadata(isolated_pio_core, tmpdir_factory):
pm = PlatformPackageManager()
vcs_revision = "a2ebfd7c0f"
pkg_dir = tmpdir_factory.mktemp("package")
# test package without manifest
with pytest.raises(MissingPackageManifestError):
pm.load_manifest(str(pkg_dir))
with pytest.raises(MissingPackageManifestError):
pm.build_metadata(str(pkg_dir), PackageSpec("MyLib"))
# with manifest
pkg_dir.join("platform.json").write(
'{"name": "Dev-Platform", "version": "1.2.3-alpha.1"}'
)
metadata = pm.build_metadata(str(pkg_dir), PackageSpec("owner/platform-name"))
assert metadata.name == "Dev-Platform"
assert str(metadata.version) == "1.2.3-alpha.1"
# with vcs
metadata = pm.build_metadata(
str(pkg_dir), PackageSpec("owner/platform-name"), vcs_revision
)
assert str(metadata.version) == ("1.2.3-alpha.1+sha." + vcs_revision)
assert metadata.version.build[1] == vcs_revision
def test_install_from_url(isolated_pio_core, tmpdir_factory):
tmp_dir = tmpdir_factory.mktemp("tmp")
storage_dir = tmpdir_factory.mktemp("storage")
lm = LibraryPackageManager(str(storage_dir))
# install from local directory
src_dir = tmp_dir.join("local-lib-dir").mkdir()
src_dir.join("main.cpp").write("")
spec = PackageSpec("file://%s" % src_dir)
pkg = lm.install(spec, silent=True)
assert os.path.isfile(os.path.join(pkg.path, "main.cpp"))
manifest = lm.load_manifest(pkg)
assert manifest["name"] == "local-lib-dir"
assert manifest["version"].startswith("0.0.0+")
assert spec == pkg.metadata.spec
# install from local archive
src_dir = tmp_dir.join("archive-src").mkdir()
root_dir = src_dir.mkdir("root")
root_dir.mkdir("src").join("main.cpp").write("#include <stdio.h>")
root_dir.join("library.json").write(
'{"name": "manifest-lib-name", "version": "2.0.0"}'
)
tarball_path = PackagePacker(str(src_dir)).pack(str(tmp_dir))
spec = PackageSpec("file://%s" % tarball_path)
pkg = lm.install(spec, silent=True)
assert os.path.isfile(os.path.join(pkg.path, "src", "main.cpp"))
assert pkg == lm.get_package(spec)
assert spec == pkg.metadata.spec
# install from registry
src_dir = tmp_dir.join("registry-1").mkdir()
src_dir.join("library.properties").write(
"""
name = wifilib
version = 5.2.7
"""
)
spec = PackageSpec("company/wifilib @ ^5")
pkg = lm.install_from_url("file://%s" % src_dir, spec)
assert str(pkg.metadata.version) == "5.2.7"
def test_install_from_registry(isolated_pio_core, tmpdir_factory):
# Libraries
lm = LibraryPackageManager(str(tmpdir_factory.mktemp("lib-storage")))
# library with dependencies
lm.install("AsyncMqttClient-esphome @ 0.8.4", silent=True)
assert len(lm.get_installed()) == 3
pkg = lm.get_package("AsyncTCP-esphome")
assert pkg.metadata.spec.owner == "ottowinter"
assert not lm.get_package("non-existing-package")
# mbed library
assert lm.install("wolfSSL", silent=True)
assert len(lm.get_installed()) == 4
# Tools
tm = ToolPackageManager(str(tmpdir_factory.mktemp("tool-storage")))
pkg = tm.install("tool-stlink @ ~1.10400.0", silent=True)
manifest = tm.load_manifest(pkg)
assert tm.is_system_compatible(manifest.get("system"))
assert util.get_systype() in manifest.get("system", [])
def test_get_installed(isolated_pio_core, tmpdir_factory):
storage_dir = tmpdir_factory.mktemp("storage")
lm = LibraryPackageManager(str(storage_dir))
# VCS package
(
storage_dir.join("pkg-vcs")
.mkdir()
.join(".git")
.mkdir()
.join(".piopm")
.write(
"""
{
"name": "pkg-via-vcs",
"spec": {
"id": null,
"name": "pkg-via-vcs",
"owner": null,
"requirements": null,
"url": "git+https://github.com/username/repo.git"
},
"type": "library",
"version": "0.0.0+sha.1ea4d5e"
}
"""
)
)
# package without metadata file
(
storage_dir.join("foo@3.4.5")
.mkdir()
.join("library.json")
.write('{"name": "foo", "version": "3.4.5"}')
)
# package with metadata file
foo_dir = storage_dir.join("foo").mkdir()
foo_dir.join("library.json").write('{"name": "foo", "version": "3.6.0"}')
foo_dir.join(".piopm").write(
"""
{
"name": "foo",
"spec": {
"name": "foo",
"owner": null,
"requirements": "^3"
},
"type": "library",
"version": "3.6.0"
}
"""
)
# invalid package
storage_dir.join("invalid-package").mkdir().join("package.json").write(
'{"name": "tool-scons", "version": "4.0.0"}'
)
installed = lm.get_installed()
assert len(installed) == 3
assert set(["pkg-via-vcs", "foo"]) == set(p.metadata.name for p in installed)
assert str(lm.get_package("foo").metadata.version) == "3.6.0"
def test_uninstall(isolated_pio_core, tmpdir_factory):
tmp_dir = tmpdir_factory.mktemp("tmp")
storage_dir = tmpdir_factory.mktemp("storage")
lm = LibraryPackageManager(str(storage_dir))
# foo @ 1.0.0
pkg_dir = tmp_dir.join("foo").mkdir()
pkg_dir.join("library.json").write('{"name": "foo", "version": "1.0.0"}')
lm.install_from_url("file://%s" % pkg_dir, "foo")
# foo @ 1.3.0
pkg_dir = tmp_dir.join("foo-1.3.0").mkdir()
pkg_dir.join("library.json").write('{"name": "foo", "version": "1.3.0"}')
lm.install_from_url("file://%s" % pkg_dir, "foo")
# bar
pkg_dir = tmp_dir.join("bar").mkdir()
pkg_dir.join("library.json").write('{"name": "bar", "version": "1.0.0"}')
lm.install("file://%s" % pkg_dir, silent=True)
assert len(lm.get_installed()) == 3
assert os.path.isdir(os.path.join(str(storage_dir), "foo"))
assert os.path.isdir(os.path.join(str(storage_dir), "foo@1.0.0"))
# check detaching
assert lm.uninstall("FOO", silent=True)
assert len(lm.get_installed()) == 2
assert os.path.isdir(os.path.join(str(storage_dir), "foo"))
assert not os.path.isdir(os.path.join(str(storage_dir), "foo@1.0.0"))
# uninstall the rest
assert lm.uninstall("foo", silent=True)
assert lm.uninstall("bar", silent=True)
assert len(lm.get_installed()) == 0

250
tests/package/test_meta.py Normal file
View File

@ -0,0 +1,250 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import jsondiff
import semantic_version
from platformio.package.meta import PackageMetaData, PackageSpec, PackageType
def test_spec_owner():
assert PackageSpec("alice/foo library") == PackageSpec(
owner="alice", name="foo library"
)
spec = PackageSpec(" Bob / BarUpper ")
assert spec != PackageSpec(owner="BOB", name="BARUPPER")
assert spec.owner == "Bob"
assert spec.name == "BarUpper"
def test_spec_id():
assert PackageSpec(13) == PackageSpec(id=13)
assert PackageSpec("20") == PackageSpec(id=20)
spec = PackageSpec("id=199")
assert spec == PackageSpec(id=199)
assert isinstance(spec.id, int)
def test_spec_name():
assert PackageSpec("foo") == PackageSpec(name="foo")
assert PackageSpec(" bar-24 ") == PackageSpec(name="bar-24")
def test_spec_requirements():
assert PackageSpec("foo@1.2.3") == PackageSpec(name="foo", requirements="1.2.3")
assert PackageSpec("bar @ ^1.2.3") == PackageSpec(name="bar", requirements="^1.2.3")
assert PackageSpec("13 @ ~2.0") == PackageSpec(id=13, requirements="~2.0")
spec = PackageSpec("id=20 @ !=1.2.3,<2.0")
assert isinstance(spec.requirements, semantic_version.SimpleSpec)
assert semantic_version.Version("1.3.0-beta.1") in spec.requirements
assert spec == PackageSpec(id=20, requirements="!=1.2.3,<2.0")
def test_spec_local_urls():
assert PackageSpec("file:///tmp/foo.tar.gz") == PackageSpec(
url="file:///tmp/foo.tar.gz", name="foo"
)
assert PackageSpec("customName=file:///tmp/bar.zip") == PackageSpec(
url="file:///tmp/bar.zip", name="customName"
)
assert PackageSpec("file:///tmp/some-lib/") == PackageSpec(
url="file:///tmp/some-lib/", name="some-lib"
)
assert PackageSpec("file:///tmp/foo.tar.gz@~2.3.0-beta.1") == PackageSpec(
url="file:///tmp/foo.tar.gz", name="foo", requirements="~2.3.0-beta.1"
)
def test_spec_external_urls():
assert PackageSpec(
"https://github.com/platformio/platformio-core/archive/develop.zip"
) == PackageSpec(
url="https://github.com/platformio/platformio-core/archive/develop.zip",
name="platformio-core",
)
assert PackageSpec(
"https://github.com/platformio/platformio-core/archive/develop.zip?param=value"
" @ !=2"
) == PackageSpec(
url="https://github.com/platformio/platformio-core/archive/"
"develop.zip?param=value",
name="platformio-core",
requirements="!=2",
)
spec = PackageSpec(
"Custom-Name="
"https://github.com/platformio/platformio-core/archive/develop.tar.gz@4.4.0"
)
assert spec.is_custom_name()
assert spec.name == "Custom-Name"
assert spec == PackageSpec(
url="https://github.com/platformio/platformio-core/archive/develop.tar.gz",
name="Custom-Name",
requirements="4.4.0",
)
def test_spec_vcs_urls():
assert PackageSpec("https://github.com/platformio/platformio-core") == PackageSpec(
name="platformio-core", url="git+https://github.com/platformio/platformio-core"
)
assert PackageSpec("https://gitlab.com/username/reponame") == PackageSpec(
name="reponame", url="git+https://gitlab.com/username/reponame"
)
assert PackageSpec(
"wolfSSL=https://os.mbed.com/users/wolfSSL/code/wolfSSL/"
) == PackageSpec(
name="wolfSSL", url="hg+https://os.mbed.com/users/wolfSSL/code/wolfSSL/"
)
assert PackageSpec(
"https://github.com/platformio/platformio-core.git#master"
) == PackageSpec(
name="platformio-core",
url="git+https://github.com/platformio/platformio-core.git#master",
)
assert PackageSpec(
"core=git+ssh://github.com/platformio/platformio-core.git#v4.4.0@4.4.0"
) == PackageSpec(
name="core",
url="git+ssh://github.com/platformio/platformio-core.git#v4.4.0",
requirements="4.4.0",
)
assert PackageSpec(
"username@github.com:platformio/platformio-core.git"
) == PackageSpec(
name="platformio-core",
url="git+username@github.com:platformio/platformio-core.git",
)
assert PackageSpec(
"pkg=git+git@github.com:platformio/platformio-core.git @ ^1.2.3,!=5"
) == PackageSpec(
name="pkg",
url="git+git@github.com:platformio/platformio-core.git",
requirements="^1.2.3,!=5",
)
def test_spec_as_dict():
assert not jsondiff.diff(
PackageSpec("bob/foo@1.2.3").as_dict(),
{
"owner": "bob",
"id": None,
"name": "foo",
"requirements": "1.2.3",
"url": None,
},
)
assert not jsondiff.diff(
PackageSpec(
"https://github.com/platformio/platformio-core/archive/develop.zip?param=value"
" @ !=2"
).as_dict(),
{
"owner": None,
"id": None,
"name": "platformio-core",
"requirements": "!=2",
"url": "https://github.com/platformio/platformio-core/archive/develop.zip?param=value",
},
)
def test_metadata_as_dict():
metadata = PackageMetaData(PackageType.LIBRARY, "foo", "1.2.3")
# test setter
metadata.version = "0.1.2+12345"
assert metadata.version == semantic_version.Version("0.1.2+12345")
assert not jsondiff.diff(
metadata.as_dict(),
{
"type": PackageType.LIBRARY,
"name": "foo",
"version": "0.1.2+12345",
"spec": None,
},
)
assert not jsondiff.diff(
PackageMetaData(
PackageType.TOOL,
"toolchain",
"2.0.5",
PackageSpec("platformio/toolchain@~2.0.0"),
).as_dict(),
{
"type": PackageType.TOOL,
"name": "toolchain",
"version": "2.0.5",
"spec": {
"owner": "platformio",
"id": None,
"name": "toolchain",
"requirements": "~2.0.0",
"url": None,
},
},
)
def test_metadata_dump(tmpdir_factory):
pkg_dir = tmpdir_factory.mktemp("package")
metadata = PackageMetaData(
PackageType.TOOL,
"toolchain",
"2.0.5",
PackageSpec("platformio/toolchain@~2.0.0"),
)
dst = pkg_dir.join(".piopm")
metadata.dump(str(dst))
assert os.path.isfile(str(dst))
contents = dst.read()
assert all(s in contents for s in ("null", '"~2.0.0"'))
def test_metadata_load(tmpdir_factory):
contents = """
{
"name": "foo",
"spec": {
"name": "foo",
"owner": "username",
"requirements": "!=3.4.5"
},
"type": "platform",
"version": "0.1.3"
}
"""
pkg_dir = tmpdir_factory.mktemp("package")
dst = pkg_dir.join(".piopm")
dst.write(contents)
metadata = PackageMetaData.load(str(dst))
assert metadata.version == semantic_version.Version("0.1.3")
assert metadata == PackageMetaData(
PackageType.PLATFORM,
"foo",
"0.1.3",
spec=PackageSpec(owner="username", name="foo", requirements="!=3.4.5"),
)
piopm_path = pkg_dir.join(".piopm")
metadata = PackageMetaData(
PackageType.LIBRARY, "mylib", version="1.2.3", spec=PackageSpec("mylib")
)
metadata.dump(str(piopm_path))
restored_metadata = PackageMetaData.load(str(piopm_path))
assert metadata == restored_metadata

View File

@ -1,119 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.package.spec import PackageSpec
def test_ownername():
assert PackageSpec("alice/foo library") == PackageSpec(
ownername="alice", name="foo library"
)
assert PackageSpec(" bob / bar ") == PackageSpec(ownername="bob", name="bar")
def test_id():
assert PackageSpec(13) == PackageSpec(id=13)
assert PackageSpec("20") == PackageSpec(id=20)
assert PackageSpec("id=199") == PackageSpec(id=199)
def test_name():
assert PackageSpec("foo") == PackageSpec(name="foo")
assert PackageSpec(" bar-24 ") == PackageSpec(name="bar-24")
def test_requirements():
assert PackageSpec("foo@1.2.3") == PackageSpec(name="foo", requirements="1.2.3")
assert PackageSpec("bar @ ^1.2.3") == PackageSpec(name="bar", requirements="^1.2.3")
assert PackageSpec("13 @ ~2.0") == PackageSpec(id=13, requirements="~2.0")
assert PackageSpec("id=20 @ !=1.2.3,<2.0") == PackageSpec(
id=20, requirements="!=1.2.3,<2.0"
)
def test_local_urls():
assert PackageSpec("file:///tmp/foo.tar.gz") == PackageSpec(
url="file:///tmp/foo.tar.gz", name="foo"
)
assert PackageSpec("customName=file:///tmp/bar.zip") == PackageSpec(
url="file:///tmp/bar.zip", name="customName"
)
assert PackageSpec("file:///tmp/some-lib/") == PackageSpec(
url="file:///tmp/some-lib/", name="some-lib"
)
assert PackageSpec("file:///tmp/foo.tar.gz@~2.3.0-beta.1") == PackageSpec(
url="file:///tmp/foo.tar.gz", name="foo", requirements="~2.3.0-beta.1"
)
def test_external_urls():
assert PackageSpec(
"https://github.com/platformio/platformio-core/archive/develop.zip"
) == PackageSpec(
url="https://github.com/platformio/platformio-core/archive/develop.zip",
name="develop",
)
assert PackageSpec(
"https://github.com/platformio/platformio-core/archive/develop.zip?param=value"
" @ !=2"
) == PackageSpec(
url="https://github.com/platformio/platformio-core/archive/"
"develop.zip?param=value",
name="develop",
requirements="!=2",
)
assert PackageSpec(
"platformio-core="
"https://github.com/platformio/platformio-core/archive/develop.tar.gz@4.4.0"
) == PackageSpec(
url="https://github.com/platformio/platformio-core/archive/develop.tar.gz",
name="platformio-core",
requirements="4.4.0",
)
def test_vcs_urls():
assert PackageSpec(
"https://github.com/platformio/platformio-core.git"
) == PackageSpec(
name="platformio-core", url="https://github.com/platformio/platformio-core.git",
)
assert PackageSpec(
"wolfSSL=https://os.mbed.com/users/wolfSSL/code/wolfSSL/"
) == PackageSpec(
name="wolfSSL", url="https://os.mbed.com/users/wolfSSL/code/wolfSSL/",
)
assert PackageSpec(
"git+https://github.com/platformio/platformio-core.git#master"
) == PackageSpec(
name="platformio-core",
url="git+https://github.com/platformio/platformio-core.git#master",
)
assert PackageSpec(
"core=git+ssh://github.com/platformio/platformio-core.git#v4.4.0@4.4.0"
) == PackageSpec(
name="core",
url="git+ssh://github.com/platformio/platformio-core.git#v4.4.0",
requirements="4.4.0",
)
assert PackageSpec("git@github.com:platformio/platformio-core.git") == PackageSpec(
name="platformio-core", url="git@github.com:platformio/platformio-core.git",
)
assert PackageSpec(
"pkg=git+git@github.com:platformio/platformio-core.git @ ^1.2.3,!=5"
) == PackageSpec(
name="pkg",
url="git+git@github.com:platformio/platformio-core.git",
requirements="^1.2.3,!=5",
)