# Copyright (c) 2014-present PlatformIO # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os from os.path import isdir, isfile, join from twisted.internet import reactor # pylint: disable=import-error from platformio import exception, util from platformio.commands.debug import helpers from platformio.commands.debug.process import BaseProcess from platformio.proc import where_is_program class DebugServer(BaseProcess): def __init__(self, debug_options, env_options): self.debug_options = debug_options self.env_options = env_options self._debug_port = None self._transport = None def spawn(self, patterns): # pylint: disable=too-many-branches systype = util.get_systype() server = self.debug_options.get("server") if not server: return None server = self.apply_patterns(server, patterns) server_executable = server['executable'] if not server_executable: return None if server['cwd']: server_executable = join(server['cwd'], server_executable) if ("windows" in systype and not server_executable.endswith(".exe") and isfile(server_executable + ".exe")): server_executable = server_executable + ".exe" if not isfile(server_executable): server_executable = where_is_program(server_executable) if not isfile(server_executable): raise exception.DebugInvalidOptions( "\nCould not launch Debug Server '%s'. Please check that it " "is installed and is included in a system PATH\n\n" "See documentation or contact contact@platformio.org:\n" "http://docs.platformio.org/page/plus/debugging.html\n" % server_executable) self._debug_port = ":3333" openocd_pipe_allowed = all([ not self.debug_options['port'], "openocd" in server_executable ]) # yapf: disable if openocd_pipe_allowed: args = [] if server['cwd']: args.extend(["-s", helpers.escape_path(server['cwd'])]) args.extend([ "-c", "gdb_port pipe; tcl_port disabled; telnet_port disabled" ]) args.extend(server['arguments']) str_args = " ".join( [arg if arg.startswith("-") else '"%s"' % arg for arg in args]) self._debug_port = '| "%s" %s' % ( helpers.escape_path(server_executable), str_args) else: env = os.environ.copy() # prepend server "lib" folder to LD path if ("windows" not in systype and server['cwd'] and isdir(join(server['cwd'], "lib"))): ld_key = ("DYLD_LIBRARY_PATH" if "darwin" in systype else "LD_LIBRARY_PATH") env[ld_key] = join(server['cwd'], "lib") if os.environ.get(ld_key): env[ld_key] = "%s:%s" % (env[ld_key], os.environ.get(ld_key)) # prepend BIN to PATH if server['cwd'] and isdir(join(server['cwd'], "bin")): env['PATH'] = "%s%s%s" % ( join(server['cwd'], "bin"), os.pathsep, os.environ.get("PATH", os.environ.get("Path", ""))) self._transport = reactor.spawnProcess( self, server_executable, [server_executable] + server['arguments'], path=server['cwd'], env=env) if "mspdebug" in server_executable.lower(): self._debug_port = ":2000" elif "jlink" in server_executable.lower(): self._debug_port = ":2331" elif "qemu" in server_executable.lower(): self._debug_port = ":1234" return self._transport def get_debug_port(self): return self._debug_port def terminate(self): if self._transport: self._transport.signalProcess("KILL")