Improved a serial port finder for a board with predefined HWIDs // Resolve #4308

This commit is contained in:
Ivan Kravets
2022-06-15 12:51:06 +03:00
parent bd21ff0d3e
commit 743a3e2c02
5 changed files with 90 additions and 34 deletions

View File

@ -22,6 +22,7 @@ PlatformIO Core 6
- Added new `pio device monitor --no-reconnect <https://docs.platformio.org/en/latest/core/userguide/device/cmd_monitor.html#cmdoption-pio-device-monitor-no-reconnect>`__ option to disable automatic reconnection
- Handle disconnects more gracefully (`issue #3939 <https://github.com/platformio/platformio-core/issues/3939>`_)
* Improved a serial port finder for a board with predefined HWIDs
* Merged the |UNITTESTING| "building" stage with "uploading" for the embedded target (`issue #4307 <https://github.com/platformio/platformio-core/issues/4307>`_)
* Fixed an issue when a custom `pio test --project-config <https://docs.platformio.org/en/latest/core/userguide/cmd_test.html#cmdoption-pio-test-c>`__ was not handled properly (`issue #4299 <https://github.com/platformio/platformio-core/issues/4299>`_)

View File

@ -19,6 +19,7 @@ import serial
from platformio.compat import IS_WINDOWS
from platformio.device.list.util import list_logical_devices, list_serial_ports
from platformio.util import retry
def is_pattern_port(port):
@ -44,21 +45,20 @@ def is_serial_port_ready(port, timeout=1):
def find_serial_port(
initial_port, board_config=None, upload_protocol=None, ensure_ready=False
initial_port, board_config=None, upload_protocol=None, ensure_ready=False, timeout=3
):
if initial_port:
if not is_pattern_port(initial_port):
return initial_port
return match_serial_port(initial_port)
port = None
if upload_protocol and upload_protocol.startswith("blackmagic"):
port = find_blackmagic_serial_port()
if not port and board_config:
port = find_board_serial_port(board_config)
if port:
return port
return find_blackmagic_serial_port(timeout=timeout)
if board_config and board_config.get("build.hwids"):
return find_board_serial_port(board_config.get("build.hwids"), timeout=timeout)
# pick the last PID:VID USB device
port = None
usb_port = None
for item in list_serial_ports():
if ensure_ready and not is_serial_port_ready(item["port"]):
@ -69,26 +69,41 @@ def find_serial_port(
return usb_port or port
def find_blackmagic_serial_port():
for item in list_serial_ports():
port = item["port"]
if IS_WINDOWS and port.startswith("COM") and len(port) > 4:
port = "\\\\.\\%s" % port
if "GDB" in item["description"]:
return port
def find_blackmagic_serial_port(timeout=0):
try:
@retry(timeout=timeout)
def wrapper():
for item in list_serial_ports():
port = item["port"]
if IS_WINDOWS and port.startswith("COM") and len(port) > 4:
port = "\\\\.\\%s" % port
if "GDB" in item["description"]:
return port
raise retry.RetryNextException()
return wrapper()
except retry.RetryStopException:
pass
return None
def find_board_serial_port(board_config):
board_hwids = board_config.get("build.hwids", [])
if not board_hwids:
return None
for item in list_serial_ports(filter_hwid=True):
port = item["port"]
for hwid in board_hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item["hwid"]:
return port
def find_board_serial_port(hwids, timeout=0):
try:
@retry(timeout=timeout)
def wrapper():
for item in list_serial_ports(filter_hwid=True):
port = item["port"]
for hwid in hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item["hwid"]:
return port
raise retry.RetryNextException()
return wrapper()
except retry.RetryStopException:
pass
return None

View File

@ -122,8 +122,11 @@ def device_monitor_cmd(**options):
if platform and project_options.get("board")
else None,
upload_protocol=project_options.get("upload_protocol"),
ensure_ready=True,
)
options["baud"] = options["baud"] or ProjectOptions["env.monitor_speed"].default
if options["menu_char"] == options["exit_char"]:
raise exception.UserSideException(
"--exit-char can not be the same as --menu-char"

View File

@ -66,7 +66,7 @@ class SerialTestOutputReader:
project_options = self.test_runner.project_config.items(
env=self.test_runner.test_suite.env_name, as_dict=True
)
scan_options = dict(
port = find_serial_port(
initial_port=self.test_runner.get_test_port(),
board_config=self.test_runner.platform.board_config(
project_options["board"]
@ -74,15 +74,8 @@ class SerialTestOutputReader:
upload_protocol=project_options.get("upload_protocol"),
ensure_ready=True,
)
elapsed = 0
while elapsed < 5:
port = find_serial_port(**scan_options)
if port:
return port
sleep(0.25)
elapsed += 0.25
if port:
return port
raise UserSideException(
"Please specify `test_port` for environment or use "
"global `--test-port` option."

View File

@ -78,6 +78,50 @@ class throttle(object):
return wrapper
# Retry: Begin
class RetryException(Exception):
pass
class RetryNextException(RetryException):
pass
class RetryStopException(RetryException):
pass
class retry(object):
RetryNextException = RetryNextException
RetryStopException = RetryStopException
def __init__(self, timeout=0, step=0.25):
self.timeout = timeout
self.step = step
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
elapsed = 0
while True:
try:
return func(*args, **kwargs)
except self.RetryNextException:
pass
if elapsed >= self.timeout:
raise self.RetryStopException()
elapsed += self.step
time.sleep(self.step)
return wrapper
# Retry: End
def singleton(cls):
"""From PEP-318 http://www.python.org/dev/peps/pep-0318/#examples"""
_instances = {}