Files
platformio-core/platformio/util.py

751 lines
22 KiB
Python
Raw Normal View History

# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2019-05-07 22:13:21 +03:00
# pylint: disable=too-many-ancestors
import json
import os
2016-10-13 00:42:16 +03:00
import platform
2015-03-09 12:27:54 +02:00
import re
2017-12-13 18:14:01 +02:00
import socket
import stat
import subprocess
import sys
import time
2017-08-15 22:57:20 +03:00
from functools import wraps
from glob import glob
from os.path import (abspath, basename, dirname, expanduser, isdir, isfile,
2016-08-29 20:20:12 +03:00
join, normpath, splitdrive)
from shutil import rmtree
from threading import Thread
import click
import requests
from platformio import __apiurl__, __version__, exception
from platformio.project.config import ProjectConfig
2019-05-07 22:13:21 +03:00
from platformio.project.helpers import ( # pylint: disable=unused-import
get_project_dir, get_project_optional_dir, get_projectboards_dir,
get_projectbuild_dir, get_projectdata_dir, get_projectlib_dir,
get_projectsrc_dir, get_projecttest_dir, is_platformio_project)
2019-05-07 22:13:21 +03:00
# FIXME: check platformio.project.helpers imports
2018-01-10 03:07:17 +02:00
# Python 2/3 compatibility shim: ``string_types`` is the text base type to
# pass to ``isinstance`` checks on either interpreter line.
PY2 = sys.version_info[0] == 2

string_types = str
if PY2:
    string_types = basestring  # pylint: disable=undefined-variable
class AsyncPipe(Thread):
    """Background thread that drains an OS pipe line by line.

    The write end of the pipe is exposed through ``fileno()``. Every line
    read from the other end is stripped, stored in an internal buffer and
    handed to ``outcallback`` (or printed when no callback was given).
    The thread starts itself on construction.
    """

    def __init__(self, outcallback=None):
        super(AsyncPipe, self).__init__()
        self.outcallback = outcallback
        self._buffer = []

        read_fd, write_fd = os.pipe()
        self._fd_read = read_fd
        self._fd_write = write_fd
        self._pipe_reader = os.fdopen(read_fd)
        self.start()

    def get_buffer(self):
        """Return the list of stripped lines collected so far."""
        return self._buffer

    def fileno(self):
        """Return the file descriptor of the pipe's write end."""
        return self._fd_write

    def run(self):
        """Consume lines until the reader reports end-of-file."""
        reader = self._pipe_reader
        while True:
            raw = reader.readline()
            if raw == "":
                # an empty read means the write end was closed
                break
            text = raw.strip()
            self._buffer.append(text)
            if not self.outcallback:
                print(text)
            else:
                self.outcallback(text)
        reader.close()

    def close(self):
        """Close the write end and wait for the reader thread to finish."""
        os.close(self._fd_write)
        self.join()
2015-05-25 23:29:10 +03:00
class cd(object):
    """Context manager that temporarily changes the working directory.

    The directory to return to is captured when the object is created,
    not when the ``with`` block is entered.
    """

    def __init__(self, new_path):
        self.prev_path = os.getcwd()
        self.new_path = new_path

    def __enter__(self):
        target = self.new_path
        os.chdir(target)

    def __exit__(self, etype, value, traceback):
        origin = self.prev_path
        os.chdir(origin)
class memoized(object):
    """Decorator factory caching a function's results per argument set.

    :param expire: cache lifetime in milliseconds; ``0`` (the default)
        keeps entries until ``reset()`` is called.

    The decorated function gains a ``reset()`` attribute that drops every
    cached value. Cache keys are built from ``str(args) + str(kwargs)``.
    """

    def __init__(self, expire=0):
        # Convert milliseconds to seconds. The float divisor matters on
        # Python 2, where ``expire / 1000`` is an integer division and
        # would turn any lifetime below one second into "never expires".
        self.expire = expire / 1000.0
        self.cache = {}

    def __call__(self, func):

        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            if (key not in self.cache
                    or (self.expire > 0
                        and self.cache[key][0] < time.time() - self.expire)):
                # store (creation timestamp, value)
                self.cache[key] = (time.time(), func(*args, **kwargs))
            return self.cache[key][1]

        wrapper.reset = self._reset
        return wrapper

    def _reset(self):
        """Forget every cached result."""
        self.cache = {}
2017-08-15 22:57:20 +03:00
class throttle(object):
    """Decorator factory enforcing a minimum delay between calls.

    :param threshhold: minimum interval between two consecutive calls of
        the decorated function, in milliseconds.

    When a call arrives too early the wrapper sleeps for the remaining
    time before invoking the wrapped function.
    """

    def __init__(self, threshhold):
        self.threshhold = threshhold  # milliseconds
        self.last = 0

    def __call__(self, func):

        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed_ms = int(round(1000 * (time.time() - self.last)))
            remaining_ms = self.threshhold - elapsed_ms
            if remaining_ms > 0:
                time.sleep(remaining_ms * 0.001)
            # timestamp is taken right before the wrapped function runs
            self.last = time.time()
            return func(*args, **kwargs)

        return wrapper
def singleton(cls):
    """Class decorator returning one shared instance of ``cls``.

    From PEP-318 http://www.python.org/dev/peps/pep-0318/#examples

    The instance is built on the first call; arguments passed to later
    calls are ignored.
    """
    _instances = {}

    def get_instance(*args, **kwargs):
        if cls in _instances:
            return _instances[cls]
        created = _instances[cls] = cls(*args, **kwargs)
        return created

    return get_instance
2017-11-02 23:14:32 +02:00
def path_to_unicode(path):
    """Return ``path`` re-encoded as UTF-8 on Python 2.

    On Python 3 the value is returned untouched. On Python 2 it is decoded
    with the filesystem encoding and then encoded to UTF-8.
    """
    if PY2:
        fs_encoding = sys.getfilesystemencoding()
        return path.decode(fs_encoding).encode("utf-8")
    return path
2017-04-27 18:28:50 +03:00
2016-04-09 14:15:59 +03:00
def load_json(file_path):
    """Parse the JSON document stored at ``file_path``.

    :raises exception.InvalidJSONFile: when reading or parsing raises
        ``ValueError`` (i.e. the content is not valid JSON).
    """
    try:
        with open(file_path, "r") as fp:
            raw = fp.read()
            return json.loads(raw)
    except ValueError:
        raise exception.InvalidJSONFile(file_path)
2016-04-09 14:15:59 +03:00
2014-07-31 16:20:31 +03:00
def get_systype():
    """Return a lower-case ``<system>_<arch>`` identifier for this host.

    On Windows the architecture is reported as ``amd64`` or ``x86``
    depending on ``platform.architecture()``. When no architecture is
    available only the system name is returned.
    """
    system = platform.system().lower()
    machine = platform.machine().lower()
    if system == "windows":
        is_64bit = platform.architecture()[0] == "64bit"
        machine = "amd64" if is_64bit else "x86"
    if not machine:
        return system
    return "%s_%s" % (system, machine)
2015-03-13 17:54:24 +02:00
def pioversion_to_intstr():
    """Return up to the first three numeric components of ``__version__``.

    Only the leading run of digits and dots is considered; the result is
    a list of integers.
    """
    vermatch = re.match(r"^([\d\.]+)", __version__)
    assert vermatch
    numeric_prefix = vermatch.group(1)
    components = numeric_prefix.split(".")[:3]
    return [int(component) for component in components]
def get_home_dir():
    """Return the PlatformIO home directory, creating it when missing.

    The location comes from the ``home_dir`` project option and defaults
    to ``~/.platformio``. On Windows a ``.platformio`` folder at the root
    of the same drive is preferred when it already exists, and is used as
    a fallback when the primary directory cannot be created.
    """
    default_dir = join(expanduser("~"), ".platformio")
    home_dir = get_project_optional_dir("home_dir", default_dir)

    drive_root_dir = None
    if "windows" in get_systype():
        drive_root_dir = splitdrive(home_dir)[0] + "\\.platformio"
        if isdir(drive_root_dir):
            home_dir = drive_root_dir

    if not isdir(home_dir):
        try:
            os.makedirs(home_dir)
        except:  # pylint: disable=bare-except
            # best effort: retry at the drive root (Windows only)
            if drive_root_dir:
                os.makedirs(drive_root_dir)
                home_dir = drive_root_dir

    assert isdir(home_dir)
    return home_dir
def get_cache_dir():
    """Return the ``cache_dir`` project option.

    Defaults to ``.cache`` inside the home directory.
    """
    default_cache = join(get_home_dir(), ".cache")
    return get_project_optional_dir("cache_dir", default_cache)
def get_source_dir():
    """Return the directory containing this module's source file.

    When the absolute form of ``__file__`` does not point at a file, the
    entries of ``sys.path`` are searched for it instead.
    """
    module_path = abspath(__file__)
    if not isfile(module_path):
        for search_dir in sys.path:
            candidate = join(search_dir, __file__)
            if isfile(candidate):
                module_path = candidate
                break
    return dirname(module_path)
2019-05-07 17:51:50 +03:00
def load_project_config(path=None):  # FIXME: Remove
    """Build a ``ProjectConfig`` for a project directory or config file.

    :param path: a ``platformio.ini`` file, a directory containing one,
        or ``None`` to use the current project directory.
    :raises exception.NotPlatformIOProject: when the resolved file does
        not exist.
    """
    config_path = path
    if not config_path or isdir(config_path):
        base_dir = config_path or get_project_dir()
        config_path = join(base_dir, "platformio.ini")

    if isfile(config_path):
        return ProjectConfig(config_path)

    # report the project directory rather than the missing file itself
    if config_path.endswith("platformio.ini"):
        missing = dirname(config_path)
    else:
        missing = config_path
    raise exception.NotPlatformIOProject(missing)
2019-05-07 17:51:50 +03:00
def parse_conf_multi_values(items):  # FIXME: Remove
    """Delegate to ``ProjectConfig.parse_multi_values`` (legacy shim)."""
    parse = ProjectConfig.parse_multi_values
    return parse(items)
2017-08-15 22:57:20 +03:00
def change_filemtime(path, mtime):
    """Set both the access and modification time of ``path`` to ``mtime``."""
    access_and_modified = (mtime, mtime)
    os.utime(path, access_and_modified)
2014-06-03 21:27:36 +03:00
def is_ci():
    """Return True when the ``CI`` environment variable equals "true".

    The comparison is case-insensitive; an unset variable counts as False.
    """
    flag = os.environ.get("CI", "")
    return flag.lower() == "true"
2016-09-29 23:53:19 +03:00
def is_container():
    """Guess whether the process runs inside a container.

    Returns True when ``/proc/1/cgroup`` exists and holds at least one
    line that contains ``:`` and does not end with ``:/``.
    NOTE(review): presumably such a line means PID 1 sits in a non-root
    cgroup -- confirm against cgroup v2 hosts.
    """
    cgroup_path = "/proc/1/cgroup"
    if not isfile(cgroup_path):
        return False
    with open(cgroup_path) as fp:
        entries = (raw.strip() for raw in fp)
        return any(
            ":" in entry and not entry.endswith(":/") for entry in entries)
2015-01-29 18:54:28 +02:00
def exec_command(*args, **kwargs):
    """Run a subprocess and collect its output.

    Positional and keyword arguments are forwarded to ``subprocess.Popen``;
    ``stdout`` and ``stderr`` default to ``subprocess.PIPE``. Either of
    them may be an ``AsyncPipe``, in which case it is closed once the
    process ends and its buffered lines become the corresponding output.

    :returns: dict with the keys ``out``, ``err`` and ``returncode``.
        Textual values are decoded (Python 3) and stripped.
    :raises exception.AbortedByUser: when interrupted with Ctrl+C.
    """
    result = {"out": None, "err": None, "returncode": None}

    default = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    default.update(kwargs)
    kwargs = default

    p = subprocess.Popen(*args, **kwargs)
    try:
        result['out'], result['err'] = p.communicate()
        result['returncode'] = p.returncode
    except KeyboardInterrupt:
        raise exception.AbortedByUser()
    finally:
        for s in ("stdout", "stderr"):
            if isinstance(kwargs[s], AsyncPipe):
                kwargs[s].close()

    for s in ("stdout", "stderr"):
        if isinstance(kwargs[s], AsyncPipe):
            # "stdout"[3:] == "out", "stderr"[3:] == "err"
            result[s[3:]] = "\n".join(kwargs[s].get_buffer())

    for k, v in list(result.items()):
        if not PY2 and isinstance(v, bytes):
            v = v.decode()
        # Test the *decoded* value: checking the stale bytes object meant
        # that output was never stripped on Python 3.
        if v and isinstance(v, string_types):
            v = v.strip()
        result[k] = v
    return result
2016-08-29 20:20:12 +03:00
def copy_pythonpath_to_osenv():
    """Extend the ``PYTHONPATH`` environment variable from ``sys.path``.

    Entries already present are kept. On non-Windows systems only
    ``sys.path`` entries containing a ``click`` or ``platformio``
    directory are added; on Windows every missing entry is added.
    """
    existing = os.environ.get("PYTHONPATH")
    search_paths = existing.split(os.pathsep) if existing is not None else []

    for entry in os.sys.path:
        if entry in search_paths:
            continue
        if "windows" not in get_systype():
            has_package = (isdir(join(entry, "click"))
                           or isdir(join(entry, "platformio")))
            if not has_package:
                continue
        search_paths.append(entry)

    os.environ['PYTHONPATH'] = os.pathsep.join(search_paths)
def get_serial_ports(filter_hwid=False):
    """List serial ports reported by PySerial.

    :param filter_hwid: when true, keep only ports whose hardware id
        contains ``VID:PID``.
    :returns: list of dicts with ``port``, ``description`` and ``hwid``.
    :raises exception.GetSerialPortsError: when PySerial's port listing
        cannot be imported.
    """
    try:
        from serial.tools.list_ports import comports
    except ImportError:
        raise exception.GetSerialPortsError(os.name)

    ports = []
    for port, description, hwid in comports():
        if not port:
            continue
        if PY2 and "windows" in get_systype():
            try:
                description = unicode(  # pylint: disable=undefined-variable
                    description,
                    errors="ignore")
            except TypeError:
                pass
        if filter_hwid and "VID:PID" not in hwid:
            continue
        ports.append({"port": port, "description": description, "hwid": hwid})

    if filter_hwid:
        return ports

    # fix for PySerial
    if not ports and "darwin" in get_systype():
        ports.extend({
            "port": device,
            "description": "n/a",
            "hwid": "n/a"
        } for device in glob("/dev/tty.*"))
    return ports


# Backward compatibility for PIO Core <3.5
get_serialports = get_serial_ports
def get_logical_devices():
    """List logical disks / mount points as ``{"path", "name"}`` dicts.

    On Windows the ``wmic`` tool is queried first; if launching it raises
    ``WindowsError`` the drive letters reported by ``fsutil`` are used and
    ``name`` is ``None``. On other systems the output of ``df`` is parsed
    and ``name`` is the basename of the mount point.
    """
    if "windows" in get_systype():
        try:
            wmic_out = exec_command(
                ["wmic", "logicaldisk", "get", "name,VolumeName"]).get(
                    "out", "")
            volume_re = re.compile(r"^([A-Z]{1}\:)\s*(\S+)?")
            volumes = []
            for row in wmic_out.split("\n"):
                found = volume_re.match(row.strip())
                if found:
                    volumes.append({
                        "path": found.group(1) + "\\",
                        "name": found.group(2)
                    })
            return volumes
        except WindowsError:  # pylint: disable=undefined-variable
            pass
        # try "fsutil"
        fsutil_out = exec_command(["fsutil", "fsinfo", "drives"]).get(
            "out", "")
        return [{
            "path": drive,
            "name": None
        } for drive in re.findall(r"[A-Z]:\\", fsutil_out)]

    df_out = exec_command(["df"]).get("out")
    mount_re = re.compile(r"^/.+\d+\%\s+([a-z\d\-_/]+)$", flags=re.I)
    mounts = []
    for row in df_out.split("\n"):
        found = mount_re.match(row.strip())
        if found:
            mount_point = found.group(1)
            mounts.append({"path": mount_point, "name": basename(mount_point)})
    return mounts
def get_mdns_services():
    """Browse the local network for mDNS services for three seconds.

    ``zeroconf`` is imported lazily; if it is missing, the ``contrib-pysite``
    core package directory is added to the import path and the import is
    retried.

    :returns: list of dicts with ``type``, ``name``, ``ip``, ``port`` and
        ``properties`` for every service discovered.
    """
    try:
        import zeroconf
    except ImportError:
        from site import addsitedir
        from platformio.managers.core import get_core_package_dir
        contrib_pysite_dir = get_core_package_dir("contrib-pysite")
        addsitedir(contrib_pysite_dir)
        sys.path.insert(0, contrib_pysite_dir)
        import zeroconf

    class mDNSListener(object):
        """Listener that first discovers service types, then services."""

        def __init__(self):
            self._zc = zeroconf.Zeroconf(
                interfaces=zeroconf.InterfaceChoice.All)
            self._found_types = []
            self._found_services = []

        def __enter__(self):
            zeroconf.ServiceBrowser(self._zc, "_services._dns-sd._udp.local.",
                                    self)
            return self

        def __exit__(self, etype, value, traceback):
            self._zc.close()

        def remove_service(self, zc, type_, name):
            pass

        def add_service(self, zc, type_, name):
            try:
                assert zeroconf.service_type_name(name)
                assert str(name)
            except (AssertionError, UnicodeError,
                    zeroconf.BadTypeInNameException):
                return
            # every new name is also browsed as a service type
            if name not in self._found_types:
                self._found_types.append(name)
                zeroconf.ServiceBrowser(self._zc, name, self)
            if type_ in self._found_types:
                info = zc.get_service_info(type_, name)
                if info:
                    self._found_services.append(info)

        def get_services(self):
            return self._found_services

    def _decode_properties(raw_properties):
        """Return text properties, or None when absent or not decodable."""
        if not raw_properties:
            return None
        try:
            decoded = {
                k.decode("utf8"):
                v.decode("utf8") if isinstance(v, bytes) else v
                for k, v in raw_properties.items()
            }
            # serialization is attempted only as a check; result is unused
            json.dumps(decoded)
        except UnicodeDecodeError:
            return None
        return decoded

    items = []
    with mDNSListener() as mdns:
        time.sleep(3)
        for service in mdns.get_services():
            properties = _decode_properties(service.properties)
            octets = [
                str(c if isinstance(c, int) else ord(c))
                for c in service.address
            ]
            items.append({
                "type": service.type,
                "name": service.name,
                "ip": ".".join(octets),
                "port": service.port,
                "properties": properties
            })
    return items
2015-04-29 18:17:14 +01:00
def get_request_defheaders():
    """Return the default HTTP headers for API requests.

    The ``User-Agent`` combines the PlatformIO version, a 0/1 CI flag and
    the default user agent of ``requests``.
    """
    user_agent = "PlatformIO/%s CI/%d %s" % (
        __version__, int(is_ci()), requests.utils.default_user_agent())
    return {"User-Agent": user_agent}
2015-04-29 18:17:14 +01:00
@memoized(expire=10000)
def _api_request_session():
    """Return a ``requests.Session``, memoized with ``expire=10000``."""
    session = requests.Session()
    return session
2017-08-15 22:57:20 +03:00
@throttle(500)
def _get_api_result(
        url,  # pylint: disable=too-many-branches
        params=None,
        data=None,
        auth=None):
    """Perform a single API request and return the response body as text.

    :param url: absolute URL, or a path appended to ``__apiurl__``.
    :param params: query-string parameters.
    :param data: when truthy, sent as the body of a POST; otherwise a GET
        is issued.
    :param auth: passed through to ``requests``.
    :raises exception.APIRequestError: on an HTTP error status (using the
        server-provided message when there is one) or when the body is
        not valid JSON.
    """
    from platformio.app import get_setting

    result = None
    r = None
    verify_ssl = sys.version_info >= (2, 7, 9)

    headers = get_request_defheaders()
    # NOTE(review): nesting reconstructed from flattened source -- the SSL
    # downgrade is assumed to apply only to URLs built from __apiurl__.
    if not url.startswith("http"):
        url = __apiurl__ + url
        if not get_setting("enable_ssl"):
            url = url.replace("https://", "http://")

    try:
        if data:
            r = _api_request_session().post(
                url,
                params=params,
                data=data,
                headers=headers,
                auth=auth,
                verify=verify_ssl)
        else:
            r = _api_request_session().get(
                url,
                params=params,
                headers=headers,
                auth=auth,
                verify=verify_ssl)
        # parse first so that error details are available below
        result = r.json()
        r.raise_for_status()
        return r.text
    except requests.exceptions.HTTPError as e:
        if result and "message" in result:
            raise exception.APIRequestError(result['message'])
        if result and "errors" in result:
            raise exception.APIRequestError(result['errors'][0]['title'])
        raise exception.APIRequestError(e)
    except ValueError:
        raise exception.APIRequestError(
            "Invalid response: %s" % r.text.encode("utf-8"))
    finally:
        # A Response is falsy for 4xx/5xx statuses, so a plain ``if r:``
        # would leave exactly the failed responses unclosed.
        if r is not None:
            r.close()
    return None
def get_api_result(url, params=None, data=None, auth=None, cache_valid=None):
    """Fetch and decode a JSON API result, with caching and retries.

    :param url: absolute URL or API path.
    :param params: query-string parameters.
    :param data: POST body; a GET is issued when empty.
    :param auth: credentials forwarded to the HTTP request.
    :param cache_valid: cache lifetime passed to ``ContentCache.set``;
        when falsy the cache is neither read nor written.
    :returns: the decoded JSON document.
    :raises exception.APIRequestError: after five failed connection
        attempts, or for errors reported by the request itself.
    :raises exception.InternetIsOffline: when no connectivity is detected.
    """
    from platformio.app import ContentCache
    total = 0
    max_retries = 5
    cache_key = (ContentCache.key_from_args(url, params, data, auth)
                 if cache_valid else None)
    while total < max_retries:
        try:
            with ContentCache() as cc:
                if cache_key:
                    result = cc.get(cache_key)
                    if result is not None:
                        return json.loads(result)

            # check internet before and resolve issue with 60 seconds timeout
            internet_on(raise_exception=True)

            # ``auth`` is part of the cache key, so it must reach the
            # request as well; it used to be dropped here.
            result = _get_api_result(url, params, data, auth)
            if cache_valid:
                with ContentCache() as cc:
                    cc.set(cache_key, result, cache_valid)
            return json.loads(result)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout) as e:
            from platformio.maintenance import in_silence
            total += 1
            if not in_silence():
                click.secho(
                    "[API] ConnectionError: {0} (incremented retry: max={1}, "
                    "total={2})".format(e, max_retries, total),
                    fg="yellow")
            # linear back-off between attempts
            time.sleep(2 * total)

    raise exception.APIRequestError(
        "Could not connect to PlatformIO API Service. "
        "Please try later.")
# Hosts contacted by raw IP address when probing for connectivity.
PING_INTERNET_IPS = [
    "192.30.253.113",  # github.com
    "193.222.52.25",  # dl.platformio.org
]
@memoized(expire=5000)
def _internet_on():
    """Return True when any host in ``PING_INTERNET_IPS`` is reachable.

    With ``HTTP_PROXY``/``HTTPS_PROXY`` set the probe is an HTTP request
    via ``requests``; otherwise a plain TCP connection to port 80 is made.
    Each probe uses a two second timeout, which is also installed as the
    process-wide default socket timeout.
    """
    timeout = 2
    socket.setdefaulttimeout(timeout)
    for ip in PING_INTERNET_IPS:
        try:
            if os.getenv("HTTP_PROXY", os.getenv("HTTPS_PROXY")):
                requests.get(
                    "http://%s" % ip, allow_redirects=False,
                    timeout=timeout).close()
            else:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    sock.connect((ip, 80))
                finally:
                    # release the descriptor instead of waiting for GC
                    sock.close()
            return True
        except Exception:  # pylint: disable=broad-except
            # any failure means "try the next host"; Ctrl+C still propagates
            pass
    return False
2016-09-19 15:10:28 +03:00
2017-12-13 18:14:01 +02:00
def internet_on(raise_exception=False):
    """Report whether the connectivity probe succeeded.

    :param raise_exception: raise instead of returning a falsy result.
    :raises exception.InternetIsOffline: when offline and
        ``raise_exception`` is true.
    """
    online = _internet_on()
    if online or not raise_exception:
        return online
    raise exception.InternetIsOffline()
2016-08-29 20:20:12 +03:00
def get_pythonexe_path():
    """Return the Python interpreter path.

    The ``PYTHONEXEPATH`` environment variable wins; otherwise the
    normalized ``sys.executable`` is returned.
    """
    fallback = normpath(sys.executable)
    return os.environ.get("PYTHONEXEPATH", fallback)
def where_is_program(program, envpath=None):
    """Locate an executable and return its path.

    :param program: executable name to look for.
    :param envpath: optional ``PATH`` value to search instead of the
        process's own ``PATH``.
    :returns: the path found by ``where``/``which`` or by scanning the
        ``PATH`` directories (also trying ``<program>.exe``); ``program``
        itself when nothing was found.
    """
    # Work on a copy: assigning PATH on os.environ itself would override
    # the search path of the whole process for good.
    env = os.environ.copy()
    if envpath:
        env['PATH'] = envpath

    # try OS's built-in commands
    try:
        result = exec_command(
            ["where" if "windows" in get_systype() else "which", program],
            env=env)
        if result['returncode'] == 0 and isfile(result['out'].strip()):
            return result['out'].strip()
    except OSError:
        pass

    # look up in $PATH
    for bin_dir in env.get("PATH", "").split(os.pathsep):
        if isfile(join(bin_dir, program)):
            return join(bin_dir, program)
        if isfile(join(bin_dir, "%s.exe" % program)):
            return join(bin_dir, "%s.exe" % program)

    return program
def pepver_to_semver(pepver):
    """Rewrite the first PEP 440 pre/post-release marker in semver style.

    For example ``3.6.0rc1`` becomes ``3.6.0-rc.1``. Only the first
    occurrence is rewritten.
    """
    marker_pattern = r"(\.\d+)\.?(dev|a|b|rc|post)"
    return re.sub(marker_pattern, r"\1-\2.", pepver, count=1)
2018-01-13 19:44:05 +02:00
def items_to_list(items):
    """Normalize a list or comma-separated string to lower-case items.

    Strings are split on commas and each piece is stripped; lists are
    taken as they are. Empty entries are dropped in both cases.
    """
    if isinstance(items, list):
        values = items
    else:
        values = [chunk.strip() for chunk in items.split(",")]
    return [value.lower() for value in values if value]
def items_in_list(needle, haystack):
    """Tell whether two item collections overlap.

    Both arguments are normalized with ``items_to_list``. Returns ``True``
    when either side contains the ``*`` wildcard, otherwise the (possibly
    empty) set of common items.
    """
    wanted = set(items_to_list(needle))
    available = set(items_to_list(haystack))
    if "*" in wanted or "*" in available:
        return True
    return wanted & available
def parse_date(datestr):
    """Parse a date string into a ``time.struct_time``.

    Strings containing both ``T`` and ``Z`` are parsed as
    ``%Y-%m-%dT%H:%M:%SZ``; anything else uses the default
    ``time.strptime`` format.
    """
    looks_iso_utc = "T" in datestr and "Z" in datestr
    if not looks_iso_utc:
        return time.strptime(datestr)
    return time.strptime(datestr, "%Y-%m-%dT%H:%M:%SZ")
def format_filesize(filesize):
    """Format a byte count with a binary (1024-based) unit suffix.

    Values below 1024 are rendered as whole bytes (``"512B"``). Larger
    values use the biggest fitting unit, with two decimals unless the
    value is a whole multiple of that unit (``"1KB"``, ``"1.50KB"``).
    """
    kilo = 1024
    size = float(filesize)
    if size < kilo:
        return "%d%s" % (size, "B")

    for exponent, prefix in enumerate("KMGTPEZY", 1):
        ceiling = kilo**(exponent + 1)
        if size >= ceiling:
            continue
        if size % (kilo**exponent):
            return "%.2f%sB" % ((kilo * size / ceiling), prefix)
        break
    # whole multiple of the unit, or larger than the biggest known unit
    return "%d%sB" % ((kilo * size / ceiling), prefix)
2018-05-25 21:18:08 +03:00
def merge_dicts(d1, d2, path=None):
    """Recursively merge ``d2`` into ``d1`` in place and return ``d1``.

    Nested dictionaries present on both sides are merged key by key; any
    other value from ``d2`` overwrites the one in ``d1``. ``path`` tracks
    the key trail during recursion.
    """
    trail = [] if path is None else path
    for key in d2:
        both_dicts = (key in d1 and isinstance(d1[key], dict)
                      and isinstance(d2[key], dict))
        if not both_dicts:
            d1[key] = d2[key]
            continue
        merge_dicts(d1[key], d2[key], trail + [str(key)])
    return d1
2019-01-11 14:07:35 +02:00
def get_file_contents(path):
    """Return the text content of ``path``.

    The file is read with the default encoding first; if that raises
    ``UnicodeDecodeError`` it is read again as latin-1.
    """
    try:
        with open(path) as fp:
            contents = fp.read()
    except UnicodeDecodeError:
        with open(path, encoding="latin-1") as fp:
            contents = fp.read()
    return contents
def ensure_udev_rules():
    """Check that the installed PlatformIO udev rules are present and current.

    :returns: ``None`` on non-Linux systems or when the bundled reference
        rules file is missing; ``True`` when every installed rules file
        contains all reference rules.
    :raises exception.MissedUdevRules: when no rules file is installed.
    :raises exception.OutdatedUdevRules: when an installed file lacks some
        of the reference rules.
    """

    def _rules_to_set(rules_path):
        rules = set()
        for raw in get_file_contents(rules_path).split("\n"):
            # blank lines and lines starting with "#" are not rules
            if raw.strip() and not raw.startswith("#"):
                rules.add(raw.strip())
        return rules

    if "linux" not in get_systype():
        return None

    installed_rules = [
        "/etc/udev/rules.d/99-platformio-udev.rules",
        "/lib/udev/rules.d/99-platformio-udev.rules"
    ]
    if not any(isfile(p) for p in installed_rules):
        raise exception.MissedUdevRules

    origin_path = abspath(
        join(get_source_dir(), "..", "scripts", "99-platformio-udev.rules"))
    if not isfile(origin_path):
        return None

    origin_rules = _rules_to_set(origin_path)
    for rules_path in installed_rules:
        if not isfile(rules_path):
            continue
        current_rules = _rules_to_set(rules_path)
        if not origin_rules.issubset(current_rules):
            raise exception.OutdatedUdevRules(rules_path)

    return True
def get_original_version(version):
    """Decode the version packed into the second component of ``version``.

    The second component is read as pairs of digits (left-padded with a
    zero when its length is odd), e.g. ``1.40200.0`` gives ``4.2.0``.
    Returns ``None`` unless ``version`` has exactly two dots and its
    second component is at least 100.
    """
    if version.count(".") != 2:
        return None
    packed = version.split(".")[1]
    if int(packed) < 100:
        return None
    if len(packed) % 2:
        packed = "0" + packed
    pairs = (packed[pos:pos + 2] for pos in range(0, len(packed), 2))
    return ".".join(str(int(pair)) for pair in pairs)
def rmtree_(path):
    """Remove a directory tree, retrying entries that fail to delete.

    When ``shutil.rmtree`` reports an error for an entry, the entry is
    made writable and removed again; if that also fails, a message asking
    for manual removal is printed to stderr.
    """

    def _force_remove(_, name, __):
        try:
            os.chmod(name, stat.S_IWRITE)
            os.remove(name)
        except Exception as e:  # pylint: disable=broad-except
            message = "%s \nPlease manually remove the file `%s`" % (str(e),
                                                                     name)
            click.secho(message, fg="red", err=True)

    return rmtree(path, onerror=_force_remove)
#
# Glob.Escape from Python 3.4
# https://github.com/python/cpython/blob/master/Lib/glob.py#L161
#
try:
    from glob import escape as glob_escape  # pylint: disable=unused-import
except ImportError:
    # Fallback for interpreters whose ``glob`` module has no ``escape``.
    magic_check = re.compile('([*?[])')
    magic_check_bytes = re.compile(b'([*?[])')

    def glob_escape(pathname):
        """Escape all special characters."""
        # Each of "*?[" is escaped by wrapping it in square brackets. The
        # drive part is left alone because metacharacters do not work
        # there and shouldn't be escaped.
        drive, tail = os.path.splitdrive(pathname)
        if isinstance(tail, bytes):
            escaped = magic_check_bytes.sub(br'[\1]', tail)
        else:
            escaped = magic_check.sub(r'[\1]', tail)
        return drive + escaped