Files
platformio-core/platformio/util.py
2015-05-25 13:17:50 +03:00

354 lines
9.3 KiB
Python

# Copyright (C) Ivan Kravets <me@ikravets.com>
# See LICENSE for details.
import collections
import functools
import json
import os
import re
import subprocess
from os.path import (abspath, basename, dirname, expanduser, isdir, isfile,
join, realpath)
from platform import system, uname
from threading import Thread
import requests
from platformio import __apiurl__, __version__, exception
try:
from configparser import ConfigParser
except ImportError:
from ConfigParser import ConfigParser
class AsyncPipe(Thread):
def __init__(self, outcallback=None):
Thread.__init__(self)
self.outcallback = outcallback
self._fd_read, self._fd_write = os.pipe()
self._pipe_reader = os.fdopen(self._fd_read)
self._buffer = []
self.start()
def get_buffer(self):
return self._buffer
def fileno(self):
return self._fd_write
def run(self):
for line in iter(self._pipe_reader.readline, ""):
line = line.strip()
self._buffer.append(line)
if self.outcallback:
self.outcallback(line)
else:
print line
self._pipe_reader.close()
def close(self):
os.close(self._fd_write)
self.join()
class memoized(object):
'''
Decorator. Caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned
(not reevaluated).
https://wiki.python.org/moin/PythonDecoratorLibrary#Memoize
'''
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args):
if not isinstance(args, collections.Hashable):
# uncacheable. a list, for instance.
# better to not cache than blow up.
return self.func(*args)
if args in self.cache:
return self.cache[args]
else:
value = self.func(*args)
self.cache[args] = value
return value
def __repr__(self):
'''Return the function's docstring.'''
return self.func.__doc__
def __get__(self, obj, objtype):
'''Support instance methods.'''
return functools.partial(self.__call__, obj)
def get_systype():
data = uname()
systype = data[0]
if data[4]:
systype += "_" + data[4]
return systype.lower()
def pioversion_to_intstr():
vermatch = re.match(r"^([\d\.]+)", __version__)
assert vermatch
return [int(i) for i in vermatch.group(1).split(".")[:3]]
def _get_projconf_option_dir(name, default=None):
_env_name = "PLATFORMIO_%s" % name.upper()
if _env_name in os.environ:
return os.getenv(_env_name)
try:
config = get_project_config()
if (config.has_section("platformio") and
config.has_option("platformio", name)):
option_dir = config.get("platformio", name)
if option_dir.startswith("~"):
option_dir = expanduser(option_dir)
return abspath(option_dir)
except exception.NotPlatformProject:
pass
return default
def get_home_dir():
home_dir = _get_projconf_option_dir(
"home_dir",
join(expanduser("~"), ".platformio")
)
if not isdir(home_dir):
os.makedirs(home_dir)
assert isdir(home_dir)
return home_dir
def get_lib_dir():
return _get_projconf_option_dir(
"lib_dir",
join(get_home_dir(), "lib")
)
def get_source_dir():
return dirname(realpath(__file__))
def get_project_dir():
return os.getcwd()
def get_projectsrc_dir():
return _get_projconf_option_dir(
"src_dir",
join(get_project_dir(), "src")
)
def get_pioenvs_dir():
return _get_projconf_option_dir(
"envs_dir",
join(get_project_dir(), ".pioenvs")
)
def get_project_config():
path = join(get_project_dir(), "platformio.ini")
if not isfile(path):
raise exception.NotPlatformProject(get_project_dir())
cp = ConfigParser()
cp.read(path)
return cp
def change_filemtime(path, time):
os.utime(path, (time, time))
def is_ci():
return os.getenv("CI", "").lower() == "true"
def exec_command(*args, **kwargs):
result = {
"out": None,
"err": None,
"returncode": None
}
default = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=system() == "Windows"
)
default.update(kwargs)
kwargs = default
p = subprocess.Popen(*args, **kwargs)
try:
result['out'], result['err'] = p.communicate()
result['returncode'] = p.returncode
except KeyboardInterrupt:
raise exception.AbortedByUser()
finally:
for s in ("stdout", "stderr"):
if isinstance(kwargs[s], AsyncPipe):
kwargs[s].close()
for s in ("stdout", "stderr"):
if isinstance(kwargs[s], AsyncPipe):
result[s[3:]] = "\n".join(kwargs[s].get_buffer())
for k, v in result.iteritems():
if v and isinstance(v, basestring):
result[k].strip()
return result
def get_serialports():
if os.name == "nt":
from serial.tools.list_ports_windows import comports
elif os.name == "posix":
from serial.tools.list_ports_posix import comports
else:
raise exception.GetSerialPortsError(os.name)
return [{"port": p, "description": d, "hwid": h} for p, d, h in comports()]
def get_logicaldisks():
disks = []
if system() == "Windows":
result = exec_command(
["wmic", "logicaldisk", "get", "name,VolumeName"]).get("out")
disknamere = re.compile(r"^([A-Z]{1}\:)\s*(\S+)?")
for line in result.split("\n"):
match = disknamere.match(line.strip())
if not match:
continue
disks.append({"disk": match.group(1), "name": match.group(2)})
else:
result = exec_command(["df"]).get("out")
disknamere = re.compile(r"\d+\%\s+([a-z\d\-_/]+)$", flags=re.I)
for line in result.split("\n"):
match = disknamere.search(line.strip())
if not match:
continue
disks.append({"disk": match.group(1),
"name": basename(match.group(1))})
return disks
def get_request_defheaders():
return {"User-Agent": "PlatformIO/%s CI/%d %s" % (
__version__,
1 if is_ci() else 0,
requests.utils.default_user_agent()
)}
def get_api_result(path, params=None, data=None):
result = None
r = None
try:
if data:
r = requests.post(__apiurl__ + path, params=params, data=data,
headers=get_request_defheaders())
else:
r = requests.get(__apiurl__ + path, params=params,
headers=get_request_defheaders())
result = r.json()
r.raise_for_status()
except requests.exceptions.HTTPError as e:
if result and "errors" in result:
raise exception.APIRequestError(result['errors'][0]['title'])
else:
raise exception.APIRequestError(e)
except requests.exceptions.ConnectionError:
raise exception.APIRequestError(
"Could not connect to PlatformIO Registry Service")
except ValueError:
raise exception.APIRequestError(
"Invalid response: %s" % r.text.encode("utf-8"))
finally:
if r:
r.close()
return result
@memoized
def _lookup_boards():
boards = {}
bdirs = [join(get_source_dir(), "boards")]
if isdir(join(get_home_dir(), "boards")):
bdirs.append(join(get_home_dir(), "boards"))
for bdir in bdirs:
for json_file in os.listdir(bdir):
if not json_file.endswith(".json"):
continue
with open(join(bdir, json_file)) as f:
boards.update(json.load(f))
return boards
def get_boards(type_=None):
boards = _lookup_boards()
if type_ is None:
return boards
else:
if type_ not in boards:
raise exception.UnknownBoard(type_)
return boards[type_]
@memoized
def _lookup_frameworks():
frameworks = {}
frameworks_path = join(
get_source_dir(), "builder", "scripts", "frameworks")
frameworks_list = [f[:-3] for f in os.listdir(frameworks_path)
if not f.startswith("__") and f.endswith(".py")]
for _type in frameworks_list:
script_path = join(frameworks_path, "%s.py" % _type)
with open(script_path) as f:
fcontent = f.read()
assert '"""' in fcontent
_doc_start = fcontent.index('"""') + 3
fdoc = fcontent[
_doc_start:fcontent.index('"""', _doc_start)].strip()
doclines = [l.strip() for l in fdoc.splitlines() if l.strip()]
frameworks[_type] = {
"name": doclines[0],
"description": " ".join(doclines[1:-1]),
"url": doclines[-1],
"script": script_path
}
return frameworks
def get_frameworks(type_=None):
frameworks = _lookup_frameworks()
if type_ is None:
return frameworks
else:
if type_ not in frameworks:
raise exception.UnknownFramework(type_)
return frameworks[type_]
return frameworks