forked from platformio/platformio-core
		
	
		
			
				
	
	
		
			433 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			433 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2014-present PlatformIO <contact@platformio.org>
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #    http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| # pylint: disable=too-many-arguments, too-many-locals, too-many-branches
 | |
| # pylint: disable=too-many-return-statements
 | |
| 
 | |
| import json
 | |
| import re
 | |
| from glob import glob
 | |
| from os.path import isdir, join
 | |
| 
 | |
| import click
 | |
| 
 | |
| from platformio import app, commands, exception, util
 | |
| from platformio.managers.package import BasePkgManager
 | |
| from platformio.managers.platform import PlatformFactory, PlatformManager
 | |
| 
 | |
| 
 | |
| class LibraryManager(BasePkgManager):
 | |
| 
 | |
|     def __init__(self, package_dir=None):
 | |
|         if not package_dir:
 | |
|             package_dir = join(util.get_home_dir(), "lib")
 | |
|         super(LibraryManager, self).__init__(package_dir)
 | |
| 
 | |
|     @property
 | |
|     def manifest_names(self):
 | |
|         return [
 | |
|             ".library.json", "library.json", "library.properties",
 | |
|             "module.json"
 | |
|         ]
 | |
| 
 | |
|     def get_manifest_path(self, pkg_dir):
 | |
|         path = BasePkgManager.get_manifest_path(self, pkg_dir)
 | |
|         if path:
 | |
|             return path
 | |
| 
 | |
|         # if library without manifest, returns first source file
 | |
|         src_dir = join(util.glob_escape(pkg_dir))
 | |
|         if isdir(join(pkg_dir, "src")):
 | |
|             src_dir = join(src_dir, "src")
 | |
|         chs_files = glob(join(src_dir, "*.[chS]"))
 | |
|         if chs_files:
 | |
|             return chs_files[0]
 | |
|         cpp_files = glob(join(src_dir, "*.cpp"))
 | |
|         if cpp_files:
 | |
|             return cpp_files[0]
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     def load_manifest(self, pkg_dir):
 | |
|         manifest = BasePkgManager.load_manifest(self, pkg_dir)
 | |
|         if not manifest:
 | |
|             return manifest
 | |
| 
 | |
|         # if Arduino library.properties
 | |
|         if "sentence" in manifest:
 | |
|             manifest['frameworks'] = ["arduino"]
 | |
|             manifest['description'] = manifest['sentence']
 | |
|             del manifest['sentence']
 | |
| 
 | |
|         if "author" in manifest:
 | |
|             if isinstance(manifest['author'], dict):
 | |
|                 manifest['authors'] = [manifest['author']]
 | |
|             else:
 | |
|                 manifest['authors'] = [{"name": manifest['author']}]
 | |
|             del manifest['author']
 | |
| 
 | |
|         if "authors" in manifest and not isinstance(manifest['authors'], list):
 | |
|             manifest['authors'] = [manifest['authors']]
 | |
| 
 | |
|         if "keywords" not in manifest:
 | |
|             keywords = []
 | |
|             for keyword in re.split(r"[\s/]+",
 | |
|                                     manifest.get("category", "Uncategorized")):
 | |
|                 keyword = keyword.strip()
 | |
|                 if not keyword:
 | |
|                     continue
 | |
|                 keywords.append(keyword.lower())
 | |
|             manifest['keywords'] = keywords
 | |
|             if "category" in manifest:
 | |
|                 del manifest['category']
 | |
| 
 | |
|         # don't replace VCS URL
 | |
|         if "url" in manifest and "description" in manifest:
 | |
|             manifest['homepage'] = manifest['url']
 | |
|             del manifest['url']
 | |
| 
 | |
|         if "architectures" in manifest:
 | |
|             platforms = []
 | |
|             platforms_map = {
 | |
|                 "avr": "atmelavr",
 | |
|                 "sam": "atmelsam",
 | |
|                 "samd": "atmelsam",
 | |
|                 "esp8266": "espressif8266",
 | |
|                 "esp32": "espressif32",
 | |
|                 "arc32": "intel_arc32"
 | |
|             }
 | |
|             for arch in manifest['architectures'].split(","):
 | |
|                 arch = arch.strip()
 | |
|                 if arch == "*":
 | |
|                     platforms = "*"
 | |
|                     break
 | |
|                 if arch in platforms_map:
 | |
|                     platforms.append(platforms_map[arch])
 | |
|             manifest['platforms'] = platforms
 | |
|             del manifest['architectures']
 | |
| 
 | |
|         # convert listed items via comma to array
 | |
|         for key in ("keywords", "frameworks", "platforms"):
 | |
|             if key not in manifest or \
 | |
|                     not isinstance(manifest[key], basestring):
 | |
|                 continue
 | |
|             manifest[key] = [
 | |
|                 i.strip() for i in manifest[key].split(",") if i.strip()
 | |
|             ]
 | |
| 
 | |
|         return manifest
 | |
| 
 | |
|     @staticmethod
 | |
|     def normalize_dependencies(dependencies):
 | |
|         if not dependencies:
 | |
|             return []
 | |
|         items = []
 | |
|         if isinstance(dependencies, dict):
 | |
|             if "name" in dependencies:
 | |
|                 items.append(dependencies)
 | |
|             else:
 | |
|                 for name, version in dependencies.items():
 | |
|                     items.append({"name": name, "version": version})
 | |
|         elif isinstance(dependencies, list):
 | |
|             items = [d for d in dependencies if "name" in d]
 | |
|         for item in items:
 | |
|             for k in ("frameworks", "platforms"):
 | |
|                 if k not in item or isinstance(k, list):
 | |
|                     continue
 | |
|                 if item[k] == "*":
 | |
|                     del item[k]
 | |
|                 elif isinstance(item[k], basestring):
 | |
|                     item[k] = [
 | |
|                         i.strip() for i in item[k].split(",") if i.strip()
 | |
|                     ]
 | |
|         return items
 | |
| 
 | |
|     def max_satisfying_repo_version(self, versions, requirements=None):
 | |
| 
 | |
|         def _cmp_dates(datestr1, datestr2):
 | |
|             date1 = util.parse_date(datestr1)
 | |
|             date2 = util.parse_date(datestr2)
 | |
|             if date1 == date2:
 | |
|                 return 0
 | |
|             return -1 if date1 < date2 else 1
 | |
| 
 | |
|         semver_spec = self.parse_semver_spec(
 | |
|             requirements) if requirements else None
 | |
|         item = None
 | |
| 
 | |
|         for v in versions:
 | |
|             semver_new = self.parse_semver_version(v['name'])
 | |
|             if semver_spec:
 | |
|                 if not semver_new or semver_new not in semver_spec:
 | |
|                     continue
 | |
|                 if not item or self.parse_semver_version(
 | |
|                         item['name']) < semver_new:
 | |
|                     item = v
 | |
|             elif requirements:
 | |
|                 if requirements == v['name']:
 | |
|                     return v
 | |
| 
 | |
|             else:
 | |
|                 if not item or _cmp_dates(item['released'],
 | |
|                                           v['released']) == -1:
 | |
|                     item = v
 | |
|         return item
 | |
| 
 | |
|     def get_latest_repo_version(self, name, requirements, silent=False):
 | |
|         item = self.max_satisfying_repo_version(
 | |
|             util.get_api_result(
 | |
|                 "/lib/info/%d" % self.search_lib_id(
 | |
|                     {
 | |
|                         "name": name,
 | |
|                         "requirements": requirements
 | |
|                     },
 | |
|                     silent=silent),
 | |
|                 cache_valid="1h")['versions'], requirements)
 | |
|         return item['name'] if item else None
 | |
| 
 | |
|     def _install_from_piorepo(self, name, requirements):
 | |
|         assert name.startswith("id="), name
 | |
|         version = self.get_latest_repo_version(name, requirements)
 | |
|         if not version:
 | |
|             raise exception.UndefinedPackageVersion(requirements or "latest",
 | |
|                                                     util.get_systype())
 | |
|         dl_data = util.get_api_result(
 | |
|             "/lib/download/" + str(name[3:]),
 | |
|             dict(version=version),
 | |
|             cache_valid="30d")
 | |
|         assert dl_data
 | |
| 
 | |
|         return self._install_from_url(
 | |
|             name, dl_data['url'].replace("http://", "https://")
 | |
|             if app.get_setting("enable_ssl") else dl_data['url'], requirements)
 | |
| 
 | |
|     def search_lib_id(  # pylint: disable=too-many-branches
 | |
|             self,
 | |
|             filters,
 | |
|             silent=False,
 | |
|             interactive=False):
 | |
|         assert isinstance(filters, dict)
 | |
|         assert "name" in filters
 | |
| 
 | |
|         # try to find ID within installed packages
 | |
|         lib_id = self._get_lib_id_from_installed(filters)
 | |
|         if lib_id:
 | |
|             return lib_id
 | |
| 
 | |
|         # looking in PIO Library Registry
 | |
|         if not silent:
 | |
|             click.echo("Looking for %s library in registry" % click.style(
 | |
|                 filters['name'], fg="cyan"))
 | |
|         query = []
 | |
|         for key in filters:
 | |
|             if key not in ("name", "authors", "frameworks", "platforms"):
 | |
|                 continue
 | |
|             values = filters[key]
 | |
|             if not isinstance(values, list):
 | |
|                 values = [v.strip() for v in values.split(",") if v]
 | |
|             for value in values:
 | |
|                 query.append('%s:"%s"' %
 | |
|                              (key[:-1] if key.endswith("s") else key, value))
 | |
| 
 | |
|         lib_info = None
 | |
|         result = util.get_api_result(
 | |
|             "/v2/lib/search", dict(query=" ".join(query)), cache_valid="1h")
 | |
|         if result['total'] == 1:
 | |
|             lib_info = result['items'][0]
 | |
|         elif result['total'] > 1:
 | |
|             if silent and not interactive:
 | |
|                 lib_info = result['items'][0]
 | |
|             else:
 | |
|                 click.secho(
 | |
|                     "Conflict: More than one library has been found "
 | |
|                     "by request %s:" % json.dumps(filters),
 | |
|                     fg="yellow",
 | |
|                     err=True)
 | |
|                 for item in result['items']:
 | |
|                     commands.lib.print_lib_item(item)
 | |
| 
 | |
|                 if not interactive:
 | |
|                     click.secho(
 | |
|                         "Automatically chose the first available library "
 | |
|                         "(use `--interactive` option to make a choice)",
 | |
|                         fg="yellow",
 | |
|                         err=True)
 | |
|                     lib_info = result['items'][0]
 | |
|                 else:
 | |
|                     deplib_id = click.prompt(
 | |
|                         "Please choose library ID",
 | |
|                         type=click.Choice(
 | |
|                             [str(i['id']) for i in result['items']]))
 | |
|                     for item in result['items']:
 | |
|                         if item['id'] == int(deplib_id):
 | |
|                             lib_info = item
 | |
|                             break
 | |
| 
 | |
|         if not lib_info:
 | |
|             if filters.keys() == ["name"]:
 | |
|                 raise exception.LibNotFound(filters['name'])
 | |
|             else:
 | |
|                 raise exception.LibNotFound(str(filters))
 | |
|         if not silent:
 | |
|             click.echo("Found: %s" % click.style(
 | |
|                 "https://platformio.org/lib/show/{id}/{name}".format(
 | |
|                     **lib_info),
 | |
|                 fg="blue"))
 | |
|         return int(lib_info['id'])
 | |
| 
 | |
|     def _get_lib_id_from_installed(self, filters):
 | |
|         if filters['name'].startswith("id="):
 | |
|             return int(filters['name'][3:])
 | |
|         package_dir = self.get_package_dir(
 | |
|             filters['name'], filters.get("requirements",
 | |
|                                          filters.get("version")))
 | |
|         if not package_dir:
 | |
|             return None
 | |
|         manifest = self.load_manifest(package_dir)
 | |
|         if "id" not in manifest:
 | |
|             return None
 | |
| 
 | |
|         for key in ("frameworks", "platforms"):
 | |
|             if key not in filters:
 | |
|                 continue
 | |
|             if key not in manifest:
 | |
|                 return None
 | |
|             if not util.items_in_list(
 | |
|                     util.items_to_list(filters[key]),
 | |
|                     util.items_to_list(manifest[key])):
 | |
|                 return None
 | |
| 
 | |
|         if "authors" in filters:
 | |
|             if "authors" not in manifest:
 | |
|                 return None
 | |
|             manifest_authors = manifest['authors']
 | |
|             if not isinstance(manifest_authors, list):
 | |
|                 manifest_authors = [manifest_authors]
 | |
|             manifest_authors = [
 | |
|                 a['name'] for a in manifest_authors
 | |
|                 if isinstance(a, dict) and "name" in a
 | |
|             ]
 | |
|             filter_authors = filters['authors']
 | |
|             if not isinstance(filter_authors, list):
 | |
|                 filter_authors = [filter_authors]
 | |
|             if not set(filter_authors) <= set(manifest_authors):
 | |
|                 return None
 | |
| 
 | |
|         return int(manifest['id'])
 | |
| 
 | |
|     def install(  # pylint: disable=arguments-differ
 | |
|             self,
 | |
|             name,
 | |
|             requirements=None,
 | |
|             silent=False,
 | |
|             after_update=False,
 | |
|             interactive=False,
 | |
|             force=False):
 | |
|         _name, _requirements, _url = self.parse_pkg_uri(name, requirements)
 | |
|         if not _url:
 | |
|             name = "id=%d" % self.search_lib_id({
 | |
|                 "name": _name,
 | |
|                 "requirements": _requirements
 | |
|             },
 | |
|                                                 silent=silent,
 | |
|                                                 interactive=interactive)
 | |
|             requirements = _requirements
 | |
|         pkg_dir = BasePkgManager.install(
 | |
|             self,
 | |
|             name,
 | |
|             requirements,
 | |
|             silent=silent,
 | |
|             after_update=after_update,
 | |
|             force=force)
 | |
| 
 | |
|         if not pkg_dir:
 | |
|             return None
 | |
| 
 | |
|         manifest = self.load_manifest(pkg_dir)
 | |
|         if "dependencies" not in manifest:
 | |
|             return pkg_dir
 | |
| 
 | |
|         if not silent:
 | |
|             click.secho("Installing dependencies", fg="yellow")
 | |
| 
 | |
|         for filters in self.normalize_dependencies(manifest['dependencies']):
 | |
|             assert "name" in filters
 | |
| 
 | |
|             # avoid circle dependencies
 | |
|             if not self.INSTALL_HISTORY:
 | |
|                 self.INSTALL_HISTORY = []
 | |
|             history_key = str(filters)
 | |
|             if history_key in self.INSTALL_HISTORY:
 | |
|                 continue
 | |
|             self.INSTALL_HISTORY.append(history_key)
 | |
| 
 | |
|             if any(s in filters.get("version", "") for s in ("\\", "/")):
 | |
|                 self.install(
 | |
|                     "{name}={version}".format(**filters),
 | |
|                     silent=silent,
 | |
|                     after_update=after_update,
 | |
|                     interactive=interactive,
 | |
|                     force=force)
 | |
|             else:
 | |
|                 try:
 | |
|                     lib_id = self.search_lib_id(filters, silent, interactive)
 | |
|                 except exception.LibNotFound as e:
 | |
|                     if not silent or is_builtin_lib(filters['name']):
 | |
|                         click.secho("Warning! %s" % e, fg="yellow")
 | |
|                     continue
 | |
| 
 | |
|                 if filters.get("version"):
 | |
|                     self.install(
 | |
|                         lib_id,
 | |
|                         filters.get("version"),
 | |
|                         silent=silent,
 | |
|                         after_update=after_update,
 | |
|                         interactive=interactive,
 | |
|                         force=force)
 | |
|                 else:
 | |
|                     self.install(
 | |
|                         lib_id,
 | |
|                         silent=silent,
 | |
|                         after_update=after_update,
 | |
|                         interactive=interactive,
 | |
|                         force=force)
 | |
|         return pkg_dir
 | |
| 
 | |
| 
 | |
@util.memoized()
def get_builtin_libs(storage_names=None):
    """List the library storages exposed by the installed platforms.

    Every storage reported by ``get_lib_storages()`` of each installed
    platform yields a dict with its ``name``, its ``path`` and ``items`` (the
    libraries a ``LibraryManager`` finds at that path). A non-empty
    ``storage_names`` limits the result to storages with those names.
    """
    wanted = storage_names or []
    storages = []
    platform_manager = PlatformManager()
    for platform_manifest in platform_manager.get_installed():
        platform = PlatformFactory.newPlatform(platform_manifest['__pkg_dir'])
        for storage in platform.get_lib_storages():
            if wanted and storage['name'] not in wanted:
                continue
            manager = LibraryManager(storage['path'])
            entry = {
                "name": storage['name'],
                "path": storage['path'],
                "items": manager.get_installed()
            }
            storages.append(entry)
    return storages
 | |
| 
 | |
| 
 | |
@util.memoized()
def is_builtin_lib(name):
    """Tell whether a library called ``name`` is in any built-in storage."""
    return any(
        lib.get("name") == name
        for storage in get_builtin_libs()
        for lib in storage['items'])
 |