Initial commit of PIO Unified Debugger

This commit is contained in:
Ivan Kravets
2019-04-19 19:56:16 +03:00
parent 21a36f8ee9
commit f1da544279
12 changed files with 1051 additions and 54 deletions

View File

@ -1,3 +1,3 @@
[settings]
line_length=79
known_third_party=bottle,click,pytest,requests,SCons,semantic_version,serial
known_third_party=bottle,click,pytest,requests,SCons,semantic_version,serial, twisted

View File

@ -14,7 +14,7 @@
import os
import sys
from os.path import join
from os.path import isdir, isfile, join
from platform import system
from traceback import format_exc
@ -24,15 +24,19 @@ from platformio import __version__, exception, maintenance
from platformio.util import get_source_dir
class PlatformioCLI(click.MultiCommand): # pylint: disable=R0904
class PlatformioCLI(click.MultiCommand):
def list_commands(self, ctx):
cmds = []
for filename in os.listdir(join(get_source_dir(), "commands")):
if filename.startswith("__init__"):
commands_dir = join(get_source_dir(), "commands")
for name in os.listdir(commands_dir):
if name.startswith("__init__"):
continue
if filename.endswith(".py"):
cmds.append(filename[:-3])
if (isdir(join(commands_dir, name))
and isfile(join(commands_dir, name, "command.py"))):
cmds.append(name)
elif name.endswith(".py"):
cmds.append(name[:-3])
cmds.sort()
return cmds

View File

@ -1,42 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from os import getcwd
import click
from platformio.managers.core import pioplus_call
@click.command(
"debug",
context_settings=dict(ignore_unknown_options=True),
short_help="PIO Unified Debugger")
@click.option(
"-d",
"--project-dir",
default=getcwd,
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True))
@click.option("--environment", "-e", metavar="<environment>")
@click.option("--verbose", "-v", is_flag=True)
@click.option("--interface", type=click.Choice(["gdb"]))
@click.argument("__unprocessed", nargs=-1, type=click.UNPROCESSED)
def cli(*args, **kwargs): # pylint: disable=unused-argument
pioplus_call(sys.argv[1:])

View File

@ -0,0 +1,15 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.commands.debug.command import cli

View File

@ -0,0 +1,271 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
import time
from hashlib import sha1
from os.path import abspath, basename, dirname, isdir, join, splitext
from tempfile import mkdtemp
from twisted.internet import protocol, reactor, stdio, task
from platformio import app, exception, util
from platformio.commands.debug import helpers, initcfgs
from platformio.commands.debug.process import BaseProcess
from platformio.commands.debug.server import DebugServer
from platformio.telemetry import MeasurementProtocol
LOG_FILE = None
class GDBClient(BaseProcess): # pylint: disable=too-many-instance-attributes
PIO_SRC_NAME = ".pioinit"
INIT_COMPLETED_BANNER = "PlatformIO: Initialization completed"
def __init__(self, project_dir, args, debug_options, env_options):
self.project_dir = project_dir
self.args = list(args)
self.debug_options = debug_options
self.env_options = env_options
self._debug_server = DebugServer(debug_options, env_options)
self._session_id = None
if not isdir(util.get_cache_dir()):
os.makedirs(util.get_cache_dir())
self._gdbsrc_dir = mkdtemp(
dir=util.get_cache_dir(), prefix=".piodebug-")
self._target_is_run = False
self._last_server_activity = 0
self._auto_continue_timer = None
def spawn(self, gdb_path, prog_path):
session_hash = gdb_path + prog_path
self._session_id = sha1(
session_hash if util.PY2 else session_hash.encode()).hexdigest()
self._kill_previous_session()
patterns = {
"PROJECT_DIR": helpers.escape_path(self.project_dir),
"PROG_PATH": helpers.escape_path(prog_path),
"PROG_DIR": helpers.escape_path(dirname(prog_path)),
"PROG_NAME": basename(splitext(prog_path)[0]),
"DEBUG_PORT": self.debug_options['port'],
"UPLOAD_PROTOCOL": self.debug_options['upload_protocol'],
"INIT_BREAK": self.debug_options['init_break'] or "",
"LOAD_CMD": self.debug_options['load_cmd'] or "",
}
self._debug_server.spawn(patterns)
if not patterns['DEBUG_PORT']:
patterns['DEBUG_PORT'] = self._debug_server.get_debug_port()
self.generate_pioinit(self._gdbsrc_dir, patterns)
# start GDB client
args = [
"piogdb",
"-q",
"--directory", self._gdbsrc_dir,
"--directory", self.project_dir,
"-l", "10"
] # yapf: disable
args.extend(self.args)
if not gdb_path:
raise exception.DebugInvalidOptions("GDB client is not configured")
gdb_data_dir = self._get_data_dir(gdb_path)
if gdb_data_dir:
args.extend(["--data-directory", gdb_data_dir])
args.append(patterns['PROG_PATH'])
return reactor.spawnProcess(
self, gdb_path, args, path=self.project_dir, env=os.environ)
@staticmethod
def _get_data_dir(gdb_path):
if "msp430" in gdb_path:
return None
gdb_data_dir = abspath(join(dirname(gdb_path), "..", "share", "gdb"))
return gdb_data_dir if isdir(gdb_data_dir) else None
def generate_pioinit(self, dst_dir, patterns):
server_exe = (self.debug_options.get("server") or {}).get(
"executable", "").lower()
if "jlink" in server_exe:
cfg = initcfgs.GDB_JLINK_INIT_CONFIG
elif "st-util" in server_exe:
cfg = initcfgs.GDB_STUTIL_INIT_CONFIG
elif "mspdebug" in server_exe:
cfg = initcfgs.MSPDEBUG_INIT_CONFIG
elif self.debug_options['require_debug_port']:
cfg = initcfgs.GDB_BLACKMAGIC_INIT_CONFIG
else:
cfg = initcfgs.GDB_DEFAULT_INIT_CONFIG
commands = cfg.split("\n")
if self.debug_options['init_cmds']:
commands = self.debug_options['init_cmds']
commands.extend(self.debug_options['extra_cmds'])
if not any("define pio_reset_target" in cmd for cmd in commands):
commands = [
"define pio_reset_target",
" echo Warning! Undefined pio_reset_target command\\n",
" mon reset",
"end"
] + commands # yapf: disable
if not any("define pio_reset_halt_target" in cmd for cmd in commands):
commands = [
"define pio_reset_halt_target",
" echo Warning! Undefined pio_reset_halt_target command\\n",
" mon reset halt",
"end"
] + commands # yapf: disable
if not any("define pio_restart_target" in cmd for cmd in commands):
commands += [
"define pio_restart_target",
" pio_reset_halt_target",
" $INIT_BREAK",
" %s" % ("continue" if patterns['INIT_BREAK'] else "next"),
"end"
] # yapf: disable
banner = [
"echo PlatformIO Unified Debugger > http://bit.ly/pio-debug\\n",
"echo PlatformIO: Initializing remote target...\\n"
]
footer = ["echo %s\\n" % self.INIT_COMPLETED_BANNER]
commands = banner + commands + footer
with open(join(dst_dir, self.PIO_SRC_NAME), "w") as fp:
fp.write("\n".join(self.apply_patterns(commands, patterns)))
def connectionMade(self):
self._lock_session(self.transport.pid)
p = protocol.Protocol()
p.dataReceived = self.onStdInData
stdio.StandardIO(p)
def onStdInData(self, data):
self._last_server_activity = time.time()
if LOG_FILE:
with open(LOG_FILE, "ab") as fp:
fp.write(data)
if b"-exec-run" in data:
if self._target_is_run:
token, _ = data.split(b"-", 1)
self.outReceived(token + b"^running\n")
return
data = data.replace(b"-exec-run", b"-exec-continue")
if b"-exec-continue" in data:
self._target_is_run = True
if b"-gdb-exit" in data or data.strip() in (b"q", b"quit"):
self.transport.write(b"pio_reset_target\n")
self.transport.write(data)
def processEnded(self, reason): # pylint: disable=unused-argument
self._unlock_session()
if self._gdbsrc_dir and isdir(self._gdbsrc_dir):
util.rmtree_(self._gdbsrc_dir)
if self._debug_server:
self._debug_server.terminate()
reactor.stop()
def outReceived(self, data):
self._last_server_activity = time.time()
super(GDBClient, self).outReceived(data)
self._handle_error(data)
# go to init break automatically
if self.INIT_COMPLETED_BANNER.encode() in data:
self._auto_continue_timer = task.LoopingCall(
self._auto_exec_continue)
self._auto_continue_timer.start(0.1)
def errReceived(self, data):
super(GDBClient, self).errReceived(data)
self._handle_error(data)
def console_log(self, msg):
if helpers.is_mi_mode(self.args):
self.outReceived(('~"%s\\n"\n' % msg).encode())
else:
self.outReceived(("%s\n" % msg).encode())
def _auto_exec_continue(self):
auto_exec_delay = 0.5 # in seconds
if self._last_server_activity > (time.time() - auto_exec_delay):
return
if self._auto_continue_timer:
self._auto_continue_timer.stop()
self._auto_continue_timer = None
if not self.debug_options['init_break'] or self._target_is_run:
return
self.console_log(
"PlatformIO: Resume the execution to `debug_init_break = %s`" %
self.debug_options['init_break'])
self.transport.write(b"0-exec-continue\n" if helpers.
is_mi_mode(self.args) else b"continue\n")
self._target_is_run = True
def _handle_error(self, data):
if (self.PIO_SRC_NAME.encode() not in data
or b"Error in sourced" not in data):
return
configuration = {"debug": self.debug_options, "env": self.env_options}
exd = re.sub(r'\\(?!")', "/", json.dumps(configuration))
exd = re.sub(r'"(?:[a-z]\:)?((/[^"/]+)+)"', lambda m: '"%s"' % join(
*m.group(1).split("/")[-2:]), exd, re.I | re.M)
mp = MeasurementProtocol()
mp['exd'] = "DebugGDBPioInitError: %s" % exd
mp['exf'] = 1
mp.send("exception")
self.transport.loseConnection()
def _kill_previous_session(self):
assert self._session_id
pid = None
with app.ContentCache() as cc:
pid = cc.get(self._session_id)
cc.delete(self._session_id)
if not pid:
return
if "windows" in util.get_systype():
kill = ["Taskkill", "/PID", pid, "/F"]
else:
kill = ["kill", pid]
try:
util.exec_command(kill)
except: # pylint: disable=bare-except
pass
def _lock_session(self, pid):
if not self._session_id:
return
with app.ContentCache() as cc:
cc.set(self._session_id, str(pid), "1h")
def _unlock_session(self):
if not self._session_id:
return
with app.ContentCache() as cc:
cc.delete(self._session_id)

View File

@ -0,0 +1,135 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-arguments, too-many-statements
# pylint: disable=too-many-locals, too-many-branches
import os
from os.path import isfile
import click
from platformio import exception, util
from platformio.commands.debug import helpers
from platformio.managers.core import inject_contrib_pysite
@click.command(
"debug",
context_settings=dict(ignore_unknown_options=True),
short_help="PIO Unified Debugger")
@click.option(
"-d",
"--project-dir",
default=os.getcwd,
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True))
@click.option("--environment", "-e", metavar="<environment>")
@click.option("--verbose", "-v", is_flag=True)
@click.option("--interface", type=click.Choice(["gdb"]))
@click.argument("__unprocessed", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def cli(ctx, project_dir, environment, verbose, interface, __unprocessed):
try:
util.ensure_udev_rules()
except NameError:
pass
except exception.InvalidUdevRules as e:
for line in str(e).split("\n") + [""]:
click.echo(
('~"%s\\n"' if helpers.is_mi_mode(__unprocessed) else "%s") %
line)
if not util.is_platformio_project(project_dir) and os.getenv("CWD"):
project_dir = os.getenv("CWD")
with util.cd(project_dir):
env_name = helpers.check_env_name(project_dir, environment)
env_options = helpers.get_env_options(project_dir, env_name)
if not set(env_options.keys()) >= set(["platform", "board"]):
raise exception.ProjectEnvsNotAvailable()
debug_options = helpers.validate_debug_options(ctx, env_options)
assert debug_options
if not interface:
return helpers.predebug_project(ctx, project_dir, env_name, False,
verbose)
configuration = helpers.load_configuration(ctx, project_dir, env_name)
if not configuration:
raise exception.DebugInvalidOptions(
"Could not load debug configuration")
if "--version" in __unprocessed:
result = util.exec_command([configuration['gdb_path'], "--version"])
if result['returncode'] == 0:
return click.echo(result['out'])
raise exception.PlatformioException("\n".join(
[result['out'], result['err']]))
debug_options['load_cmd'] = helpers.configure_esp32_load_cmd(
debug_options, configuration)
rebuild_prog = False
preload = debug_options['load_cmd'] == "preload"
load_mode = debug_options['load_mode']
if load_mode == "always":
rebuild_prog = (
preload
or not helpers.has_debug_symbols(configuration['prog_path']))
elif load_mode == "modified":
rebuild_prog = (
helpers.is_prog_obsolete(configuration['prog_path'])
or not helpers.has_debug_symbols(configuration['prog_path']))
else:
rebuild_prog = not isfile(configuration['prog_path'])
if preload or (not rebuild_prog and load_mode != "always"):
# don't load firmware through debug server
debug_options['load_cmd'] = None
if rebuild_prog:
if helpers.is_mi_mode(__unprocessed):
output = helpers.GDBBytesIO()
click.echo('~"Preparing firmware for debugging...\\n"')
with helpers.capture_std_streams(output):
helpers.predebug_project(ctx, project_dir, env_name, preload,
verbose)
output.close()
else:
click.echo("Preparing firmware for debugging...")
helpers.predebug_project(ctx, project_dir, env_name, preload,
verbose)
# save SHA sum of newly created prog
if load_mode == "modified":
helpers.is_prog_obsolete(configuration['prog_path'])
if not isfile(configuration['prog_path']):
raise exception.DebugInvalidOptions("Program/firmware is missed")
# run debugging client
inject_contrib_pysite()
from platformio.commands.debug.client import GDBClient, reactor
client = GDBClient(project_dir, __unprocessed, debug_options, env_options)
client.spawn(configuration['gdb_path'], configuration['prog_path'])
reactor.run()
return True

View File

@ -0,0 +1,317 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import sys
import time
from contextlib import contextmanager
from fnmatch import fnmatch
from hashlib import sha1
from io import BytesIO
from os.path import isfile
from platformio import VERSION, exception, util
from platformio.commands.platform import \
platform_install as cmd_platform_install
from platformio.commands.run import cli as cmd_run
from platformio.managers.platform import PlatformFactory
class GDBBytesIO(BytesIO): # pylint: disable=too-few-public-methods
STDOUT = sys.stdout
def write(self, text):
for line in text.strip().split("\n"):
self.STDOUT.write('~"%s\\n"\n' % line)
self.STDOUT.flush()
def is_mi_mode(args):
return "--interpreter" in " ".join(args)
def escape_path(path):
return path.replace("\\", "/")
def check_env_name(project_dir, environment):
config = util.load_project_config(project_dir)
envs = []
for section in config.sections():
if section.startswith("env:"):
envs.append(section[4:])
if not envs:
raise exception.ProjectEnvsNotAvailable()
if not environment and config.has_option("platformio", "env_default"):
environment = config.get("platformio", "env_default").split(", ")[0]
if environment:
if environment in envs:
return environment
raise exception.UnknownEnvNames(environment, envs)
return envs[0]
def get_env_options(project_dir, environment):
config = util.load_project_config(project_dir)
options = {}
for k, v in config.items("env:%s" % environment):
options[k] = v
return options
def validate_debug_options(cmd_ctx, env_options):
def _cleanup_cmds(cmds):
if not cmds:
return []
if not isinstance(cmds, list):
cmds = cmds.split("\n")
return [c.strip() for c in cmds if c.strip()]
try:
platform = PlatformFactory.newPlatform(env_options['platform'])
except exception.UnknownPlatform:
cmd_ctx.invoke(
cmd_platform_install,
platforms=[env_options['platform']],
skip_default_package=True)
platform = PlatformFactory.newPlatform(env_options['platform'])
board_config = platform.board_config(env_options['board'])
tool_name = board_config.get_debug_tool_name(env_options.get("debug_tool"))
tool_settings = board_config.get("debug", {}).get("tools", {}).get(
tool_name, {})
server_options = None
# specific server per a system
if isinstance(tool_settings.get("server", {}), list):
for item in tool_settings['server'][:]:
tool_settings['server'] = item
if util.get_systype() in item.get("system", []):
break
# user overwrites debug server
if env_options.get("debug_server"):
server_options = {
"cwd": None,
"executable": None,
"arguments": env_options.get("debug_server")
}
if not isinstance(server_options['arguments'], list):
server_options['arguments'] = server_options['arguments'].split(
"\n")
server_options['arguments'] = [
arg.strip() for arg in server_options['arguments'] if arg.strip()
]
server_options['executable'] = server_options['arguments'][0]
server_options['arguments'] = server_options['arguments'][1:]
elif "server" in tool_settings:
server_package = tool_settings['server'].get("package")
server_package_dir = platform.get_package_dir(
server_package) if server_package else None
if server_package and not server_package_dir:
platform.install_packages(
with_packages=[server_package],
skip_default_package=True,
silent=True)
server_package_dir = platform.get_package_dir(server_package)
server_options = dict(
cwd=server_package_dir if server_package else None,
executable=tool_settings['server'].get("executable"),
arguments=[
a.replace("$PACKAGE_DIR", escape_path(server_package_dir))
if server_package_dir else a
for a in tool_settings['server'].get("arguments", [])
])
extra_cmds = _cleanup_cmds(env_options.get("debug_extra_cmds"))
extra_cmds.extend(_cleanup_cmds(tool_settings.get("extra_cmds")))
result = dict(
tool=tool_name,
upload_protocol=env_options.get(
"upload_protocol",
board_config.get("upload", {}).get("protocol")),
load_cmd=env_options.get("debug_load_cmd",
tool_settings.get("load_cmd", "load")),
load_mode=env_options.get("debug_load_mode",
tool_settings.get("load_mode", "always")),
init_break=env_options.get(
"debug_init_break", tool_settings.get("init_break",
"tbreak main")),
init_cmds=_cleanup_cmds(
env_options.get("debug_init_cmds",
tool_settings.get("init_cmds"))),
extra_cmds=extra_cmds,
require_debug_port=tool_settings.get("require_debug_port", False),
port=reveal_debug_port(
env_options.get("debug_port", tool_settings.get("port")),
tool_name, tool_settings),
server=server_options)
return result
def predebug_project(ctx, project_dir, env_name, preload, verbose):
ctx.invoke(
cmd_run,
project_dir=project_dir,
environment=[env_name],
target=["__debug"] + (["upload"] if preload else []),
verbose=verbose)
if preload:
time.sleep(5)
@contextmanager
def capture_std_streams(stdout, stderr=None):
_stdout = sys.stdout
_stderr = sys.stderr
sys.stdout = stdout
sys.stderr = stderr or stdout
yield
sys.stdout = _stdout
sys.stderr = _stderr
def load_configuration(ctx, project_dir, env_name):
output = BytesIO()
with capture_std_streams(output):
ctx.invoke(
cmd_run,
project_dir=project_dir,
environment=[env_name],
target=["idedata"])
result = output.getvalue().decode()
output.close()
if '"includes":' not in result:
return None
for line in result.split("\n"):
line = line.strip()
if line.startswith('{"') and "cxx_path" in line:
return json.loads(line[:line.rindex("}") + 1])
return None
def configure_esp32_load_cmd(debug_options, configuration):
ignore_conds = [
debug_options['load_cmd'] != "load",
"xtensa-esp32" not in configuration.get("cc_path", ""),
not configuration.get("flash_extra_images"), not all([
isfile(item['path'])
for item in configuration.get("flash_extra_images")
])
]
if any(ignore_conds):
return debug_options['load_cmd']
mon_cmds = [
'monitor program_esp32 "{{{path}}}" {offset} verify'.format(
path=escape_path(item['path']), offset=item['offset'])
for item in configuration.get("flash_extra_images")
]
mon_cmds.append('monitor program_esp32 "{%s.bin}" 0x10000 verify' %
escape_path(configuration['prog_path'][:-4]))
return "\n".join(mon_cmds)
def has_debug_symbols(prog_path):
if not isfile(prog_path):
return False
matched = {
b".debug_info": False,
b".debug_abbrev": False,
b" -Og": False,
b" -g": False,
b"__PLATFORMIO_DEBUG__": (3, 6) > VERSION[:2]
}
with open(prog_path, "rb") as fp:
last_data = b""
while True:
data = fp.read(1024)
if not data:
break
for pattern, found in matched.items():
if found:
continue
if pattern in last_data + data:
matched[pattern] = True
last_data = data
return all(matched.values())
def is_prog_obsolete(prog_path):
prog_hash_path = prog_path + ".sha1"
if not isfile(prog_path):
return True
shasum = sha1()
with open(prog_path, "rb") as fp:
while True:
data = fp.read(1024)
if not data:
break
shasum.update(data)
new_digest = shasum.hexdigest()
old_digest = None
if isfile(prog_hash_path):
with open(prog_hash_path, "r") as fp:
old_digest = fp.read()
if new_digest == old_digest:
return False
with open(prog_hash_path, "w") as fp:
fp.write(new_digest)
return True
def reveal_debug_port(env_debug_port, tool_name, tool_settings):
def _get_pattern():
if not env_debug_port:
return None
if set(["*", "?", "[", "]"]) & set(env_debug_port):
return env_debug_port
return None
def _is_match_pattern(port):
pattern = _get_pattern()
if not pattern:
return True
return fnmatch(port, pattern)
def _look_for_serial_port(hwids):
for item in util.get_serialports(filter_hwid=True):
if not _is_match_pattern(item['port']):
continue
port = item['port']
if tool_name.startswith("blackmagic"):
if "windows" in util.get_systype() and \
port.startswith("COM") and len(port) > 4:
port = "\\\\.\\%s" % port
if "GDB" in item['description']:
return port
for hwid in hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item['hwid']:
return port
return None
if env_debug_port and not _get_pattern():
return env_debug_port
if not tool_settings.get("require_debug_port"):
return None
debug_port = _look_for_serial_port(tool_settings.get("hwids", []))
if not debug_port:
raise exception.DebugInvalidOptions(
"Please specify `debug_port` for environment")
return debug_port

View File

@ -0,0 +1,109 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
GDB_DEFAULT_INIT_CONFIG = """
define pio_reset_halt_target
monitor reset halt
end
define pio_reset_target
monitor reset
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
pio_reset_halt_target
$LOAD_CMD
monitor init
pio_reset_halt_target
"""
GDB_STUTIL_INIT_CONFIG = """
define pio_reset_halt_target
monitor halt
monitor reset
end
define pio_reset_target
monitor reset
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
pio_reset_halt_target
$LOAD_CMD
pio_reset_halt_target
"""
GDB_JLINK_INIT_CONFIG = """
define pio_reset_halt_target
monitor halt
monitor reset
end
define pio_reset_target
monitor reset
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
pio_reset_halt_target
$LOAD_CMD
pio_reset_halt_target
"""
GDB_BLACKMAGIC_INIT_CONFIG = """
define pio_reset_halt_target
set language c
set *0xE000ED0C = 0x05FA0004
set $busy = (*0xE000ED0C & 0x4)
while ($busy)
set $busy = (*0xE000ED0C & 0x4)
end
set language auto
end
define pio_reset_target
pio_reset_halt_target
end
target extended-remote $DEBUG_PORT
monitor swdp_scan
attach 1
set mem inaccessible-by-default off
$INIT_BREAK
$LOAD_CMD
set language c
set *0xE000ED0C = 0x05FA0004
set $busy = (*0xE000ED0C & 0x4)
while ($busy)
set $busy = (*0xE000ED0C & 0x4)
end
set language auto
"""
MSPDEBUG_INIT_CONFIG = """
define pio_reset_halt_target
end
define pio_reset_target
end
target extended-remote $DEBUG_PORT
$INIT_BREAK
monitor erase
$LOAD_CMD
pio_reset_halt_target
"""

View File

@ -0,0 +1,72 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import click
from twisted.internet import protocol
from platformio import util
from platformio.commands.debug import helpers
LOG_FILE = None
class BaseProcess(protocol.ProcessProtocol, object):
STDOUT_CHUNK_SIZE = 2048
COMMON_PATTERNS = {
"PLATFORMIO_HOME_DIR": helpers.escape_path(util.get_home_dir()),
"PYTHONEXE": os.getenv("PYTHONEXEPATH", "")
}
def apply_patterns(self, source, patterns=None):
_patterns = self.COMMON_PATTERNS.copy()
_patterns.update(patterns or {})
def _replace(text):
for key, value in _patterns.items():
pattern = "$%s" % key
text = text.replace(pattern, value or "")
return text
if isinstance(source, util.string_types):
source = _replace(source)
elif isinstance(source, (list, dict)):
items = enumerate(source) if isinstance(source,
list) else source.items()
for key, value in items:
if isinstance(value, util.string_types):
source[key] = _replace(value)
elif isinstance(value, (list, dict)):
source[key] = self.apply_patterns(value, patterns)
return source
def outReceived(self, data):
if LOG_FILE:
with open(LOG_FILE, "ab") as fp:
fp.write(data)
while data:
chunk = data[:self.STDOUT_CHUNK_SIZE]
click.echo(chunk, nl=False)
data = data[self.STDOUT_CHUNK_SIZE:]
@staticmethod
def errReceived(data):
if LOG_FILE:
with open(LOG_FILE, "ab") as fp:
fp.write(data)
click.echo(data, nl=False, err=True)

View File

@ -0,0 +1,110 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from os.path import isdir, isfile, join
from twisted.internet import reactor
from platformio import exception, util
from platformio.commands.debug import helpers
from platformio.commands.debug.process import BaseProcess
class DebugServer(BaseProcess):
def __init__(self, debug_options, env_options):
self.debug_options = debug_options
self.env_options = env_options
self._debug_port = None
self._transport = None
def spawn(self, patterns): # pylint: disable=too-many-branches
systype = util.get_systype()
server = self.debug_options.get("server")
if not server:
return None
server = self.apply_patterns(server, patterns)
server_executable = server['executable']
if not server_executable:
return None
if server['cwd']:
server_executable = join(server['cwd'], server_executable)
if ("windows" in systype and not server_executable.endswith(".exe")
and isfile(server_executable + ".exe")):
server_executable = server_executable + ".exe"
if not isfile(server_executable):
server_executable = util.where_is_program(server_executable)
if not isfile(server_executable):
raise exception.DebugInvalidOptions(
"\nCould not launch Debug Server '%s'. Please check that it "
"is installed and is included in a system PATH\n\n"
"See documentation or contact contact@platformio.org:\n"
"http://docs.platformio.org/page/plus/debugging.html\n" %
server_executable)
self._debug_port = ":3333"
openocd_pipe_allowed = all([
not self.debug_options['port'], "openocd" in server_executable,
self.env_options['platform'] != "riscv"
])
if openocd_pipe_allowed:
args = []
if server['cwd']:
args.extend(["-s", helpers.escape_path(server['cwd'])])
args.extend([
"-c", "gdb_port pipe; tcl_port disabled; telnet_port disabled"
])
args.extend(server['arguments'])
str_args = " ".join(
[arg if arg.startswith("-") else '"%s"' % arg for arg in args])
self._debug_port = '| "%s" %s' % (
helpers.escape_path(server_executable), str_args)
else:
env = os.environ.copy()
# prepend server "lib" folder to LD path
if ("windows" not in systype and server['cwd']
and isdir(join(server['cwd'], "lib"))):
ld_key = ("DYLD_LIBRARY_PATH"
if "darwin" in systype else "LD_LIBRARY_PATH")
env[ld_key] = join(server['cwd'], "lib")
if os.environ.get(ld_key):
env[ld_key] = "%s:%s" % (env[ld_key],
os.environ.get(ld_key))
# prepend BIN to PATH
if server['cwd'] and isdir(join(server['cwd'], "bin")):
env['PATH'] = "%s%s%s" % (
join(server['cwd'], "bin"), os.pathsep,
os.environ.get("PATH", os.environ.get("Path", "")))
self._transport = reactor.spawnProcess(
self,
server_executable, [server_executable] + server['arguments'],
path=server['cwd'],
env=env)
if "mspdebug" in server_executable.lower():
self._debug_port = ":2000"
elif "jlink" in server_executable.lower():
self._debug_port = ":2331"
return self._transport
def get_debug_port(self):
return self._debug_port
def terminate(self):
if self._transport:
self._transport.signalProcess("KILL")

View File

@ -115,6 +115,15 @@ def shutdown_piohome_servers():
port += 1
def inject_contrib_pysite():
from site import addsitedir
contrib_pysite_dir = get_core_package_dir("contrib-pysite")
if contrib_pysite_dir in sys.path:
return
addsitedir(contrib_pysite_dir)
sys.path.insert(0, contrib_pysite_dir)
def pioplus_call(args, **kwargs):
if "windows" in util.get_systype() and sys.version_info < (2, 7, 6):
raise exception.PlatformioException(

View File

@ -518,11 +518,8 @@ def get_mdns_services():
try:
import zeroconf
except ImportError:
from site import addsitedir
from platformio.managers.core import get_core_package_dir
contrib_pysite_dir = get_core_package_dir("contrib-pysite")
addsitedir(contrib_pysite_dir)
sys.path.insert(0, contrib_pysite_dir)
from platformio.managers.core import inject_contrib_pysite
inject_contrib_pysite()
import zeroconf
class mDNSListener(object):