diff --git a/HISTORY.rst b/HISTORY.rst index 1ac233e5..821dbc4e 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -16,6 +16,7 @@ PlatformIO Core 6 6.1.4 (2022-??-??) ~~~~~~~~~~~~~~~~~~ +* Improved device port finder when using dual channel UART converter (`issue #4367 `_) * Improved project dependency resolving when using the `pio project init --ide `__ command * Fixed an issue when escaping macros/defines for IDE integration (`issue #4360 `_) diff --git a/platformio/builder/tools/pioupload.py b/platformio/builder/tools/pioupload.py index d46b1590..1f51ff5c 100644 --- a/platformio/builder/tools/pioupload.py +++ b/platformio/builder/tools/pioupload.py @@ -26,7 +26,7 @@ from SCons.Script import ARGUMENTS # pylint: disable=import-error from serial import Serial, SerialException from platformio import exception, fs -from platformio.device.finder import find_mbed_disk, find_serial_port, is_pattern_port +from platformio.device.finder import SerialPortFinder, find_mbed_disk, is_pattern_port from platformio.device.list.util import list_serial_ports from platformio.proc import exec_command @@ -112,13 +112,12 @@ def AutodetectUploadPort(*args, **kwargs): except exception.InvalidUdevRules as exc: sys.stderr.write("\n%s\n\n" % exc) env.Replace( - UPLOAD_PORT=find_serial_port( - initial_port=initial_port, + UPLOAD_PORT=SerialPortFinder( board_config=env.BoardConfig() if "BOARD" in env else None, upload_protocol=upload_protocol, prefer_gdb_port="blackmagic" in upload_protocol, verbose=int(ARGUMENTS.get("PIOVERBOSE", 0)), - ) + ).find(initial_port) ) if env.subst("$UPLOAD_PORT"): diff --git a/platformio/debug/config/blackmagic.py b/platformio/debug/config/blackmagic.py index 5a89c2f9..83d98d9c 100644 --- a/platformio/debug/config/blackmagic.py +++ b/platformio/debug/config/blackmagic.py @@ -14,7 +14,7 @@ from platformio.debug.config.base import DebugConfigBase from platformio.debug.exception import DebugInvalidOptionsError -from platformio.device.finder import find_serial_port, is_pattern_port +from platformio.device.finder import SerialPortFinder, is_pattern_port class BlackmagicDebugConfig(DebugConfigBase): @@ -56,12 +56,11 @@ set language auto initial_port = DebugConfigBase.port.fget(self) if initial_port and not is_pattern_port(initial_port): return initial_port - port = find_serial_port( - initial_port, + port = SerialPortFinder( board_config=self.board_config, upload_protocol=self.tool_name, prefer_gdb_port=True, - ) + ).find(initial_port) if port: return port raise DebugInvalidOptionsError( diff --git a/platformio/device/finder.py b/platformio/device/finder.py index 0d7eba90..bf35c542 100644 --- a/platformio/device/finder.py +++ b/platformio/device/finder.py @@ -18,7 +18,7 @@ from fnmatch import fnmatch import click import serial -from platformio.compat import IS_MACOS, IS_WINDOWS +from platformio.compat import IS_WINDOWS from platformio.device.list.util import list_logical_devices, list_serial_ports from platformio.fs import get_platformio_udev_rules_path from platformio.package.manager.platform import PlatformPackageManager @@ -53,190 +53,12 @@ def parse_udev_rules_hwids(path): return result -def normalize_board_hwid(value): - if isinstance(value, (list, tuple)): - value = ("%s:%s" % (value[0], value[1])).replace("0x", "") - return value.upper() - - def is_pattern_port(port): if not port: return False return set(["*", "?", "[", "]"]) & set(port) -def match_serial_port(pattern): - for item in list_serial_ports(): - if fnmatch(item["port"], pattern): - return item["port"] - return None - - -def is_serial_port_ready(port, timeout=1): - try: - serial.Serial(port, timeout=timeout).close() - return True - except: # pylint: disable=bare-except - pass - return False - - -def find_serial_port( # pylint: disable=too-many-arguments - initial_port, - board_config=None, - upload_protocol=None, - ensure_ready=False, - prefer_gdb_port=False, - timeout=2, - verbose=False, -): - if initial_port: - if not is_pattern_port(initial_port): - return initial_port - return match_serial_port(initial_port) - - if upload_protocol and upload_protocol.startswith("blackmagic"): - return find_blackmagic_serial_port(prefer_gdb_port, timeout) - port = None - if board_config and board_config.get("build.hwids", []): - port = find_board_serial_port(board_config, timeout, verbose) - if not port: - port = find_known_uart_port(ensure_ready, timeout, verbose) - if port: - return port - - # pick the best PID:VID USB device - best_port = None - for item in list_serial_ports(): - if ensure_ready and not is_serial_port_ready(item["port"]): - continue - port = item["port"] - if "VID:PID" in item["hwid"]: - best_port = port - return best_port or port - - -def find_blackmagic_serial_port(prefer_gdb_port=False, timeout=0): - try: - - @retry(timeout=timeout) - def wrapper(): - candidates = [] - for item in list_serial_ports(filter_hwid=True): - if ( - not any(hwid in item["hwid"].upper() for hwid in BLACK_MAGIC_HWIDS) - and not "Black Magic" in item["description"] - ): - continue - if ( - IS_WINDOWS - and item["port"].startswith("COM") - and len(item["port"]) > 4 - ): - item["port"] = "\\\\.\\%s" % item["port"] - candidates.append(item) - - if not candidates: - raise retry.RetryNextException() - - for item in candidates: - if ("GDB" if prefer_gdb_port else "UART") in item["description"]: - return item["port"] - if IS_MACOS: - # 1 - GDB, 3 - UART - for item in candidates: - if item["port"].endswith("1" if prefer_gdb_port else "3"): - return item["port"] - - candidates = sorted(candidates, key=lambda item: item["port"]) - return ( - candidates[0] # first port is GDB? - if len(candidates) == 1 or prefer_gdb_port - else candidates[1] - )["port"] - - return wrapper() - except retry.RetryStopException: - pass - return None - - -def find_board_serial_port(board_config, timeout=0, verbose=False): - hwids = board_config.get("build.hwids", []) - try: - - @retry(timeout=timeout) - def wrapper(): - for item in list_serial_ports(filter_hwid=True): - hwid = item["hwid"].upper() - for board_hwid in hwids: - if normalize_board_hwid(board_hwid) in hwid: - return item["port"] - raise retry.RetryNextException() - - return wrapper() - except retry.RetryStopException: - pass - - if verbose: - click.secho( - "TimeoutError: Could not automatically find serial port " - "for the `%s` board based on the declared HWIDs=%s" - % (board_config.get("name", "unknown"), hwids), - fg="yellow", - err=True, - ) - - return None - - -def find_known_uart_port(ensure_ready=False, timeout=0, verbose=False): - known_hwids = list(BLACK_MAGIC_HWIDS) - - # load from UDEV rules - udev_rules_path = get_platformio_udev_rules_path() - if os.path.isfile(udev_rules_path): - known_hwids.extend(parse_udev_rules_hwids(udev_rules_path)) - - # load from installed dev-platforms - for platform in PlatformPackageManager().get_installed(): - p = PlatformFactory.new(platform) - for board_config in p.get_boards().values(): - for board_hwid in board_config.get("build.hwids", []): - board_hwid = normalize_board_hwid(board_hwid) - if board_hwid not in known_hwids: - known_hwids.append(board_hwid) - - try: - - @retry(timeout=timeout) - def wrapper(): - for item in list_serial_ports(as_objects=True): - if not item.vid or not item.pid: - continue - hwid = "{:04X}:{:04X}".format(item.vid, item.pid) - for pattern in known_hwids: - if fnmatch(hwid, pattern) and ( - not ensure_ready or is_serial_port_ready(item.device) - ): - return item.device - raise retry.RetryNextException() - - return wrapper() - except retry.RetryStopException: - pass - - if verbose: - click.secho( - "TimeoutError: Could not automatically find serial port " - "based on the known UART bridges", - fg="yellow", - err=True, - ) - - return None - - def find_mbed_disk(initial_port): msdlabels = ("mbed", "nucleo", "frdm", "microbit") for item in list_logical_devices(): @@ -254,3 +76,173 @@ def find_mbed_disk(initial_port): if item["name"] and any(l in item["name"].lower() for l in msdlabels): return item["path"] return None + + +def is_serial_port_ready(port, timeout=1): + try: + serial.Serial(port, timeout=timeout).close() + return True + except: # pylint: disable=bare-except + pass + return False + + +class SerialPortFinder: + def __init__( # pylint: disable=too-many-arguments + self, + board_config=None, + upload_protocol=None, + ensure_ready=False, + prefer_gdb_port=False, + timeout=2, + verbose=False, + ): + self.board_config = board_config + self.upload_protocol = upload_protocol + self.ensure_ready = ensure_ready + self.prefer_gdb_port = prefer_gdb_port + self.timeout = timeout + self.verbose = verbose + + @staticmethod + def normalize_board_hwid(value): + if isinstance(value, (list, tuple)): + value = ("%s:%s" % (value[0], value[1])).replace("0x", "") + return value.upper() + + @staticmethod + def match_serial_port(pattern): + for item in list_serial_ports(): + if fnmatch(item["port"], pattern): + return item["port"] + return None + + @staticmethod + def match_device_hwid(patterns): + for item in list_serial_ports(as_objects=True): + if not item.vid or not item.pid: + continue + hwid = "{:04X}:{:04X}".format(item.vid, item.pid) + for pattern in patterns: + if fnmatch(hwid, pattern): + return item + return None + + def find(self, initial_port=None): + if initial_port: + if not is_pattern_port(initial_port): + return initial_port + return self.match_serial_port(initial_port) + + if self.upload_protocol and self.upload_protocol.startswith("blackmagic"): + return self._find_blackmagic_port() + + device = None + if self.board_config and self.board_config.get("build.hwids", []): + device = self._find_board_device() + if not device: + device = self._find_known_device() + if device: + port = self._reveal_device_port(device) + + # pick the best PID:VID USB device + best_port = None + for item in list_serial_ports(): + if self.ensure_ready and not is_serial_port_ready(item["port"]): + continue + port = item["port"] + if "VID:PID" in item["hwid"]: + best_port = port + return best_port or port + + def _reveal_device_port(self, device): + candidates = [] + for item in list_serial_ports(as_objects=True): + if item.vid == device.vid and item.pid == device.pid: + candidates.append(item) + if len(candidates) == 1: + return device.device + for item in candidates: + if ("GDB" if self.prefer_gdb_port else "UART") in item.description: + return item.device + candidates = sorted(candidates, key=lambda item: item.device) + # first port is GDB? BlackMagic, ESP-Prog + return candidates[0 if self.prefer_gdb_port else -1].device + + def _find_blackmagic_port(self): + device = self.match_device_hwid(BLACK_MAGIC_HWIDS) + if not device: + return None + port = self._reveal_device_port(device) + if IS_WINDOWS and port.startswith("COM") and len(port) > 4: + return "\\\\.\\%s" % port + return port + + def _find_board_device(self): + hwids = [ + self.normalize_board_hwid(hwid) + for hwid in self.board_config.get("build.hwids", []) + ] + try: + + @retry(timeout=self.timeout) + def wrapper(): + device = self.match_device_hwid(hwids) + if device: + return device + raise retry.RetryNextException() + + return wrapper() + except retry.RetryStopException: + pass + + if self.verbose: + click.secho( + "TimeoutError: Could not automatically find serial port " + "for the `%s` board based on the declared HWIDs=%s" + % (self.board_config.get("name", "unknown"), hwids), + fg="yellow", + err=True, + ) + + return None + + def _find_known_device(self): + hwids = list(BLACK_MAGIC_HWIDS) + + # load from UDEV rules + udev_rules_path = get_platformio_udev_rules_path() + if os.path.isfile(udev_rules_path): + hwids.extend(parse_udev_rules_hwids(udev_rules_path)) + + # load from installed dev-platforms + for platform in PlatformPackageManager().get_installed(): + p = PlatformFactory.new(platform) + for board_config in p.get_boards().values(): + for board_hwid in board_config.get("build.hwids", []): + board_hwid = self.normalize_board_hwid(board_hwid) + if board_hwid not in hwids: + hwids.append(board_hwid) + + try: + + @retry(timeout=self.timeout) + def wrapper(): + device = self.match_device_hwid(hwids) + if device: + return device + raise retry.RetryNextException() + + return wrapper() + except retry.RetryStopException: + pass + + if self.verbose: + click.secho( + "TimeoutError: Could not automatically find serial port " + "based on the known UART bridges", + fg="yellow", + err=True, + ) + + return None diff --git a/platformio/device/monitor/command.py b/platformio/device/monitor/command.py index 8ba61677..301de772 100644 --- a/platformio/device/monitor/command.py +++ b/platformio/device/monitor/command.py @@ -18,7 +18,7 @@ import sys import click from platformio import exception, fs -from platformio.device.finder import find_serial_port +from platformio.device.finder import SerialPortFinder from platformio.device.monitor.filters.base import register_filters from platformio.device.monitor.terminal import get_available_filters, start_terminal from platformio.platform.factory import PlatformFactory @@ -124,14 +124,13 @@ def device_monitor_cmd(**options): options = apply_project_monitor_options(options, project_options) register_filters(platform=platform, options=options) - options["port"] = find_serial_port( - initial_port=options["port"], + options["port"] = SerialPortFinder( board_config=platform.board_config(project_options.get("board")) if platform and project_options.get("board") else None, upload_protocol=project_options.get("upload_protocol"), ensure_ready=True, - ) + ).find(initial_port=options["port"]) if options["menu_char"] == options["exit_char"]: raise exception.UserSideException( diff --git a/platformio/test/runners/readers/serial.py b/platformio/test/runners/readers/serial.py index ac7765bc..ab35775b 100644 --- a/platformio/test/runners/readers/serial.py +++ b/platformio/test/runners/readers/serial.py @@ -17,7 +17,7 @@ from time import sleep import click import serial -from platformio.device.finder import find_serial_port +from platformio.device.finder import SerialPortFinder from platformio.exception import UserSideException @@ -66,15 +66,14 @@ class SerialTestOutputReader: project_options = self.test_runner.project_config.items( env=self.test_runner.test_suite.env_name, as_dict=True ) - port = find_serial_port( - initial_port=self.test_runner.get_test_port(), + port = SerialPortFinder( board_config=self.test_runner.platform.board_config( project_options["board"] ), upload_protocol=project_options.get("upload_protocol"), ensure_ready=True, verbose=self.test_runner.options.verbose, - ) + ).find(initial_port=self.test_runner.get_test_port()) if port: return port raise UserSideException(