From 743a3e2c02f3d54cb7f2aaa12181e405206c2605 Mon Sep 17 00:00:00 2001 From: Ivan Kravets Date: Wed, 15 Jun 2022 12:51:06 +0300 Subject: [PATCH] Improved a serial port finder for a board with predefined HWIDs // Resolve #4308 --- HISTORY.rst | 1 + platformio/device/finder.py | 63 ++++++++++++++--------- platformio/device/monitor/command.py | 3 ++ platformio/test/runners/readers/serial.py | 13 ++--- platformio/util.py | 44 ++++++++++++++++ 5 files changed, 90 insertions(+), 34 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 2b2fa4cb..b3f0490e 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -22,6 +22,7 @@ PlatformIO Core 6 - Added new `pio device monitor --no-reconnect `__ option to disable automatic reconnection - Handle disconnects more gracefully (`issue #3939 `_) +* Improved a serial port finder for a board with predefined HWIDs * Merged the |UNITTESTING| "building" stage with "uploading" for the embedded target (`issue #4307 `_) * Fixed an issue when a custom `pio test --project-config `__ was not handled properly (`issue #4299 `_) diff --git a/platformio/device/finder.py b/platformio/device/finder.py index 382fe000..126df554 100644 --- a/platformio/device/finder.py +++ b/platformio/device/finder.py @@ -19,6 +19,7 @@ import serial from platformio.compat import IS_WINDOWS from platformio.device.list.util import list_logical_devices, list_serial_ports +from platformio.util import retry def is_pattern_port(port): @@ -44,21 +45,20 @@ def is_serial_port_ready(port, timeout=1): def find_serial_port( - initial_port, board_config=None, upload_protocol=None, ensure_ready=False + initial_port, board_config=None, upload_protocol=None, ensure_ready=False, timeout=3 ): if initial_port: if not is_pattern_port(initial_port): return initial_port return match_serial_port(initial_port) - port = None + if upload_protocol and upload_protocol.startswith("blackmagic"): - port = find_blackmagic_serial_port() - if not port and board_config: - port = find_board_serial_port(board_config) - if port: - return port + return find_blackmagic_serial_port(timeout=timeout) + if board_config and board_config.get("build.hwids"): + return find_board_serial_port(board_config.get("build.hwids"), timeout=timeout) # pick the last PID:VID USB device + port = None usb_port = None for item in list_serial_ports(): if ensure_ready and not is_serial_port_ready(item["port"]): @@ -69,26 +69,41 @@ def find_serial_port( return usb_port or port -def find_blackmagic_serial_port(): - for item in list_serial_ports(): - port = item["port"] - if IS_WINDOWS and port.startswith("COM") and len(port) > 4: - port = "\\\\.\\%s" % port - if "GDB" in item["description"]: - return port +def find_blackmagic_serial_port(timeout=0): + try: + + @retry(timeout=timeout) + def wrapper(): + for item in list_serial_ports(): + port = item["port"] + if IS_WINDOWS and port.startswith("COM") and len(port) > 4: + port = "\\\\.\\%s" % port + if "GDB" in item["description"]: + return port + raise retry.RetryNextException() + + return wrapper() + except retry.RetryStopException: + pass return None -def find_board_serial_port(board_config): - board_hwids = board_config.get("build.hwids", []) - if not board_hwids: - return None - for item in list_serial_ports(filter_hwid=True): - port = item["port"] - for hwid in board_hwids: - hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") - if hwid_str in item["hwid"]: - return port +def find_board_serial_port(hwids, timeout=0): + try: + + @retry(timeout=timeout) + def wrapper(): + for item in list_serial_ports(filter_hwid=True): + port = item["port"] + for hwid in hwids: + hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") + if hwid_str in item["hwid"]: + return port + raise retry.RetryNextException() + + return wrapper() + except retry.RetryStopException: + pass return None diff --git a/platformio/device/monitor/command.py b/platformio/device/monitor/command.py index ac74b336..16f3e8df 100644 --- a/platformio/device/monitor/command.py +++ b/platformio/device/monitor/command.py @@ -122,8 +122,11 @@ def device_monitor_cmd(**options): if platform and project_options.get("board") else None, upload_protocol=project_options.get("upload_protocol"), + ensure_ready=True, ) + options["baud"] = options["baud"] or ProjectOptions["env.monitor_speed"].default + if options["menu_char"] == options["exit_char"]: raise exception.UserSideException( "--exit-char can not be the same as --menu-char" diff --git a/platformio/test/runners/readers/serial.py b/platformio/test/runners/readers/serial.py index 5ac5e0f3..e2a7eddb 100644 --- a/platformio/test/runners/readers/serial.py +++ b/platformio/test/runners/readers/serial.py @@ -66,7 +66,7 @@ class SerialTestOutputReader: project_options = self.test_runner.project_config.items( env=self.test_runner.test_suite.env_name, as_dict=True ) - scan_options = dict( + port = find_serial_port( initial_port=self.test_runner.get_test_port(), board_config=self.test_runner.platform.board_config( project_options["board"] @@ -74,15 +74,8 @@ class SerialTestOutputReader: upload_protocol=project_options.get("upload_protocol"), ensure_ready=True, ) - - elapsed = 0 - while elapsed < 5: - port = find_serial_port(**scan_options) - if port: - return port - sleep(0.25) - elapsed += 0.25 - + if port: + return port raise UserSideException( "Please specify `test_port` for environment or use " "global `--test-port` option." diff --git a/platformio/util.py b/platformio/util.py index 1dbac429..56e95d2f 100644 --- a/platformio/util.py +++ b/platformio/util.py @@ -78,6 +78,50 @@ class throttle(object): return wrapper +# Retry: Begin + + +class RetryException(Exception): + pass + + +class RetryNextException(RetryException): + pass + + +class RetryStopException(RetryException): + pass + + +class retry(object): + + RetryNextException = RetryNextException + RetryStopException = RetryStopException + + def __init__(self, timeout=0, step=0.25): + self.timeout = timeout + self.step = step + + def __call__(self, func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + elapsed = 0 + while True: + try: + return func(*args, **kwargs) + except self.RetryNextException: + pass + if elapsed >= self.timeout: + raise self.RetryStopException() + elapsed += self.step + time.sleep(self.step) + + return wrapper + + +# Retry: End + + def singleton(cls): """From PEP-318 http://www.python.org/dev/peps/pep-0318/#examples""" _instances = {}