forked from platformio/platformio-core
		
	
		
			
				
	
	
		
			298 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			298 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import json
 | 
						|
import sys
 | 
						|
import time
 | 
						|
from contextlib import contextmanager
 | 
						|
from fnmatch import fnmatch
 | 
						|
from hashlib import sha1
 | 
						|
from io import BytesIO
 | 
						|
from os.path import isfile
 | 
						|
 | 
						|
from platformio import VERSION, exception, util
 | 
						|
from platformio.commands.platform import \
 | 
						|
    platform_install as cmd_platform_install
 | 
						|
from platformio.commands.run import cli as cmd_run
 | 
						|
from platformio.managers.platform import PlatformFactory
 | 
						|
 | 
						|
 | 
						|
class GDBBytesIO(BytesIO):  # pylint: disable=too-few-public-methods
    """File-like object that forwards written text to ``STDOUT``.

    Nothing is stored in the underlying ``BytesIO`` buffer: ``write`` is
    overridden and re-emits every line wrapped as ``~"<line>\\n"``.
    NOTE(review): this looks like the GDB/MI console-stream record syntax;
    confirm against the GDB/MI client that consumes it.
    """

    # Destination stream; bound to whatever ``sys.stdout`` is at the moment
    # this class body is executed, so later redirections of ``sys.stdout``
    # do not affect it.
    STDOUT = sys.stdout

    def write(self, text):
        """Emit ``text`` line by line to ``STDOUT`` and flush it.

        Leading/trailing whitespace of the whole payload is stripped before
        it is split on newlines, so an empty payload still produces one
        (empty) record.

        :param text: text to forward; ``bytes`` payloads are accepted too.
        """
        # A binary writer hands over ``bytes`` on Python 3, where
        # ``bytes.split("\n")`` raises TypeError. On Python 2 ``bytes`` is
        # ``str``, so the extra ``str`` check keeps that path untouched.
        if isinstance(text, bytes) and not isinstance(text, str):
            text = text.decode("utf-8", "replace")
        for line in text.strip().split("\n"):
            self.STDOUT.write('~"%s\\n"\n' % line)
        self.STDOUT.flush()
 | 
						|
 | 
						|
 | 
						|
def is_mi_mode(args):
    """Tell whether the command line mentions ``--interpreter``.

    :param args: iterable of command-line argument strings.
    :return: ``True`` when the space-joined arguments contain the
        substring ``--interpreter``, otherwise ``False``.
    """
    command_line = " ".join(args)
    return command_line.find("--interpreter") != -1
 | 
						|
 | 
						|
 | 
						|
def escape_path(path):
    """Return ``path`` with every backslash turned into a forward slash.

    :param path: path string, possibly using Windows separators.
    :return: the same path using ``/`` wherever ``\\`` appeared.
    """
    segments = path.split("\\")
    return "/".join(segments)
 | 
						|
 | 
						|
 | 
						|
def get_default_debug_env(config):
    """Pick the environment name to debug when none was given.

    :param config: object exposing ``default_envs()`` and ``envs()``.
    :return: the first entry of ``config.default_envs()`` when that is
        non-empty, otherwise the first entry of ``config.envs()``.
    """
    preferred = config.default_envs()
    if preferred:
        return preferred[0]
    # No default environments declared: fall back to the first known one.
    return config.envs()[0]
 | 
						|
 | 
						|
 | 
						|
def validate_debug_options(cmd_ctx, env_options):
    """Resolve the effective debug settings for one project environment.

    Values from ``env_options`` take precedence over the settings the board
    manifest declares for the selected debug tool, which in turn take
    precedence over the built-in defaults used below.

    :param cmd_ctx: command context; used to invoke the platform install
        command when the platform is unknown.
    :param env_options: mapping of environment options. ``platform`` and
        ``board`` are required; ``debug_tool``, ``debug_server``,
        ``debug_extra_cmds``, ``debug_init_cmds``, ``debug_load_cmd``,
        ``debug_load_mode``, ``debug_init_break``, ``debug_port`` and
        ``upload_protocol`` are optional.
    :return: dict with the keys ``tool``, ``upload_protocol``, ``load_cmd``,
        ``load_mode``, ``init_break``, ``init_cmds``, ``extra_cmds``,
        ``require_debug_port``, ``port`` and ``server`` (``None`` when no
        debug server is configured).
    :raises exception.DebugInvalidOptions: when ``debug_server`` is set but
        holds only blank entries; ``reveal_debug_port`` may raise it as well.
    """

    def _cleanup_cmds(cmds):
        """Turn a newline-separated string or a list into a list of
        stripped, non-empty entries."""
        if not cmds:
            return []
        if not isinstance(cmds, list):
            cmds = cmds.split("\n")
        return [c.strip() for c in cmds if c.strip()]

    platform_name = env_options['platform']
    try:
        platform = PlatformFactory.newPlatform(platform_name)
    except exception.UnknownPlatform:
        # Platform is not available yet: install it, then retry once.
        cmd_ctx.invoke(
            cmd_platform_install,
            platforms=[platform_name],
            skip_default_package=True)
        platform = PlatformFactory.newPlatform(platform_name)

    board_config = platform.board_config(env_options['board'])
    tool_name = board_config.get_debug_tool_name(env_options.get("debug_tool"))
    tool_settings = board_config.get("debug", {}).get("tools", {}).get(
        tool_name, {})
    server_options = None

    # specific server per a system
    if isinstance(tool_settings.get("server", {}), list):
        systype = util.get_systype()
        # Every candidate is assigned in turn and the loop stops at the first
        # one listing this system, so the last entry acts as the fallback
        # when none matches. The list is replaced in ``tool_settings`` itself.
        for item in tool_settings['server'][:]:
            tool_settings['server'] = item
            if systype in item.get("system", []):
                break

    # user overwrites debug server
    if env_options.get("debug_server"):
        server_cmd = _cleanup_cmds(env_options.get("debug_server"))
        if not server_cmd:
            # Only blank entries were given: report it explicitly instead of
            # failing with an IndexError on the executable lookup.
            raise exception.DebugInvalidOptions(
                "`debug_server` does not specify an executable")
        server_options = {
            "cwd": None,
            "executable": server_cmd[0],
            "arguments": server_cmd[1:]
        }
    elif "server" in tool_settings:
        server_package = tool_settings['server'].get("package")
        server_package_dir = platform.get_package_dir(
            server_package) if server_package else None
        if server_package and not server_package_dir:
            # The server package is declared but not installed yet.
            platform.install_packages(
                with_packages=[server_package],
                skip_default_package=True,
                silent=True)
            server_package_dir = platform.get_package_dir(server_package)
        server_options = dict(
            cwd=server_package_dir if server_package else None,
            executable=tool_settings['server'].get("executable"),
            arguments=[
                a.replace("$PACKAGE_DIR", escape_path(server_package_dir))
                if server_package_dir else a
                for a in tool_settings['server'].get("arguments", [])
            ])

    # Extra commands of the environment come first, then those of the tool.
    extra_cmds = _cleanup_cmds(env_options.get("debug_extra_cmds"))
    extra_cmds.extend(_cleanup_cmds(tool_settings.get("extra_cmds")))
    result = dict(
        tool=tool_name,
        upload_protocol=env_options.get(
            "upload_protocol",
            board_config.get("upload", {}).get("protocol")),
        load_cmd=env_options.get("debug_load_cmd",
                                 tool_settings.get("load_cmd", "load")),
        load_mode=env_options.get("debug_load_mode",
                                  tool_settings.get("load_mode", "always")),
        init_break=env_options.get(
            "debug_init_break", tool_settings.get("init_break",
                                                  "tbreak main")),
        init_cmds=_cleanup_cmds(
            env_options.get("debug_init_cmds",
                            tool_settings.get("init_cmds"))),
        extra_cmds=extra_cmds,
        require_debug_port=tool_settings.get("require_debug_port", False),
        port=reveal_debug_port(
            env_options.get("debug_port", tool_settings.get("port")),
            tool_name, tool_settings),
        server=server_options)
    return result
 | 
						|
 | 
						|
 | 
						|
def predebug_project(ctx, project_dir, env_name, preload, verbose):
    """Run the ``__debug`` target for an environment before debugging.

    :param ctx: command context used to invoke the run command.
    :param project_dir: project directory passed to the run command.
    :param env_name: name of the single environment to process.
    :param preload: when truthy, the ``upload`` target is run as well and
        the function then sleeps for 5 seconds before returning.
    :param verbose: verbosity flag forwarded to the run command.
    """
    targets = ["__debug"]
    if preload:
        targets.append("upload")
    ctx.invoke(
        cmd_run,
        project_dir=project_dir,
        environment=[env_name],
        target=targets,
        verbose=verbose)
    if preload:
        # NOTE(review): presumably gives the target time to restart after
        # the upload - confirm why 5 seconds.
        time.sleep(5)
 | 
						|
 | 
						|
 | 
						|
@contextmanager
def capture_std_streams(stdout, stderr=None):
    """Temporarily replace ``sys.stdout`` and ``sys.stderr``.

    :param stdout: object installed as ``sys.stdout`` inside the block.
    :param stderr: object installed as ``sys.stderr``; when falsy, ``stdout``
        is used for both streams.

    The previous streams are restored when the block exits, including when
    it exits through an exception.
    """
    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    sys.stdout = stdout
    sys.stderr = stderr or stdout
    try:
        yield
    finally:
        # Without this, an exception raised inside the ``with`` body would
        # leave the process-wide streams redirected for good.
        sys.stdout = saved_stdout
        sys.stderr = saved_stderr
 | 
						|
 | 
						|
 | 
						|
def load_configuration(ctx, project_dir, env_name):
    """Run the ``idedata`` target and parse the JSON it prints.

    The run command is invoked with stdout/stderr captured into a buffer;
    the captured text is then scanned for the line holding the JSON object.

    :param ctx: command context used to invoke the run command.
    :param project_dir: project directory passed to the run command.
    :param env_name: name of the single environment to process.
    :return: the decoded JSON object, or ``None`` when the output does not
        contain ``"includes":`` or no line both starts with ``{"`` and
        mentions ``cxx_path``.
    """
    buf = BytesIO()
    with capture_std_streams(buf):
        ctx.invoke(
            cmd_run,
            project_dir=project_dir,
            environment=[env_name],
            target=["idedata"])
    captured = buf.getvalue().decode()
    buf.close()

    if '"includes":' in captured:
        for raw in captured.split("\n"):
            candidate = raw.strip()
            if not candidate.startswith('{"') or "cxx_path" not in candidate:
                continue
            # Drop anything that trails the last closing brace on the line.
            end = candidate.rindex("}") + 1
            return json.loads(candidate[:end])
    return None
 | 
						|
 | 
						|
 | 
						|
def configure_esp32_load_cmd(debug_options, configuration):
    """Build the load command used when flashing an ESP32 target.

    The configured ``load_cmd`` is returned unchanged unless all of these
    hold: it equals ``"load"``, ``configuration["cc_path"]`` contains
    ``xtensa-esp32``, ``configuration["flash_extra_images"]`` is non-empty
    and every listed image ``path`` is an existing file.

    :param debug_options: mapping with at least the key ``load_cmd``.
    :param configuration: mapping that may hold ``cc_path``,
        ``flash_extra_images`` (items with ``path`` and ``offset``) and
        ``prog_path``.
    :return: either ``debug_options['load_cmd']`` or a newline-joined list
        of ``monitor program_esp32 ... verify`` commands, one per extra
        image plus one for the program image at offset ``0x10000``.
    """
    load_cmd = debug_options['load_cmd']
    extra_images = configuration.get("flash_extra_images")
    # The checks must short-circuit: iterating the images before knowing
    # that they exist raises TypeError when the key is missing.
    if (load_cmd != "load"
            or "xtensa-esp32" not in configuration.get("cc_path", "")
            or not extra_images
            or not all(isfile(item['path']) for item in extra_images)):
        return load_cmd

    mon_cmds = [
        'monitor program_esp32 "{{{path}}}" {offset} verify'.format(
            path=escape_path(item['path']), offset=item['offset'])
        for item in extra_images
    ]
    # The last 4 characters of ``prog_path`` are replaced with ``.bin``
    # (presumably an ``.elf`` extension - verify against the producer).
    mon_cmds.append('monitor program_esp32 "{%s.bin}" 0x10000 verify' %
                    escape_path(configuration['prog_path'][:-4]))
    return "\n".join(mon_cmds)
 | 
						|
 | 
						|
 | 
						|
def has_debug_symbols(prog_path):
    """Check whether a program file carries all expected debug markers.

    The file is scanned for the byte strings ``.debug_info``,
    ``.debug_abbrev``, `` -Og`` and `` -g``. ``__PLATFORMIO_DEBUG__`` is
    required as well, unless the first two components of ``VERSION`` are
    below ``(3, 6)``.

    :param prog_path: path of the program file to inspect.
    :return: ``True`` only when every required marker occurs in the file;
        ``False`` when a marker is missing or the file does not exist.
    """
    if not isfile(prog_path):
        return False

    pending = set([b".debug_info", b".debug_abbrev", b" -Og", b" -g"])
    if not (3, 6) > VERSION[:2]:
        pending.add(b"__PLATFORMIO_DEBUG__")

    previous = b""
    with open(prog_path, "rb") as binary:
        for chunk in iter(lambda: binary.read(1024), b""):
            # Search the previous and current chunk together so a marker
            # split across a chunk boundary is still seen.
            window = previous + chunk
            pending = set(
                marker for marker in pending if marker not in window)
            previous = chunk
    return not pending
 | 
						|
 | 
						|
 | 
						|
def is_prog_obsolete(prog_path):
    """Tell whether a program file changed since the last call.

    The SHA-1 hex digest of the file is compared with the one stored in
    ``<prog_path>.sha1``. When they differ (or nothing is stored yet) the
    new digest is written to that file.

    :param prog_path: path of the program file.
    :return: ``True`` when the file is missing or its digest differs from
        the stored one; ``False`` when the digests match.
    """
    digest_file = prog_path + ".sha1"
    if not isfile(prog_path):
        return True

    hasher = sha1()
    with open(prog_path, "rb") as binary:
        for chunk in iter(lambda: binary.read(1024), b""):
            hasher.update(chunk)
    current = hasher.hexdigest()

    stored = None
    if isfile(digest_file):
        with open(digest_file, "r") as reader:
            stored = reader.read()
    if stored == current:
        return False

    # Remember the new digest so the next call reports "not obsolete".
    with open(digest_file, "w") as writer:
        writer.write(current)
    return True
 | 
						|
 | 
						|
 | 
						|
def reveal_debug_port(env_debug_port, tool_name, tool_settings):
    """Work out which port the debug tool should use.

    :param env_debug_port: configured port; either a concrete name, a
        wildcard pattern (contains ``*``, ``?``, ``[`` or ``]``) or a falsy
        value when nothing is configured.
    :param tool_name: name of the debug tool.
    :param tool_settings: tool settings; ``require_debug_port`` and
        ``hwids`` are read from it.
    :return: ``env_debug_port`` itself when it is a concrete name; ``None``
        when the tool does not require a port; otherwise the first detected
        serial port that matches.
    :raises exception.DebugInvalidOptions: when a port is required but no
        serial port matches.
    """
    # A configured value is a pattern only if it holds a wildcard character.
    pattern = None
    if env_debug_port and set(["*", "?", "[", "]"]) & set(env_debug_port):
        pattern = env_debug_port

    def _candidate_ports():
        """Yield detected serial ports accepted by the pattern, if any."""
        for entry in util.get_serialports(filter_hwid=True):
            if pattern is None or fnmatch(entry['port'], pattern):
                yield entry

    def _search(hwids):
        """Return the first candidate port matching the tool, or None."""
        for entry in _candidate_ports():
            name = entry['port']
            if tool_name.startswith("blackmagic"):
                needs_prefix = ("windows" in util.get_systype()
                                and name.startswith("COM") and len(name) > 4)
                if needs_prefix:
                    # COM ports with a longer name get the ``\\.\`` prefix.
                    name = "\\\\.\\%s" % name
                if "GDB" in entry['description']:
                    return name
            # Otherwise accept the port when any hwid pair, written as
            # "<first>:<second>" without "0x", occurs in its hwid string.
            if any(("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") in
                   entry['hwid'] for hwid in hwids):
                return name
        return None

    if env_debug_port and pattern is None:
        return env_debug_port
    if not tool_settings.get("require_debug_port"):
        return None

    found = _search(tool_settings.get("hwids", []))
    if found:
        return found
    raise exception.DebugInvalidOptions(
        "Please specify `debug_port` for environment")
 |