Files
platformio-core/platformio/debug/process/server.py

149 lines
5.6 KiB
Python

# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import time
from platformio import fs, util
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.debug.helpers import escape_gdbmi_stream, is_gdbmi_mode
from platformio.debug.process.base import DebugBaseProcess
from platformio.proc import where_is_program
class DebugServerProcess(DebugBaseProcess):
    """Spawn the configured debug server and wait until it is ready.

    The server description is read from ``debug_options["server"]``; the
    code below accesses its ``executable``, ``cwd``, ``arguments`` and
    (optionally) ``ready_pattern`` entries.
    """

    def __init__(self, debug_options, env_options):
        """Store the options and initialise the readiness state.

        Args:
            debug_options: Mapping of debug settings; ``"server"`` and
                ``"port"`` are read by this class.
            env_options: Environment options; only stored here.
        """
        super().__init__()
        self.debug_options = debug_options
        self.env_options = env_options
        # Port reported to the caller unless ``run`` selects another one.
        self._debug_port = ":3333"
        self._ready = False
        # Last bytes of the output already scanned for ``ready_pattern``.
        # Kept so that a pattern split across two chunks is still detected.
        self._ready_tail = b""

    async def run(self, patterns):  # pylint: disable=too-many-branches
        """Resolve the server executable, start it and return the debug port.

        Args:
            patterns: Substitutions handed to ``self.apply_patterns``
                together with the server description.

        Returns:
            ``None`` when no server or no server executable is configured,
            otherwise the debug port string (for example ``":3333"``).

        Raises:
            DebugInvalidOptionsError: The server executable cannot be found,
                neither directly nor through ``where_is_program``.
        """
        systype = util.get_systype()
        server = self.debug_options.get("server")
        if not server:
            return None
        server = self.apply_patterns(server, patterns)
        server_executable = server["executable"]
        if not server_executable:
            return None
        if server["cwd"]:
            server_executable = os.path.join(server["cwd"], server_executable)
        # On Windows, accept an executable configured without its ".exe"
        # suffix when the suffixed file exists.
        if (
            "windows" in systype
            and not server_executable.endswith(".exe")
            and os.path.isfile(server_executable + ".exe")
        ):
            server_executable = server_executable + ".exe"

        # Fall back to a lookup by program name before giving up.
        if not os.path.isfile(server_executable):
            server_executable = where_is_program(server_executable)
        if not os.path.isfile(server_executable):
            raise DebugInvalidOptionsError(
                "\nCould not launch Debug Server '%s'. Please check that it "
                "is installed and is included in a system PATH\n\n"
                "See documentation or contact contact@platformio.org:\n"
                "https://docs.platformio.org/page/plus/debugging.html\n"
                % server_executable
            )

        openocd_pipe_allowed = all(
            [not self.debug_options["port"], "openocd" in server_executable]
        )
        # NOTE(review): pipe mode is unconditionally switched off here, so the
        # branch below never runs. Presumably a deliberate kill-switch; kept
        # as found - confirm before removing or re-enabling.
        openocd_pipe_allowed = False
        if openocd_pipe_allowed:
            args = []
            if server["cwd"]:
                args.extend(["-s", server["cwd"]])
            args.extend(
                ["-c", "gdb_port pipe; tcl_port disabled; telnet_port disabled"]
            )
            args.extend(server["arguments"])
            # Quote everything that is not an option flag.
            str_args = " ".join(
                [arg if arg.startswith("-") else '"%s"' % arg for arg in args]
            )
            self._debug_port = '| "%s" %s' % (server_executable, str_args)
            self._debug_port = fs.to_unix_path(self._debug_port)
            return self._debug_port

        env = os.environ.copy()
        # prepend server "lib" folder to LD path
        if (
            "windows" not in systype
            and server["cwd"]
            and os.path.isdir(os.path.join(server["cwd"], "lib"))
        ):
            ld_key = "DYLD_LIBRARY_PATH" if "darwin" in systype else "LD_LIBRARY_PATH"
            env[ld_key] = os.path.join(server["cwd"], "lib")
            if os.environ.get(ld_key):
                env[ld_key] = "%s:%s" % (env[ld_key], os.environ.get(ld_key))
        # prepend BIN to PATH
        if server["cwd"] and os.path.isdir(os.path.join(server["cwd"], "bin")):
            env["PATH"] = "%s%s%s" % (
                os.path.join(server["cwd"], "bin"),
                os.pathsep,
                os.environ.get("PATH", os.environ.get("Path", "")),
            )

        await self.spawn(
            *([server_executable] + server["arguments"]), cwd=server["cwd"], env=env
        )

        # The port is selected from the executable name; anything else keeps
        # the value set in ``__init__``.
        if "mspdebug" in server_executable.lower():
            self._debug_port = ":2000"
        elif "jlink" in server_executable.lower():
            self._debug_port = ":2331"
        elif "qemu" in server_executable.lower():
            self._debug_port = ":1234"

        await self._wait_until_ready()

        return self._debug_port

    async def _wait_until_ready(self):
        """Poll until the server is ready, has exited, or the timeout elapses.

        With a configured ``ready_pattern`` the wait lasts up to 60 seconds
        and readiness is set by ``_check_ready_by_pattern``. Without one the
        wait lasts up to 10 seconds and the server counts as ready once
        ``self._last_activity`` is older than ``auto_ready_delay`` seconds.
        ``_last_activity`` and ``is_running`` are not defined in this class
        (presumably provided by the base class).
        """
        ready_pattern = self.debug_options.get("server", {}).get("ready_pattern")
        timeout = 60 if ready_pattern else 10
        elapsed = 0
        delay = 0.5
        auto_ready_delay = 0.5
        while not self._ready and self.is_running() and elapsed < timeout:
            await asyncio.sleep(delay)
            if not ready_pattern:
                self._ready = self._last_activity < (time.time() - auto_ready_delay)
            elapsed += delay

    def _check_ready_by_pattern(self, data):
        """Mark the server ready once ``ready_pattern`` shows up in its output.

        Output arrives in chunks, so the pattern may be split between two
        consecutive calls. A tail of the previously seen bytes (one byte
        shorter than the pattern) is prepended to ``data`` before searching.

        Args:
            data: Bytes just received from the server (stdout or stderr).

        Returns:
            The current readiness flag.
        """
        if self._ready:
            return self._ready
        ready_pattern = self.debug_options.get("server", {}).get("ready_pattern")
        if ready_pattern:
            needle = ready_pattern.encode()
            haystack = self._ready_tail + data
            self._ready = needle in haystack
            keep = len(needle) - 1
            if self._ready or not keep:
                # Nothing left to carry over (``haystack[-0:]`` would keep
                # everything, hence the explicit guard).
                self._ready_tail = b""
            else:
                self._ready_tail = haystack[-keep:]
        return self._ready

    def get_debug_port(self):
        """Return the debug port string currently selected."""
        return self._debug_port

    def stdout_data_received(self, data):
        """Forward stdout data to the base class and scan it for readiness.

        In GDB/MI mode the forwarded copy is escaped with
        ``escape_gdbmi_stream("@", data)``; the readiness check always sees
        the raw data.
        """
        super().stdout_data_received(
            escape_gdbmi_stream("@", data) if is_gdbmi_mode() else data
        )
        self._check_ready_by_pattern(data)

    def stderr_data_received(self, data):
        """Forward stderr data to the base class and scan it for readiness."""
        super().stderr_data_received(data)
        self._check_ready_by_pattern(data)