Open sourcing PIO Unified Debugger, PIO Unit Testing Engine, and PIO Home Server

This commit is contained in:
Ivan Kravets
2019-05-27 17:19:33 +03:00
31 changed files with 2761 additions and 130 deletions

View File

@ -1,3 +1,3 @@
[settings]
line_length=79
known_third_party=bottle,click,pytest,requests,SCons,semantic_version,serial
known_third_party=bottle,click,pytest,requests,SCons,semantic_version,serial,twisted,autobahn,bs4,jsonrpc

View File

@ -1,42 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from os import getcwd
import click
from platformio.managers.core import pioplus_call
@click.command(
"debug",
context_settings=dict(ignore_unknown_options=True),
short_help="PIO Unified Debugger")
@click.option(
"-d",
"--project-dir",
default=getcwd,
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True))
@click.option("--environment", "-e", metavar="<environment>")
@click.option("--verbose", "-v", is_flag=True)
@click.option("--interface", type=click.Choice(["gdb"]))
@click.argument("__unprocessed", nargs=-1, type=click.UNPROCESSED)
def cli(*args, **kwargs): # pylint: disable=unused-argument
pioplus_call(sys.argv[1:])

View File

@ -12,20 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import click
from platformio.managers.core import pioplus_call
@click.command("home", short_help="PIO Home")
@click.option("--port", type=int, default=8008, help="HTTP port, default=8008")
@click.option(
"--host",
default="127.0.0.1",
help="HTTP host, default=127.0.0.1. "
"You can open PIO Home for inbound connections with --host=0.0.0.0")
@click.option("--no-open", is_flag=True)
def cli(*args, **kwargs): # pylint: disable=unused-argument
pioplus_call(sys.argv[1:])
from platformio.commands.debug.command import cli

View File

@ -0,0 +1,275 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
import time
from hashlib import sha1
from os.path import abspath, basename, dirname, isdir, join, splitext
from tempfile import mkdtemp
from twisted.internet import protocol # pylint: disable=import-error
from twisted.internet import reactor # pylint: disable=import-error
from twisted.internet import stdio # pylint: disable=import-error
from twisted.internet import task # pylint: disable=import-error
from platformio import app, exception, util
from platformio.commands.debug import helpers, initcfgs
from platformio.commands.debug.process import BaseProcess
from platformio.commands.debug.server import DebugServer
from platformio.compat import PY2
from platformio.telemetry import MeasurementProtocol
LOG_FILE = None
class GDBClient(BaseProcess): # pylint: disable=too-many-instance-attributes
PIO_SRC_NAME = ".pioinit"
INIT_COMPLETED_BANNER = "PlatformIO: Initialization completed"
def __init__(self, project_dir, args, debug_options, env_options):
self.project_dir = project_dir
self.args = list(args)
self.debug_options = debug_options
self.env_options = env_options
self._debug_server = DebugServer(debug_options, env_options)
self._session_id = None
if not isdir(util.get_cache_dir()):
os.makedirs(util.get_cache_dir())
self._gdbsrc_dir = mkdtemp(
dir=util.get_cache_dir(), prefix=".piodebug-")
self._target_is_run = False
self._last_server_activity = 0
self._auto_continue_timer = None
def spawn(self, gdb_path, prog_path):
session_hash = gdb_path + prog_path
self._session_id = sha1(
session_hash if PY2 else session_hash.encode()).hexdigest()
self._kill_previous_session()
patterns = {
"PROJECT_DIR": helpers.escape_path(self.project_dir),
"PROG_PATH": helpers.escape_path(prog_path),
"PROG_DIR": helpers.escape_path(dirname(prog_path)),
"PROG_NAME": basename(splitext(prog_path)[0]),
"DEBUG_PORT": self.debug_options['port'],
"UPLOAD_PROTOCOL": self.debug_options['upload_protocol'],
"INIT_BREAK": self.debug_options['init_break'] or "",
"LOAD_CMD": self.debug_options['load_cmd'] or "",
}
self._debug_server.spawn(patterns)
if not patterns['DEBUG_PORT']:
patterns['DEBUG_PORT'] = self._debug_server.get_debug_port()
self.generate_pioinit(self._gdbsrc_dir, patterns)
# start GDB client
args = [
"piogdb",
"-q",
"--directory", self._gdbsrc_dir,
"--directory", self.project_dir,
"-l", "10"
] # yapf: disable
args.extend(self.args)
if not gdb_path:
raise exception.DebugInvalidOptions("GDB client is not configured")
gdb_data_dir = self._get_data_dir(gdb_path)
if gdb_data_dir:
args.extend(["--data-directory", gdb_data_dir])
args.append(patterns['PROG_PATH'])
return reactor.spawnProcess(
self, gdb_path, args, path=self.project_dir, env=os.environ)
@staticmethod
def _get_data_dir(gdb_path):
if "msp430" in gdb_path:
return None
gdb_data_dir = abspath(join(dirname(gdb_path), "..", "share", "gdb"))
return gdb_data_dir if isdir(gdb_data_dir) else None
def generate_pioinit(self, dst_dir, patterns):
server_exe = (self.debug_options.get("server") or {}).get(
"executable", "").lower()
if "jlink" in server_exe:
cfg = initcfgs.GDB_JLINK_INIT_CONFIG
elif "st-util" in server_exe:
cfg = initcfgs.GDB_STUTIL_INIT_CONFIG
elif "mspdebug" in server_exe:
cfg = initcfgs.GDB_MSPDEBUG_INIT_CONFIG
elif self.debug_options['require_debug_port']:
cfg = initcfgs.GDB_BLACKMAGIC_INIT_CONFIG
else:
cfg = initcfgs.GDB_DEFAULT_INIT_CONFIG
commands = cfg.split("\n")
if self.debug_options['init_cmds']:
commands = self.debug_options['init_cmds']
commands.extend(self.debug_options['extra_cmds'])
if not any("define pio_reset_target" in cmd for cmd in commands):
commands = [
"define pio_reset_target",
" echo Warning! Undefined pio_reset_target command\\n",
" mon reset",
"end"
] + commands # yapf: disable
if not any("define pio_reset_halt_target" in cmd for cmd in commands):
commands = [
"define pio_reset_halt_target",
" echo Warning! Undefined pio_reset_halt_target command\\n",
" mon reset halt",
"end"
] + commands # yapf: disable
if not any("define pio_restart_target" in cmd for cmd in commands):
commands += [
"define pio_restart_target",
" pio_reset_halt_target",
" $INIT_BREAK",
" %s" % ("continue" if patterns['INIT_BREAK'] else "next"),
"end"
] # yapf: disable
banner = [
"echo PlatformIO Unified Debugger > http://bit.ly/pio-debug\\n",
"echo PlatformIO: Initializing remote target...\\n"
]
footer = ["echo %s\\n" % self.INIT_COMPLETED_BANNER]
commands = banner + commands + footer
with open(join(dst_dir, self.PIO_SRC_NAME), "w") as fp:
fp.write("\n".join(self.apply_patterns(commands, patterns)))
def connectionMade(self):
self._lock_session(self.transport.pid)
p = protocol.Protocol()
p.dataReceived = self.onStdInData
stdio.StandardIO(p)
def onStdInData(self, data):
self._last_server_activity = time.time()
if LOG_FILE:
with open(LOG_FILE, "ab") as fp:
fp.write(data)
if b"-exec-run" in data:
if self._target_is_run:
token, _ = data.split(b"-", 1)
self.outReceived(token + b"^running\n")
return
data = data.replace(b"-exec-run", b"-exec-continue")
if b"-exec-continue" in data:
self._target_is_run = True
if b"-gdb-exit" in data or data.strip() in (b"q", b"quit"):
self.transport.write(b"pio_reset_target\n")
self.transport.write(data)
def processEnded(self, reason): # pylint: disable=unused-argument
self._unlock_session()
if self._gdbsrc_dir and isdir(self._gdbsrc_dir):
util.rmtree_(self._gdbsrc_dir)
if self._debug_server:
self._debug_server.terminate()
reactor.stop()
def outReceived(self, data):
self._last_server_activity = time.time()
super(GDBClient, self).outReceived(data)
self._handle_error(data)
# go to init break automatically
if self.INIT_COMPLETED_BANNER.encode() in data:
self._auto_continue_timer = task.LoopingCall(
self._auto_exec_continue)
self._auto_continue_timer.start(0.1)
def errReceived(self, data):
super(GDBClient, self).errReceived(data)
self._handle_error(data)
def console_log(self, msg):
if helpers.is_mi_mode(self.args):
self.outReceived(('~"%s\\n"\n' % msg).encode())
else:
self.outReceived(("%s\n" % msg).encode())
def _auto_exec_continue(self):
auto_exec_delay = 0.5 # in seconds
if self._last_server_activity > (time.time() - auto_exec_delay):
return
if self._auto_continue_timer:
self._auto_continue_timer.stop()
self._auto_continue_timer = None
if not self.debug_options['init_break'] or self._target_is_run:
return
self.console_log(
"PlatformIO: Resume the execution to `debug_init_break = %s`" %
self.debug_options['init_break'])
self.transport.write(b"0-exec-continue\n" if helpers.
is_mi_mode(self.args) else b"continue\n")
self._target_is_run = True
def _handle_error(self, data):
if (self.PIO_SRC_NAME.encode() not in data
or b"Error in sourced" not in data):
return
configuration = {"debug": self.debug_options, "env": self.env_options}
exd = re.sub(r'\\(?!")', "/", json.dumps(configuration))
exd = re.sub(r'"(?:[a-z]\:)?((/[^"/]+)+)"', lambda m: '"%s"' % join(
*m.group(1).split("/")[-2:]), exd, re.I | re.M)
mp = MeasurementProtocol()
mp['exd'] = "DebugGDBPioInitError: %s" % exd
mp['exf'] = 1
mp.send("exception")
self.transport.loseConnection()
def _kill_previous_session(self):
assert self._session_id
pid = None
with app.ContentCache() as cc:
pid = cc.get(self._session_id)
cc.delete(self._session_id)
if not pid:
return
if "windows" in util.get_systype():
kill = ["Taskkill", "/PID", pid, "/F"]
else:
kill = ["kill", pid]
try:
util.exec_command(kill)
except: # pylint: disable=bare-except
pass
def _lock_session(self, pid):
if not self._session_id:
return
with app.ContentCache() as cc:
cc.set(self._session_id, str(pid), "1h")
def _unlock_session(self):
if not self._session_id:
return
with app.ContentCache() as cc:
cc.delete(self._session_id)

View File

@ -0,0 +1,150 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-arguments, too-many-statements
# pylint: disable=too-many-locals, too-many-branches
import os
from os.path import isfile, join
import click
from platformio import exception, util
from platformio.commands.debug import helpers
from platformio.managers.core import inject_contrib_pysite
from platformio.project.config import ProjectConfig
@click.command(
"debug",
context_settings=dict(ignore_unknown_options=True),
short_help="PIO Unified Debugger")
@click.option(
"-d",
"--project-dir",
default=os.getcwd,
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True))
@click.option(
"-c",
"--project-conf",
type=click.Path(
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True))
@click.option("--environment", "-e", metavar="<environment>")
@click.option("--verbose", "-v", is_flag=True)
@click.option("--interface", type=click.Choice(["gdb"]))
@click.argument("__unprocessed", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def cli(ctx, project_dir, project_conf, environment, verbose, interface,
__unprocessed):
try:
util.ensure_udev_rules()
except NameError:
pass
except exception.InvalidUdevRules as e:
for line in str(e).split("\n") + [""]:
click.echo(
('~"%s\\n"' if helpers.is_mi_mode(__unprocessed) else "%s") %
line)
if not util.is_platformio_project(project_dir) and os.getenv("CWD"):
project_dir = os.getenv("CWD")
with util.cd(project_dir):
config = ProjectConfig.get_instance(
project_conf or join(project_dir, "platformio.ini"))
config.validate(envs=[environment] if environment else None)
env_name = environment or helpers.get_default_debug_env(config)
env_options = config.items(env=env_name, as_dict=True)
if not set(env_options.keys()) >= set(["platform", "board"]):
raise exception.ProjectEnvsNotAvailable()
debug_options = helpers.validate_debug_options(ctx, env_options)
assert debug_options
if not interface:
return helpers.predebug_project(ctx, project_dir, env_name, False,
verbose)
configuration = helpers.load_configuration(ctx, project_dir, env_name)
if not configuration:
raise exception.DebugInvalidOptions(
"Could not load debug configuration")
if "--version" in __unprocessed:
result = util.exec_command([configuration['gdb_path'], "--version"])
if result['returncode'] == 0:
return click.echo(result['out'])
raise exception.PlatformioException("\n".join(
[result['out'], result['err']]))
debug_options['load_cmd'] = helpers.configure_esp32_load_cmd(
debug_options, configuration)
rebuild_prog = False
preload = debug_options['load_cmd'] == "preload"
load_mode = debug_options['load_mode']
if load_mode == "always":
rebuild_prog = (
preload
or not helpers.has_debug_symbols(configuration['prog_path']))
elif load_mode == "modified":
rebuild_prog = (
helpers.is_prog_obsolete(configuration['prog_path'])
or not helpers.has_debug_symbols(configuration['prog_path']))
else:
rebuild_prog = not isfile(configuration['prog_path'])
if preload or (not rebuild_prog and load_mode != "always"):
# don't load firmware through debug server
debug_options['load_cmd'] = None
if rebuild_prog:
if helpers.is_mi_mode(__unprocessed):
output = helpers.GDBBytesIO()
click.echo('~"Preparing firmware for debugging...\\n"')
with helpers.capture_std_streams(output):
helpers.predebug_project(ctx, project_dir, env_name, preload,
verbose)
output.close()
else:
click.echo("Preparing firmware for debugging...")
helpers.predebug_project(ctx, project_dir, env_name, preload,
verbose)
# save SHA sum of newly created prog
if load_mode == "modified":
helpers.is_prog_obsolete(configuration['prog_path'])
if not isfile(configuration['prog_path']):
raise exception.DebugInvalidOptions("Program/firmware is missed")
# run debugging client
inject_contrib_pysite()
from platformio.commands.debug.client import GDBClient, reactor
client = GDBClient(project_dir, __unprocessed, debug_options, env_options)
client.spawn(configuration['gdb_path'], configuration['prog_path'])
reactor.run()
return True

View File

@ -0,0 +1,297 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import sys
import time
from contextlib import contextmanager
from fnmatch import fnmatch
from hashlib import sha1
from io import BytesIO
from os.path import isfile
from platformio import VERSION, exception, util
from platformio.commands.platform import \
platform_install as cmd_platform_install
from platformio.commands.run import cli as cmd_run
from platformio.managers.platform import PlatformFactory
class GDBBytesIO(BytesIO): # pylint: disable=too-few-public-methods
STDOUT = sys.stdout
def write(self, text):
for line in text.strip().split("\n"):
self.STDOUT.write('~"%s\\n"\n' % line)
self.STDOUT.flush()
def is_mi_mode(args):
return "--interpreter" in " ".join(args)
def escape_path(path):
return path.replace("\\", "/")
def get_default_debug_env(config):
default_envs = config.default_envs()
return default_envs[0] if default_envs else config.envs()[0]
def validate_debug_options(cmd_ctx, env_options):
def _cleanup_cmds(cmds):
if not cmds:
return []
if not isinstance(cmds, list):
cmds = cmds.split("\n")
return [c.strip() for c in cmds if c.strip()]
try:
platform = PlatformFactory.newPlatform(env_options['platform'])
except exception.UnknownPlatform:
cmd_ctx.invoke(
cmd_platform_install,
platforms=[env_options['platform']],
skip_default_package=True)
platform = PlatformFactory.newPlatform(env_options['platform'])
board_config = platform.board_config(env_options['board'])
tool_name = board_config.get_debug_tool_name(env_options.get("debug_tool"))
tool_settings = board_config.get("debug", {}).get("tools", {}).get(
tool_name, {})
server_options = None
# specific server per a system
if isinstance(tool_settings.get("server", {}), list):
for item in tool_settings['server'][:]:
tool_settings['server'] = item
if util.get_systype() in item.get("system", []):
break
# user overwrites debug server
if env_options.get("debug_server"):
server_options = {
"cwd": None,
"executable": None,
"arguments": env_options.get("debug_server")
}
if not isinstance(server_options['arguments'], list):
server_options['arguments'] = server_options['arguments'].split(
"\n")
server_options['arguments'] = [
arg.strip() for arg in server_options['arguments'] if arg.strip()
]
server_options['executable'] = server_options['arguments'][0]
server_options['arguments'] = server_options['arguments'][1:]
elif "server" in tool_settings:
server_package = tool_settings['server'].get("package")
server_package_dir = platform.get_package_dir(
server_package) if server_package else None
if server_package and not server_package_dir:
platform.install_packages(
with_packages=[server_package],
skip_default_package=True,
silent=True)
server_package_dir = platform.get_package_dir(server_package)
server_options = dict(
cwd=server_package_dir if server_package else None,
executable=tool_settings['server'].get("executable"),
arguments=[
a.replace("$PACKAGE_DIR", escape_path(server_package_dir))
if server_package_dir else a
for a in tool_settings['server'].get("arguments", [])
])
extra_cmds = _cleanup_cmds(env_options.get("debug_extra_cmds"))
extra_cmds.extend(_cleanup_cmds(tool_settings.get("extra_cmds")))
result = dict(
tool=tool_name,
upload_protocol=env_options.get(
"upload_protocol",
board_config.get("upload", {}).get("protocol")),
load_cmd=env_options.get("debug_load_cmd",
tool_settings.get("load_cmd", "load")),
load_mode=env_options.get("debug_load_mode",
tool_settings.get("load_mode", "always")),
init_break=env_options.get(
"debug_init_break", tool_settings.get("init_break",
"tbreak main")),
init_cmds=_cleanup_cmds(
env_options.get("debug_init_cmds",
tool_settings.get("init_cmds"))),
extra_cmds=extra_cmds,
require_debug_port=tool_settings.get("require_debug_port", False),
port=reveal_debug_port(
env_options.get("debug_port", tool_settings.get("port")),
tool_name, tool_settings),
server=server_options)
return result
def predebug_project(ctx, project_dir, env_name, preload, verbose):
ctx.invoke(
cmd_run,
project_dir=project_dir,
environment=[env_name],
target=["__debug"] + (["upload"] if preload else []),
verbose=verbose)
if preload:
time.sleep(5)
@contextmanager
def capture_std_streams(stdout, stderr=None):
_stdout = sys.stdout
_stderr = sys.stderr
sys.stdout = stdout
sys.stderr = stderr or stdout
yield
sys.stdout = _stdout
sys.stderr = _stderr
def load_configuration(ctx, project_dir, env_name):
output = BytesIO()
with capture_std_streams(output):
ctx.invoke(
cmd_run,
project_dir=project_dir,
environment=[env_name],
target=["idedata"])
result = output.getvalue().decode()
output.close()
if '"includes":' not in result:
return None
for line in result.split("\n"):
line = line.strip()
if line.startswith('{"') and "cxx_path" in line:
return json.loads(line[:line.rindex("}") + 1])
return None
def configure_esp32_load_cmd(debug_options, configuration):
ignore_conds = [
debug_options['load_cmd'] != "load",
"xtensa-esp32" not in configuration.get("cc_path", ""),
not configuration.get("flash_extra_images"), not all([
isfile(item['path'])
for item in configuration.get("flash_extra_images")
])
]
if any(ignore_conds):
return debug_options['load_cmd']
mon_cmds = [
'monitor program_esp32 "{{{path}}}" {offset} verify'.format(
path=escape_path(item['path']), offset=item['offset'])
for item in configuration.get("flash_extra_images")
]
mon_cmds.append('monitor program_esp32 "{%s.bin}" 0x10000 verify' %
escape_path(configuration['prog_path'][:-4]))
return "\n".join(mon_cmds)
def has_debug_symbols(prog_path):
if not isfile(prog_path):
return False
matched = {
b".debug_info": False,
b".debug_abbrev": False,
b" -Og": False,
b" -g": False,
b"__PLATFORMIO_DEBUG__": (3, 6) > VERSION[:2]
}
with open(prog_path, "rb") as fp:
last_data = b""
while True:
data = fp.read(1024)
if not data:
break
for pattern, found in matched.items():
if found:
continue
if pattern in last_data + data:
matched[pattern] = True
last_data = data
return all(matched.values())
def is_prog_obsolete(prog_path):
prog_hash_path = prog_path + ".sha1"
if not isfile(prog_path):
return True
shasum = sha1()
with open(prog_path, "rb") as fp:
while True:
data = fp.read(1024)
if not data:
break
shasum.update(data)
new_digest = shasum.hexdigest()
old_digest = None
if isfile(prog_hash_path):
with open(prog_hash_path, "r") as fp:
old_digest = fp.read()
if new_digest == old_digest:
return False
with open(prog_hash_path, "w") as fp:
fp.write(new_digest)
return True
def reveal_debug_port(env_debug_port, tool_name, tool_settings):
def _get_pattern():
if not env_debug_port:
return None
if set(["*", "?", "[", "]"]) & set(env_debug_port):
return env_debug_port
return None
def _is_match_pattern(port):
pattern = _get_pattern()
if not pattern:
return True
return fnmatch(port, pattern)
def _look_for_serial_port(hwids):
for item in util.get_serialports(filter_hwid=True):
if not _is_match_pattern(item['port']):
continue
port = item['port']
if tool_name.startswith("blackmagic"):
if "windows" in util.get_systype() and \
port.startswith("COM") and len(port) > 4:
port = "\\\\.\\%s" % port
if "GDB" in item['description']:
return port
for hwid in hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item['hwid']:
return port
return None
if env_debug_port and not _get_pattern():
return env_debug_port
if not tool_settings.get("require_debug_port"):
return None
debug_port = _look_for_serial_port(tool_settings.get("hwids", []))
if not debug_port:
raise exception.DebugInvalidOptions(
"Please specify `debug_port` for environment")
return debug_port

View File

@ -0,0 +1,109 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
GDB_DEFAULT_INIT_CONFIG = """
define pio_reset_halt_target
monitor reset halt
end
define pio_reset_target
monitor reset
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
pio_reset_halt_target
$LOAD_CMD
monitor init
pio_reset_halt_target
"""
GDB_STUTIL_INIT_CONFIG = """
define pio_reset_halt_target
monitor halt
monitor reset
end
define pio_reset_target
monitor reset
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
pio_reset_halt_target
$LOAD_CMD
pio_reset_halt_target
"""
GDB_JLINK_INIT_CONFIG = """
define pio_reset_halt_target
monitor halt
monitor reset
end
define pio_reset_target
monitor reset
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
pio_reset_halt_target
$LOAD_CMD
pio_reset_halt_target
"""
GDB_BLACKMAGIC_INIT_CONFIG = """
define pio_reset_halt_target
set language c
set *0xE000ED0C = 0x05FA0004
set $busy = (*0xE000ED0C & 0x4)
while ($busy)
set $busy = (*0xE000ED0C & 0x4)
end
set language auto
end
define pio_reset_target
pio_reset_halt_target
end
target extended-remote $DEBUG_PORT
monitor swdp_scan
attach 1
set mem inaccessible-by-default off
$INIT_BREAK
$LOAD_CMD
set language c
set *0xE000ED0C = 0x05FA0004
set $busy = (*0xE000ED0C & 0x4)
while ($busy)
set $busy = (*0xE000ED0C & 0x4)
end
set language auto
"""
GDB_MSPDEBUG_INIT_CONFIG = """
define pio_reset_halt_target
end
define pio_reset_target
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
monitor erase
$LOAD_CMD
pio_reset_halt_target
"""

View File

@ -0,0 +1,73 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import click
from twisted.internet import protocol # pylint: disable=import-error
from platformio import util
from platformio.commands.debug import helpers
from platformio.compat import string_types
LOG_FILE = None
class BaseProcess(protocol.ProcessProtocol, object):
STDOUT_CHUNK_SIZE = 2048
COMMON_PATTERNS = {
"PLATFORMIO_HOME_DIR": helpers.escape_path(util.get_home_dir()),
"PYTHONEXE": os.getenv("PYTHONEXEPATH", "")
}
def apply_patterns(self, source, patterns=None):
_patterns = self.COMMON_PATTERNS.copy()
_patterns.update(patterns or {})
def _replace(text):
for key, value in _patterns.items():
pattern = "$%s" % key
text = text.replace(pattern, value or "")
return text
if isinstance(source, string_types):
source = _replace(source)
elif isinstance(source, (list, dict)):
items = enumerate(source) if isinstance(source,
list) else source.items()
for key, value in items:
if isinstance(value, string_types):
source[key] = _replace(value)
elif isinstance(value, (list, dict)):
source[key] = self.apply_patterns(value, patterns)
return source
def outReceived(self, data):
if LOG_FILE:
with open(LOG_FILE, "ab") as fp:
fp.write(data)
while data:
chunk = data[:self.STDOUT_CHUNK_SIZE]
click.echo(chunk, nl=False)
data = data[self.STDOUT_CHUNK_SIZE:]
@staticmethod
def errReceived(data):
if LOG_FILE:
with open(LOG_FILE, "ab") as fp:
fp.write(data)
click.echo(data, nl=False, err=True)

View File

@ -0,0 +1,110 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from os.path import isdir, isfile, join
from twisted.internet import reactor # pylint: disable=import-error
from platformio import exception, util
from platformio.commands.debug import helpers
from platformio.commands.debug.process import BaseProcess
class DebugServer(BaseProcess):
def __init__(self, debug_options, env_options):
self.debug_options = debug_options
self.env_options = env_options
self._debug_port = None
self._transport = None
def spawn(self, patterns): # pylint: disable=too-many-branches
systype = util.get_systype()
server = self.debug_options.get("server")
if not server:
return None
server = self.apply_patterns(server, patterns)
server_executable = server['executable']
if not server_executable:
return None
if server['cwd']:
server_executable = join(server['cwd'], server_executable)
if ("windows" in systype and not server_executable.endswith(".exe")
and isfile(server_executable + ".exe")):
server_executable = server_executable + ".exe"
if not isfile(server_executable):
server_executable = util.where_is_program(server_executable)
if not isfile(server_executable):
raise exception.DebugInvalidOptions(
"\nCould not launch Debug Server '%s'. Please check that it "
"is installed and is included in a system PATH\n\n"
"See documentation or contact contact@platformio.org:\n"
"http://docs.platformio.org/page/plus/debugging.html\n" %
server_executable)
self._debug_port = ":3333"
openocd_pipe_allowed = all([
not self.debug_options['port'], "openocd" in server_executable,
self.env_options['platform'] != "riscv"
])
if openocd_pipe_allowed:
args = []
if server['cwd']:
args.extend(["-s", helpers.escape_path(server['cwd'])])
args.extend([
"-c", "gdb_port pipe; tcl_port disabled; telnet_port disabled"
])
args.extend(server['arguments'])
str_args = " ".join(
[arg if arg.startswith("-") else '"%s"' % arg for arg in args])
self._debug_port = '| "%s" %s' % (
helpers.escape_path(server_executable), str_args)
else:
env = os.environ.copy()
# prepend server "lib" folder to LD path
if ("windows" not in systype and server['cwd']
and isdir(join(server['cwd'], "lib"))):
ld_key = ("DYLD_LIBRARY_PATH"
if "darwin" in systype else "LD_LIBRARY_PATH")
env[ld_key] = join(server['cwd'], "lib")
if os.environ.get(ld_key):
env[ld_key] = "%s:%s" % (env[ld_key],
os.environ.get(ld_key))
# prepend BIN to PATH
if server['cwd'] and isdir(join(server['cwd'], "bin")):
env['PATH'] = "%s%s%s" % (
join(server['cwd'], "bin"), os.pathsep,
os.environ.get("PATH", os.environ.get("Path", "")))
self._transport = reactor.spawnProcess(
self,
server_executable, [server_executable] + server['arguments'],
path=server['cwd'],
env=env)
if "mspdebug" in server_executable.lower():
self._debug_port = ":2000"
elif "jlink" in server_executable.lower():
self._debug_port = ":2331"
return self._transport
def get_debug_port(self):
return self._debug_port
def terminate(self):
if self._transport:
self._transport.signalProcess("KILL")

View File

@ -0,0 +1,15 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.commands.home.command import cli

View File

@ -0,0 +1,109 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mimetypes
import socket
from os.path import isdir
import click
from platformio import exception
from platformio.managers.core import (get_core_package_dir,
inject_contrib_pysite)
@click.command("home", short_help="PIO Home")
@click.option("--port", type=int, default=8008, help="HTTP port, default=8008")
@click.option(
"--host",
default="127.0.0.1",
help="HTTP host, default=127.0.0.1. "
"You can open PIO Home for inbound connections with --host=0.0.0.0")
@click.option("--no-open", is_flag=True) # pylint: disable=too-many-locals
def cli(port, host, no_open):
# import contrib modules
inject_contrib_pysite()
# pylint: disable=import-error
from autobahn.twisted.resource import WebSocketResource
from twisted.internet import reactor
from twisted.web import server
# pylint: enable=import-error
from platformio.commands.home.rpc.handlers.app import AppRPC
from platformio.commands.home.rpc.handlers.ide import IDERPC
from platformio.commands.home.rpc.handlers.misc import MiscRPC
from platformio.commands.home.rpc.handlers.os import OSRPC
from platformio.commands.home.rpc.handlers.piocore import PIOCoreRPC
from platformio.commands.home.rpc.handlers.project import ProjectRPC
from platformio.commands.home.rpc.server import JSONRPCServerFactory
from platformio.commands.home.web import WebRoot
factory = JSONRPCServerFactory()
factory.addHandler(AppRPC(), namespace="app")
factory.addHandler(IDERPC(), namespace="ide")
factory.addHandler(MiscRPC(), namespace="misc")
factory.addHandler(OSRPC(), namespace="os")
factory.addHandler(PIOCoreRPC(), namespace="core")
factory.addHandler(ProjectRPC(), namespace="project")
contrib_dir = get_core_package_dir("contrib-piohome")
if not isdir(contrib_dir):
raise exception.PlatformioException("Invalid path to PIO Home Contrib")
# Ensure PIO Home mimetypes are known
mimetypes.add_type("text/html", ".html")
mimetypes.add_type("text/css", ".css")
mimetypes.add_type("application/javascript", ".js")
root = WebRoot(contrib_dir)
root.putChild(b"wsrpc", WebSocketResource(factory))
site = server.Site(root)
# hook for `platformio-node-helpers`
if host == "__do_not_start__":
return
# if already started
already_started = False
socket.setdefaulttimeout(1)
try:
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
already_started = True
except: # pylint: disable=bare-except
pass
home_url = "http://%s:%d" % (host, port)
if not no_open:
if already_started:
click.launch(home_url)
else:
reactor.callLater(1, lambda: click.launch(home_url))
click.echo("\n".join([
"",
" ___I_",
" /\\-_--\\ PlatformIO Home",
"/ \\_-__\\",
"|[]| [] | %s" % home_url,
"|__|____|______________%s" % ("_" * len(host)),
]))
click.echo("")
click.echo("Open PIO Home in your browser by this URL => %s" % home_url)
if already_started:
return
click.echo("PIO Home has been started. Press Ctrl+C to shutdown.")
reactor.listenTCP(port, site, interface=host)
reactor.run()

View File

@ -0,0 +1,69 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=keyword-arg-before-vararg, arguments-differ
import os
import socket
import requests
from twisted.internet import defer # pylint: disable=import-error
from twisted.internet import reactor # pylint: disable=import-error
from twisted.internet import threads # pylint: disable=import-error
from platformio import util
class AsyncSession(requests.Session):
def __init__(self, n=None, *args, **kwargs):
if n:
pool = reactor.getThreadPool()
pool.adjustPoolsize(0, n)
super(AsyncSession, self).__init__(*args, **kwargs)
def request(self, *args, **kwargs):
func = super(AsyncSession, self).request
return threads.deferToThread(func, *args, **kwargs)
def wrap(self, *args, **kwargs): # pylint: disable=no-self-use
return defer.ensureDeferred(*args, **kwargs)
@util.memoized(expire=5000)
def requests_session():
return AsyncSession(n=5)
@util.memoized()
def get_core_fullpath():
return util.where_is_program(
"platformio" + (".exe" if "windows" in util.get_systype() else ""))
@util.memoized(expire=10000)
def is_twitter_blocked():
ip = "104.244.42.1"
timeout = 2
try:
if os.getenv("HTTP_PROXY", os.getenv("HTTPS_PROXY")):
requests.get(
"http://%s" % ip, allow_redirects=False, timeout=timeout)
else:
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((ip, 80))
return False
except: # pylint: disable=bare-except
pass
return True

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,83 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import json
from os.path import expanduser, isfile, join
from platformio import __version__, app, exception, util
from platformio.compat import path_to_unicode
class AppRPC(object):
APPSTATE_PATH = join(util.get_home_dir(), "homestate.json")
@staticmethod
def load_state():
state = None
try:
if isfile(AppRPC.APPSTATE_PATH):
state = util.load_json(AppRPC.APPSTATE_PATH)
except exception.PlatformioException:
pass
if not isinstance(state, dict):
state = {}
storage = state.get("storage", {})
# base data
caller_id = app.get_session_var("caller_id")
storage['cid'] = app.get_cid()
storage['coreVersion'] = __version__
storage['coreSystype'] = util.get_systype()
storage['coreCaller'] = (str(caller_id).lower() if caller_id else None)
storage['coreSettings'] = {
name: {
"description": data['description'],
"default_value": data['value'],
"value": app.get_setting(name)
}
for name, data in app.DEFAULT_SETTINGS.items()
}
# encode to UTF-8
for key in storage['coreSettings']:
if not key.endswith("dir"):
continue
storage['coreSettings'][key]['default_value'] = path_to_unicode(
storage['coreSettings'][key]['default_value'])
storage['coreSettings'][key]['value'] = path_to_unicode(
storage['coreSettings'][key]['value'])
storage['homeDir'] = path_to_unicode(expanduser("~"))
storage['projectsDir'] = storage['coreSettings']['projects_dir'][
'value']
# skip non-existing recent projects
storage['recentProjects'] = [
p for p in storage.get("recentProjects", [])
if util.is_platformio_project(p)
]
state['storage'] = storage
return state
@staticmethod
def get_state():
return AppRPC.load_state()
def save_state(self, state):
with open(self.APPSTATE_PATH, "w") as fp:
json.dump(state, fp)
return True

View File

@ -0,0 +1,42 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import jsonrpc # pylint: disable=import-error
from twisted.internet import defer # pylint: disable=import-error
class IDERPC(object):
def __init__(self):
self._queue = []
def send_command(self, command, params):
if not self._queue:
raise jsonrpc.exceptions.JSONRPCDispatchException(
code=4005, message="PIO Home IDE agent is not started")
while self._queue:
self._queue.pop().callback({
"id": time.time(),
"method": command,
"params": params
})
def listen_commands(self):
self._queue.append(defer.Deferred())
return self._queue[-1]
def open_project(self, project_dir):
return self.send_command("open_project", project_dir)

View File

@ -0,0 +1,194 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
import time
from bs4 import BeautifulSoup # pylint: disable=import-error
from twisted.internet import defer, reactor # pylint: disable=import-error
from platformio import app
from platformio.commands.home import helpers
from platformio.commands.home.rpc.handlers.os import OSRPC
class MiscRPC(object):
def load_latest_tweets(self, username):
cache_key = "piohome_latest_tweets_%s" % username
cache_valid = "7d"
with app.ContentCache() as cc:
cache_data = cc.get(cache_key)
if cache_data:
cache_data = json.loads(cache_data)
# automatically update cache in background every 12 hours
if cache_data['time'] < (time.time() - (3600 * 12)):
reactor.callLater(5, self._preload_latest_tweets, username,
cache_key, cache_valid)
return cache_data['result']
result = self._preload_latest_tweets(username, cache_key, cache_valid)
return result
@defer.inlineCallbacks
def _preload_latest_tweets(self, username, cache_key, cache_valid):
result = yield self._fetch_tweets(username)
with app.ContentCache() as cc:
cc.set(cache_key,
json.dumps({
"time": int(time.time()),
"result": result
}), cache_valid)
defer.returnValue(result)
@defer.inlineCallbacks
def _fetch_tweets(self, username):
api_url = ("https://twitter.com/i/profiles/show/%s/timeline/tweets?"
"include_available_features=1&include_entities=1&"
"include_new_items_bar=true") % username
if helpers.is_twitter_blocked():
api_url = self._get_proxed_uri(api_url)
html_or_json = yield OSRPC.fetch_content(
api_url, headers=self._get_twitter_headers(username))
# issue with PIO Core < 3.5.3 and ContentCache
if not isinstance(html_or_json, dict):
html_or_json = json.loads(html_or_json)
assert "items_html" in html_or_json
soup = BeautifulSoup(html_or_json['items_html'], "html.parser")
tweet_nodes = soup.find_all(
"div", attrs={
"class": "tweet",
"data-tweet-id": True
})
result = yield defer.DeferredList(
[self._parse_tweet_node(node, username) for node in tweet_nodes],
consumeErrors=True)
defer.returnValue([r[1] for r in result if r[0]])
@defer.inlineCallbacks
def _parse_tweet_node(self, tweet, username):
# remove non-visible items
for node in tweet.find_all(class_=["invisible", "u-hidden"]):
node.decompose()
twitter_url = "https://twitter.com"
time_node = tweet.find("span", attrs={"data-time": True})
text_node = tweet.find(class_="tweet-text")
quote_text_node = tweet.find(class_="QuoteTweet-text")
if quote_text_node and not text_node.get_text().strip():
text_node = quote_text_node
photos = [
node.get("data-image-url") for node in (tweet.find_all(class_=[
"AdaptiveMedia-photoContainer", "QuoteMedia-photoContainer"
]) or [])
]
urls = [
node.get("data-expanded-url")
for node in (quote_text_node or text_node).find_all(
class_="twitter-timeline-link",
attrs={"data-expanded-url": True})
]
# fetch data from iframe card
if (not photos or not urls) and tweet.get("data-card2-type"):
iframe_node = tweet.find(
"div", attrs={"data-full-card-iframe-url": True})
if iframe_node:
iframe_card = yield self._fetch_iframe_card(
twitter_url + iframe_node.get("data-full-card-iframe-url"),
username)
if not photos and iframe_card['photo']:
photos.append(iframe_card['photo'])
if not urls and iframe_card['url']:
urls.append(iframe_card['url'])
if iframe_card['text_node']:
text_node = iframe_card['text_node']
if not photos:
photos.append(tweet.find("img", class_="avatar").get("src"))
def _fetch_text(text_node):
text = text_node.decode_contents(formatter="html").strip()
text = re.sub(r'href="/', 'href="%s/' % twitter_url, text)
if "</p>" not in text and "<br" not in text:
text = re.sub(r"\n+", "<br />", text)
return text
defer.returnValue({
"tweetId":
tweet.get("data-tweet-id"),
"tweetUrl":
twitter_url + tweet.get("data-permalink-path"),
"author":
tweet.get("data-name"),
"time":
int(time_node.get("data-time")),
"timeFormatted":
time_node.string,
"text":
_fetch_text(text_node),
"entries": {
"urls":
urls,
"photos": [
self._get_proxed_uri(uri)
if helpers.is_twitter_blocked() else uri for uri in photos
]
},
"isPinned":
"user-pinned" in tweet.get("class")
})
@defer.inlineCallbacks
def _fetch_iframe_card(self, url, username):
if helpers.is_twitter_blocked():
url = self._get_proxed_uri(url)
html = yield OSRPC.fetch_content(
url, headers=self._get_twitter_headers(username), cache_valid="7d")
soup = BeautifulSoup(html, "html.parser")
photo_node = soup.find("img", attrs={"data-src": True})
url_node = soup.find("a", class_="TwitterCard-container")
text_node = soup.find("div", class_="SummaryCard-content")
if text_node:
text_node.find(
"span", class_="SummaryCard-destination").decompose()
defer.returnValue({
"photo":
photo_node.get("data-src") if photo_node else None,
"text_node":
text_node,
"url":
url_node.get("href") if url_node else None
})
@staticmethod
def _get_proxed_uri(uri):
index = uri.index("://")
return "https://dl.platformio.org/__prx__/" + uri[index + 3:]
@staticmethod
def _get_twitter_headers(username):
return {
"Accept":
"application/json, text/javascript, */*; q=0.01",
"Referer":
"https://twitter.com/%s" % username,
"User-Agent":
("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit"
"/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8"),
"X-Twitter-Active-User":
"yes",
"X-Requested-With":
"XMLHttpRequest"
}

View File

@ -0,0 +1,153 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import glob
import os
import shutil
from functools import cmp_to_key
from os.path import expanduser, isdir, isfile, join
import click
from twisted.internet import defer # pylint: disable=import-error
from platformio import app, util
from platformio.commands.home import helpers
from platformio.compat import PY2, get_filesystem_encoding, path_to_unicode
class OSRPC(object):
@staticmethod
@defer.inlineCallbacks
def fetch_content(uri, data=None, headers=None, cache_valid=None):
timeout = 2
if not headers:
headers = {
"User-Agent":
("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) "
"AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 "
"Safari/603.3.8")
}
cache_key = (app.ContentCache.key_from_args(uri, data)
if cache_valid else None)
with app.ContentCache() as cc:
if cache_key:
result = cc.get(cache_key)
if result is not None:
defer.returnValue(result)
# check internet before and resolve issue with 60 seconds timeout
util.internet_on(raise_exception=True)
session = helpers.requests_session()
if data:
r = yield session.post(
uri, data=data, headers=headers, timeout=timeout)
else:
r = yield session.get(uri, headers=headers, timeout=timeout)
r.raise_for_status()
result = r.text
if cache_valid:
with app.ContentCache() as cc:
cc.set(cache_key, result, cache_valid)
defer.returnValue(result)
def request_content(self, uri, data=None, headers=None, cache_valid=None):
if uri.startswith('http'):
return self.fetch_content(uri, data, headers, cache_valid)
if isfile(uri):
with open(uri) as fp:
return fp.read()
return None
@staticmethod
def open_url(url):
return click.launch(url)
@staticmethod
def reveal_file(path):
return click.launch(
path.encode(get_filesystem_encoding()) if PY2 else path,
locate=True)
@staticmethod
def is_file(path):
return isfile(path)
@staticmethod
def is_dir(path):
return isdir(path)
@staticmethod
def make_dirs(path):
return os.makedirs(path)
@staticmethod
def rename(src, dst):
return os.rename(src, dst)
@staticmethod
def copy(src, dst):
return shutil.copytree(src, dst)
@staticmethod
def glob(pathnames, root=None):
if not isinstance(pathnames, list):
pathnames = [pathnames]
result = set()
for pathname in pathnames:
result |= set(
glob.glob(join(root, pathname) if root else pathname))
return list(result)
@staticmethod
def list_dir(path):
def _cmp(x, y):
if x[1] and not y[1]:
return -1
if not x[1] and y[1]:
return 1
if x[0].lower() > y[0].lower():
return 1
if x[0].lower() < y[0].lower():
return -1
return 0
items = []
if path.startswith("~"):
path = expanduser(path)
if not isdir(path):
return items
for item in os.listdir(path):
try:
item_is_dir = isdir(join(path, item))
if item_is_dir:
os.listdir(join(path, item))
items.append((item, item_is_dir))
except OSError:
pass
return sorted(items, key=cmp_to_key(_cmp))
@staticmethod
def get_logical_devices():
items = []
for item in util.get_logical_devices():
if item['name']:
item['name'] = path_to_unicode(item['name'])
items.append(item)
return items

View File

@ -0,0 +1,83 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import json
import os
import re
import jsonrpc # pylint: disable=import-error
from twisted.internet import utils # pylint: disable=import-error
from platformio import __version__
from platformio.commands.home import helpers
from platformio.compat import get_filesystem_encoding, string_types
class PIOCoreRPC(object):
@staticmethod
def call(args, options=None):
json_output = "--json-output" in args
try:
args = [
arg.encode(get_filesystem_encoding()) if isinstance(
arg, string_types) else str(arg) for arg in args
]
except UnicodeError:
raise jsonrpc.exceptions.JSONRPCDispatchException(
code=4002, message="PIO Core: non-ASCII chars in arguments")
d = utils.getProcessOutputAndValue(
helpers.get_core_fullpath(),
args,
path=(options or {}).get("cwd"),
env={k: v
for k, v in os.environ.items() if "%" not in k})
d.addCallback(PIOCoreRPC._call_callback, json_output)
d.addErrback(PIOCoreRPC._call_errback)
return d
@staticmethod
def _call_callback(result, json_output=False):
result = list(result)
assert len(result) == 3
for i in (0, 1):
result[i] = result[i].decode(get_filesystem_encoding()).strip()
out, err, code = result
text = ("%s\n\n%s" % (out, err)).strip()
if code != 0:
raise Exception(text)
if not json_output:
return text
try:
return json.loads(out)
except ValueError as e:
if "sh: " in out:
return json.loads(
re.sub(r"^sh: [^\n]+$", "", out, flags=re.M).strip())
raise e
@staticmethod
def _call_errback(failure):
raise jsonrpc.exceptions.JSONRPCDispatchException(
code=4003,
message="PIO Core Call Error",
data=failure.getErrorMessage())
@staticmethod
def version():
return __version__

View File

@ -0,0 +1,277 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import os
import shutil
import time
from os.path import (basename, expanduser, getmtime, isdir, isfile, join,
realpath, sep)
import jsonrpc # pylint: disable=import-error
from platformio import exception, util
from platformio.commands.home.rpc.handlers.app import AppRPC
from platformio.commands.home.rpc.handlers.piocore import PIOCoreRPC
from platformio.compat import get_filesystem_encoding
from platformio.ide.projectgenerator import ProjectGenerator
from platformio.managers.platform import PlatformManager
from platformio.project.config import ProjectConfig
class ProjectRPC(object):
@staticmethod
def _get_projects(project_dirs=None):
def _get_project_data(project_dir):
data = {"boards": [], "libExtraDirs": []}
config = ProjectConfig(join(project_dir, "platformio.ini"))
config.validate(validate_options=False)
if config.has_section("platformio") and \
config.has_option("platformio", "lib_extra_dirs"):
data['libExtraDirs'].extend(
util.parse_conf_multi_values(
config.get("platformio", "lib_extra_dirs")))
for section in config.sections():
if not section.startswith("env:"):
continue
if config.has_option(section, "board"):
data['boards'].append(config.get(section, "board"))
if config.has_option(section, "lib_extra_dirs"):
data['libExtraDirs'].extend(
util.parse_conf_multi_values(
config.get(section, "lib_extra_dirs")))
# resolve libExtraDirs paths
with util.cd(project_dir):
data['libExtraDirs'] = [
expanduser(d) if d.startswith("~") else realpath(d)
for d in data['libExtraDirs']
]
# skip non existing folders
data['libExtraDirs'] = [
d for d in data['libExtraDirs'] if isdir(d)
]
return data
def _path_to_name(path):
return (sep).join(path.split(sep)[-2:])
if not project_dirs:
project_dirs = AppRPC.load_state()['storage']['recentProjects']
result = []
pm = PlatformManager()
for project_dir in project_dirs:
data = {}
boards = []
try:
data = _get_project_data(project_dir)
except exception.PlatformIOProjectException:
continue
for board_id in data.get("boards", []):
name = board_id
try:
name = pm.board_config(board_id)['name']
except (exception.UnknownBoard, exception.UnknownPlatform):
pass
boards.append({"id": board_id, "name": name})
result.append({
"path":
project_dir,
"name":
_path_to_name(project_dir),
"modified":
int(getmtime(project_dir)),
"boards":
boards,
"extraLibStorages": [{
"name": _path_to_name(d),
"path": d
} for d in data.get("libExtraDirs", [])]
})
return result
def get_projects(self, project_dirs=None):
return self._get_projects(project_dirs)
def init(self, board, framework, project_dir):
assert project_dir
state = AppRPC.load_state()
if not isdir(project_dir):
os.makedirs(project_dir)
args = ["init", "--project-dir", project_dir, "--board", board]
if framework:
args.extend(["--project-option", "framework = %s" % framework])
if (state['storage']['coreCaller'] and state['storage']['coreCaller']
in ProjectGenerator.get_supported_ides()):
args.extend(["--ide", state['storage']['coreCaller']])
d = PIOCoreRPC.call(args)
d.addCallback(self._generate_project_main, project_dir, framework)
return d
@staticmethod
def _generate_project_main(_, project_dir, framework):
main_content = None
if framework == "arduino":
main_content = "\n".join([
"#include <Arduino.h>",
"",
"void setup() {",
" // put your setup code here, to run once:",
"}",
"",
"void loop() {",
" // put your main code here, to run repeatedly:",
"}"
""
]) # yapf: disable
elif framework == "mbed":
main_content = "\n".join([
"#include <mbed.h>",
"",
"int main() {",
"",
" // put your setup code here, to run once:",
"",
" while(1) {",
" // put your main code here, to run repeatedly:",
" }",
"}",
""
]) # yapf: disable
if not main_content:
return project_dir
with util.cd(project_dir):
src_dir = util.get_projectsrc_dir()
main_path = join(src_dir, "main.cpp")
if isfile(main_path):
return project_dir
if not isdir(src_dir):
os.makedirs(src_dir)
with open(main_path, "w") as f:
f.write(main_content.strip())
return project_dir
def import_arduino(self, board, use_arduino_libs, arduino_project_dir):
# don't import PIO Project
if util.is_platformio_project(arduino_project_dir):
return arduino_project_dir
is_arduino_project = any([
isfile(
join(arduino_project_dir,
"%s.%s" % (basename(arduino_project_dir), ext)))
for ext in ("ino", "pde")
])
if not is_arduino_project:
raise jsonrpc.exceptions.JSONRPCDispatchException(
code=4000,
message="Not an Arduino project: %s" % arduino_project_dir)
state = AppRPC.load_state()
project_dir = join(state['storage']['projectsDir'].decode("utf-8"),
time.strftime("%y%m%d-%H%M%S-") + board)
if not isdir(project_dir):
os.makedirs(project_dir)
args = ["init", "--project-dir", project_dir, "--board", board]
args.extend(["--project-option", "framework = arduino"])
if use_arduino_libs:
args.extend([
"--project-option",
"lib_extra_dirs = ~/Documents/Arduino/libraries"
])
if (state['storage']['coreCaller'] and state['storage']['coreCaller']
in ProjectGenerator.get_supported_ides()):
args.extend(["--ide", state['storage']['coreCaller']])
d = PIOCoreRPC.call(args)
d.addCallback(self._finalize_arduino_import, project_dir,
arduino_project_dir)
return d
@staticmethod
def _finalize_arduino_import(_, project_dir, arduino_project_dir):
with util.cd(project_dir):
src_dir = util.get_projectsrc_dir()
if isdir(src_dir):
util.rmtree_(src_dir)
shutil.copytree(
arduino_project_dir.encode(get_filesystem_encoding()), src_dir)
return project_dir
@staticmethod
def get_project_examples():
result = []
for manifest in PlatformManager().get_installed():
examples_dir = join(manifest['__pkg_dir'], "examples")
if not isdir(examples_dir):
continue
items = []
for project_dir, _, __ in os.walk(examples_dir):
project_description = None
try:
config = ProjectConfig(join(project_dir, "platformio.ini"))
config.validate(validate_options=False)
if config.has_section("platformio") and \
config.has_option("platformio", "description"):
project_description = config.get(
"platformio", "description")
except exception.PlatformIOProjectException:
continue
path_tokens = project_dir.split(sep)
items.append({
"name":
"/".join(path_tokens[path_tokens.index("examples") + 1:]),
"path":
project_dir,
"description":
project_description
})
result.append({
"platform": {
"title": manifest['title'],
"version": manifest['version']
},
"items": sorted(items, key=lambda item: item['name'])
})
return sorted(result, key=lambda data: data['platform']['title'])
@staticmethod
def import_pio(project_dir):
if not project_dir or not util.is_platformio_project(project_dir):
raise jsonrpc.exceptions.JSONRPCDispatchException(
code=4001,
message="Not an PlatformIO project: %s" % project_dir)
new_project_dir = join(
AppRPC.load_state()['storage']['projectsDir'].decode("utf-8"),
time.strftime("%y%m%d-%H%M%S-") + basename(project_dir))
shutil.copytree(project_dir, new_project_dir)
state = AppRPC.load_state()
args = ["init", "--project-dir", new_project_dir]
if (state['storage']['coreCaller'] and state['storage']['coreCaller']
in ProjectGenerator.get_supported_ides()):
args.extend(["--ide", state['storage']['coreCaller']])
d = PIOCoreRPC.call(args)
d.addCallback(lambda _: new_project_dir)
return d

View File

@ -0,0 +1,72 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=import-error
import json
import jsonrpc
from autobahn.twisted.websocket import (WebSocketServerFactory,
WebSocketServerProtocol)
from jsonrpc.exceptions import JSONRPCDispatchException
from twisted.internet import defer
class JSONRPCServerProtocol(WebSocketServerProtocol):
def onMessage(self, payload, isBinary): # pylint: disable=unused-argument
# print("> %s" % payload)
response = jsonrpc.JSONRPCResponseManager.handle(
payload, self.factory.dispatcher).data
# if error
if "result" not in response:
self.sendJSONResponse(response)
return None
d = defer.maybeDeferred(lambda: response['result'])
d.addCallback(self._callback, response)
d.addErrback(self._errback, response)
return None
def _callback(self, result, response):
response['result'] = result
self.sendJSONResponse(response)
def _errback(self, failure, response):
if isinstance(failure.value, JSONRPCDispatchException):
e = failure.value
else:
e = JSONRPCDispatchException(
code=4999, message=failure.getErrorMessage())
del response["result"]
response['error'] = e.error._data # pylint: disable=protected-access
print(response['error'])
self.sendJSONResponse(response)
def sendJSONResponse(self, response):
# print("< %s" % response)
self.sendMessage(json.dumps(response).encode("utf8"))
class JSONRPCServerFactory(WebSocketServerFactory):
protocol = JSONRPCServerProtocol
def __init__(self):
super(JSONRPCServerFactory, self).__init__()
self.dispatcher = jsonrpc.Dispatcher()
def addHandler(self, handler, namespace):
self.dispatcher.build_method_map(handler, prefix="%s." % namespace)

View File

@ -0,0 +1,30 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import reactor # pylint: disable=import-error
from twisted.web import static # pylint: disable=import-error
class WebRoot(static.File):
def render_GET(self, request):
if request.args.get("__shutdown__", False):
reactor.stop()
return "Server has been stopped"
request.setHeader("cache-control",
"no-cache, no-store, must-revalidate")
request.setHeader("pragma", "no-cache")
request.setHeader("expires", "0")
return static.File.render_GET(self, request)

View File

@ -274,7 +274,7 @@ def lib_list(ctx, json_output):
items = lm.get_installed()
if json_output:
json_result[storage_dir] = items
else:
elif items:
for item in sorted(items, key=lambda i: i['name']):
print_lib_item(item)
else:

View File

@ -1,67 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from os import getcwd
import click
from platformio.managers.core import pioplus_call
@click.command("test", short_help="Local Unit Testing")
@click.option("--environment", "-e", multiple=True, metavar="<environment>")
@click.option(
"--filter",
"-f",
multiple=True,
metavar="<pattern>",
help="Filter tests by a pattern")
@click.option(
"--ignore",
"-i",
multiple=True,
metavar="<pattern>",
help="Ignore tests by a pattern")
@click.option("--upload-port")
@click.option("--test-port")
@click.option(
"-d",
"--project-dir",
default=getcwd,
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True))
@click.option("--without-building", is_flag=True)
@click.option("--without-uploading", is_flag=True)
@click.option(
"--no-reset",
is_flag=True,
help="Disable software reset via Serial.DTR/RST")
@click.option(
"--monitor-rts",
default=None,
type=click.IntRange(0, 1),
help="Set initial RTS line state for Serial Monitor")
@click.option(
"--monitor-dtr",
default=None,
type=click.IntRange(0, 1),
help="Set initial DTR line state for Serial Monitor")
@click.option("--verbose", "-v", is_flag=True)
def cli(*args, **kwargs): # pylint: disable=unused-argument
pioplus_call(sys.argv[1:])

View File

@ -0,0 +1,15 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.commands.test.command import cli

View File

@ -0,0 +1,186 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-arguments, too-many-locals, too-many-branches
from fnmatch import fnmatch
from os import getcwd, listdir
from os.path import isdir, join
from time import time
import click
from platformio import exception, util
from platformio.commands.run import print_header
from platformio.commands.test.embedded import EmbeddedTestProcessor
from platformio.commands.test.native import NativeTestProcessor
from platformio.project.config import ProjectConfig
@click.command("test", short_help="Unit Testing")
@click.option("--environment", "-e", multiple=True, metavar="<environment>")
@click.option(
"--filter",
"-f",
multiple=True,
metavar="<pattern>",
help="Filter tests by a pattern")
@click.option(
"--ignore",
"-i",
multiple=True,
metavar="<pattern>",
help="Ignore tests by a pattern")
@click.option("--upload-port")
@click.option("--test-port")
@click.option(
"-d",
"--project-dir",
default=getcwd,
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True))
@click.option(
"-c",
"--project-conf",
type=click.Path(
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True))
@click.option("--without-building", is_flag=True)
@click.option("--without-uploading", is_flag=True)
@click.option("--without-testing", is_flag=True)
@click.option("--no-reset", is_flag=True)
@click.option(
"--monitor-rts",
default=None,
type=click.IntRange(0, 1),
help="Set initial RTS line state for Serial Monitor")
@click.option(
"--monitor-dtr",
default=None,
type=click.IntRange(0, 1),
help="Set initial DTR line state for Serial Monitor")
@click.option("--verbose", "-v", is_flag=True)
@click.pass_context
def cli( # pylint: disable=redefined-builtin
ctx, environment, ignore, filter, upload_port, test_port, project_dir,
project_conf, without_building, without_uploading, without_testing,
no_reset, monitor_rts, monitor_dtr, verbose):
with util.cd(project_dir):
test_dir = util.get_projecttest_dir()
if not isdir(test_dir):
raise exception.TestDirNotExists(test_dir)
test_names = get_test_names(test_dir)
config = ProjectConfig.get_instance(
project_conf or join(project_dir, "platformio.ini"))
config.validate(envs=environment)
click.echo("Verbose mode can be enabled via `-v, --verbose` option")
click.echo("Collected %d items" % len(test_names))
results = []
start_time = time()
default_envs = config.default_envs()
for testname in test_names:
for envname in config.envs():
section = "env:%s" % envname
# filter and ignore patterns
patterns = dict(filter=list(filter), ignore=list(ignore))
for key in patterns:
if config.has_option(section, "test_%s" % key):
patterns[key].extend(
config.getlist(section, "test_%s" % key))
skip_conditions = [
environment and envname not in environment,
not environment and default_envs
and envname not in default_envs,
testname != "*" and patterns['filter'] and
not any([fnmatch(testname, p)
for p in patterns['filter']]),
testname != "*"
and any([fnmatch(testname, p)
for p in patterns['ignore']]),
]
if any(skip_conditions):
results.append((None, testname, envname))
continue
cls = (NativeTestProcessor
if config.get(section, "platform") == "native" else
EmbeddedTestProcessor)
tp = cls(
ctx, testname, envname,
dict(
project_config=config,
project_dir=project_dir,
upload_port=upload_port,
test_port=test_port,
without_building=without_building,
without_uploading=without_uploading,
without_testing=without_testing,
no_reset=no_reset,
monitor_rts=monitor_rts,
monitor_dtr=monitor_dtr,
verbose=verbose))
results.append((tp.process(), testname, envname))
if without_testing:
return
click.echo()
print_header("[%s]" % click.style("TEST SUMMARY"))
passed = True
for result in results:
status, testname, envname = result
status_str = click.style("PASSED", fg="green")
if status is False:
passed = False
status_str = click.style("FAILED", fg="red")
elif status is None:
status_str = click.style("IGNORED", fg="yellow")
click.echo(
"test/%s/env:%s\t[%s]" % (click.style(testname, fg="yellow"),
click.style(envname, fg="cyan"),
status_str),
err=status is False)
print_header(
"[%s] Took %.2f seconds" % (
(click.style("PASSED", fg="green", bold=True) if passed else
click.style("FAILED", fg="red", bold=True)), time() - start_time),
is_error=not passed)
if not passed:
raise exception.ReturnErrorCode(1)
def get_test_names(test_dir):
names = []
for item in sorted(listdir(test_dir)):
if isdir(join(test_dir, item)):
names.append(item)
if not names:
names = ["*"]
return names

View File

@ -0,0 +1,133 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from time import sleep
import click
import serial
from platformio import exception, util
from platformio.commands.test.processor import TestProcessorBase
from platformio.managers.platform import PlatformFactory
class EmbeddedTestProcessor(TestProcessorBase):
SERIAL_TIMEOUT = 600
def process(self):
if not self.options['without_building']:
self.print_progress("Building... (1/3)")
target = ["__test"]
if self.options['without_uploading']:
target.append("checkprogsize")
self.build_or_upload(target)
if not self.options['without_uploading']:
self.print_progress("Uploading... (2/3)")
target = ["upload"]
if self.options['without_building']:
target.append("nobuild")
else:
target.append("__test")
self.build_or_upload(target)
if self.options['without_testing']:
return None
self.print_progress("Testing... (3/3)")
return self.run()
def run(self):
click.echo("If you don't see any output for the first 10 secs, "
"please reset board (press reset button)")
click.echo()
try:
ser = serial.Serial(
baudrate=self.get_baudrate(), timeout=self.SERIAL_TIMEOUT)
ser.port = self.get_test_port()
ser.rts = self.options['monitor_rts']
ser.dtr = self.options['monitor_dtr']
ser.open()
except serial.SerialException as e:
click.secho(str(e), fg="red", err=True)
return False
if not self.options['no_reset']:
ser.flushInput()
ser.setDTR(False)
ser.setRTS(False)
sleep(0.1)
ser.setDTR(True)
ser.setRTS(True)
sleep(0.1)
while True:
line = ser.readline().strip()
# fix non-ascii output from device
for i, c in enumerate(line[::-1]):
if not isinstance(c, int):
c = ord(c)
if c > 127:
line = line[-i:]
break
if not line:
continue
if isinstance(line, bytes):
line = line.decode("utf8")
self.on_run_out(line)
if all([l in line for l in ("Tests", "Failures", "Ignored")]):
break
ser.close()
return not self._run_failed
def get_test_port(self):
# if test port is specified manually or in config
if self.options.get("test_port"):
return self.options.get("test_port")
if self.env_options.get("test_port"):
return self.env_options.get("test_port")
assert set(["platform", "board"]) & set(self.env_options.keys())
p = PlatformFactory.newPlatform(self.env_options['platform'])
board_hwids = p.board_config(self.env_options['board']).get(
"build.hwids", [])
port = None
elapsed = 0
while elapsed < 5 and not port:
for item in util.get_serialports():
port = item['port']
for hwid in board_hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item['hwid']:
return port
# check if port is already configured
try:
serial.Serial(port, timeout=self.SERIAL_TIMEOUT).close()
except serial.SerialException:
port = None
if not port:
sleep(0.25)
elapsed += 0.25
if not port:
raise exception.PlatformioException(
"Please specify `test_port` for environment or use "
"global `--test-port` option.")
return port

View File

@ -0,0 +1,39 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os.path import join
from platformio import util
from platformio.commands.test.processor import TestProcessorBase
class NativeTestProcessor(TestProcessorBase):
def process(self):
if not self.options['without_building']:
self.print_progress("Building... (1/2)")
self.build_or_upload(["__test"])
if self.options['without_testing']:
return None
self.print_progress("Testing... (2/2)")
return self.run()
def run(self):
with util.cd(self.options['project_dir']):
build_dir = util.get_projectbuild_dir()
result = util.exec_command([join(build_dir, self.env_name, "program")],
stdout=util.AsyncPipe(self.on_run_out),
stderr=util.AsyncPipe(self.on_run_out))
assert "returncode" in result
return result['returncode'] == 0 and not self._run_failed

View File

@ -0,0 +1,198 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
from os import remove
from os.path import isdir, isfile, join
from string import Template
import click
from platformio import exception, util
from platformio.commands.run import cli as cmd_run
from platformio.commands.run import print_header
TRANSPORT_OPTIONS = {
"arduino": {
"include": "#include <Arduino.h>",
"object": "",
"putchar": "Serial.write(c)",
"flush": "Serial.flush()",
"begin": "Serial.begin($baudrate)",
"end": "Serial.end()"
},
"mbed": {
"include": "#include <mbed.h>",
"object": "Serial pc(USBTX, USBRX);",
"putchar": "pc.putc(c)",
"flush": "",
"begin": "pc.baud($baudrate)",
"end": ""
},
"energia": {
"include": "#include <Energia.h>",
"object": "",
"putchar": "Serial.write(c)",
"flush": "Serial.flush()",
"begin": "Serial.begin($baudrate)",
"end": "Serial.end()"
},
"espidf": {
"include": "#include <stdio.h>",
"object": "",
"putchar": "putchar(c)",
"flush": "fflush(stdout)",
"begin": "",
"end": ""
},
"native": {
"include": "#include <stdio.h>",
"object": "",
"putchar": "putchar(c)",
"flush": "fflush(stdout)",
"begin": "",
"end": ""
},
"custom": {
"include": '#include "unittest_transport.h"',
"object": "",
"putchar": "unittest_uart_putchar(c)",
"flush": "unittest_uart_flush()",
"begin": "unittest_uart_begin()",
"end": "unittest_uart_end()"
}
}
class TestProcessorBase(object):
DEFAULT_BAUDRATE = 115200
def __init__(self, cmd_ctx, testname, envname, options):
self.cmd_ctx = cmd_ctx
self.cmd_ctx.meta['piotest_processor'] = True
self.test_name = testname
self.options = options
self.env_name = envname
self.env_options = options['project_config'].items(
env=envname, as_dict=True)
self._run_failed = False
self._outputcpp_generated = False
def get_transport(self):
transport = self.env_options.get("framework")
if self.env_options.get("platform") == "native":
transport = "native"
if "test_transport" in self.env_options:
transport = self.env_options['test_transport']
if transport not in TRANSPORT_OPTIONS:
raise exception.PlatformioException(
"Unknown Unit Test transport `%s`" % transport)
return transport.lower()
def get_baudrate(self):
return int(self.env_options.get("test_speed", self.DEFAULT_BAUDRATE))
def print_progress(self, text, is_error=False):
click.echo()
print_header(
"[test/%s] %s" % (click.style(
self.test_name, fg="yellow", bold=True), text),
is_error=is_error)
def build_or_upload(self, target):
if not self._outputcpp_generated:
self.generate_outputcpp(util.get_projecttest_dir())
self._outputcpp_generated = True
if self.test_name != "*":
self.cmd_ctx.meta['piotest'] = self.test_name
if not self.options['verbose']:
click.echo("Please wait...")
return self.cmd_ctx.invoke(
cmd_run,
project_dir=self.options['project_dir'],
upload_port=self.options['upload_port'],
silent=not self.options['verbose'],
environment=[self.env_name],
disable_auto_clean="nobuild" in target,
target=target)
def process(self):
raise NotImplementedError
def run(self):
raise NotImplementedError
def on_run_out(self, line):
if line.endswith(":PASS"):
click.echo(
"%s\t[%s]" % (line[:-5], click.style("PASSED", fg="green")))
elif ":FAIL" in line:
self._run_failed = True
click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red")))
else:
click.echo(line)
def generate_outputcpp(self, test_dir):
assert isdir(test_dir)
cpp_tpl = "\n".join([
"$include",
"#include <output_export.h>",
"",
"$object",
"",
"void output_start(unsigned int baudrate)",
"{",
" $begin;",
"}",
"",
"void output_char(int c)",
"{",
" $putchar;",
"}",
"",
"void output_flush(void)",
"{",
" $flush;",
"}",
"",
"void output_complete(void)",
"{",
" $end;",
"}"
]) # yapf: disable
def delete_tmptest_file(file_):
try:
remove(file_)
except: # pylint: disable=bare-except
if isfile(file_):
click.secho(
"Warning: Could not remove temporary file '%s'. "
"Please remove it manually." % file_,
fg="yellow")
tpl = Template(cpp_tpl).substitute(
TRANSPORT_OPTIONS[self.get_transport()])
data = Template(tpl).substitute(baudrate=self.get_baudrate())
tmp_file = join(test_dir, "output_export.cpp")
with open(tmp_file, "w") as f:
f.write(data)
atexit.register(delete_tmptest_file, tmp_file)

View File

@ -320,3 +320,12 @@ class DebugSupportError(PlatformioException):
class DebugInvalidOptions(PlatformioException):
pass
class TestDirNotExists(PlatformioException):
MESSAGE = "A test folder '{0}' does not exist.\nPlease create 'test' "\
"directory in project's root and put a test set.\n"\
"More details about Unit "\
"Testing: http://docs.platformio.org/page/plus/"\
"unit-testing.html"

View File

@ -118,6 +118,15 @@ def shutdown_piohome_servers():
port += 1
def inject_contrib_pysite():
from site import addsitedir
contrib_pysite_dir = get_core_package_dir("contrib-pysite")
if contrib_pysite_dir in sys.path:
return
addsitedir(contrib_pysite_dir)
sys.path.insert(0, contrib_pysite_dir)
def pioplus_call(args, **kwargs):
if WINDOWS and sys.version_info < (2, 7, 6):
raise exception.PlatformioException(