forked from platformio/platformio-core
Improved automatic detection of a testing serial port // Resolve #4076
This commit is contained in:
@ -60,6 +60,7 @@ Please check the `Migration guide from 5.x to 6.0 <https://docs.platformio.org/e
|
||||
- Pass extra arguments to the testing program with a new `pio test --program-arg <https://docs.platformio.org/en/latest/core/userguide/cmd_test.html#cmdoption-pio-test-a>`__ option (`issue #3132 <https://github.com/platformio/platformio-core/issues/3132>`_)
|
||||
- Generate reports in JUnit and JSON formats using the `pio test <https://docs.platformio.org/en/latest/core/userguide/cmd_test.html>`__ command (`issue #2891 <https://github.com/platformio/platformio-core/issues/2891>`_)
|
||||
- Provide more information when the native program crashed on a host (errored with a non-zero return code) (`issue #3429 <https://github.com/platformio/platformio-core/issues/3429>`_)
|
||||
- Improved automatic detection of a testing serial port (`issue #4076 <https://github.com/platformio/platformio-core/issues/4076>`_)
|
||||
- Fixed an issue when command line parameters (``--ignore``, ``--filter``) do not override values defined in the |PIOCONF| (`issue #3845 <https://github.com/platformio/platformio-core/issues/3845>`_)
|
||||
- Renamed the "test_build_project_src" project configuration option to the `test_build_src <https://docs.platformio.org/en/latest//projectconf/section_env_test.html#test-build-src>`__
|
||||
- Removed the "test_transport" option in favor of the `Custom "unity_config.h" <https://docs.platformio.org/en/latest/advanced/unit-testing/frameworks/unity.html>`_
|
||||
|
2
docs
2
docs
Submodule docs updated: 1bf2eb97b3...a57afb023a
@ -14,14 +14,13 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
from fnmatch import fnmatch
|
||||
|
||||
import click
|
||||
from serial.tools import miniterm
|
||||
|
||||
from platformio import exception, fs
|
||||
from platformio.device.filters.base import register_filters
|
||||
from platformio.device.list import list_serial_ports
|
||||
from platformio.device.serial import scan_serial_port
|
||||
from platformio.platform.factory import PlatformFactory
|
||||
from platformio.project.config import ProjectConfig
|
||||
from platformio.project.exception import NotPlatformIOProjectError
|
||||
@ -100,40 +99,22 @@ from platformio.project.options import ProjectOptions
|
||||
def device_monitor_cmd(**kwargs): # pylint: disable=too-many-branches
|
||||
project_options = {}
|
||||
platform = None
|
||||
try:
|
||||
with fs.cd(kwargs["project_dir"]):
|
||||
with fs.cd(kwargs["project_dir"]):
|
||||
try:
|
||||
project_options = get_project_options(kwargs["environment"])
|
||||
kwargs = apply_project_monitor_options(kwargs, project_options)
|
||||
if "platform" in project_options:
|
||||
platform = PlatformFactory.new(project_options["platform"])
|
||||
except NotPlatformIOProjectError:
|
||||
pass
|
||||
|
||||
with fs.cd(kwargs["project_dir"]):
|
||||
except NotPlatformIOProjectError:
|
||||
pass
|
||||
register_filters(platform=platform, options=kwargs)
|
||||
|
||||
if not kwargs["port"]:
|
||||
ports = list_serial_ports(filter_hwid=True)
|
||||
if len(ports) == 1:
|
||||
kwargs["port"] = ports[0]["port"]
|
||||
elif "platform" in project_options and "board" in project_options:
|
||||
with fs.cd(kwargs["project_dir"]):
|
||||
board_hwids = platform.board_config(project_options["board"]).get(
|
||||
"build.hwids", []
|
||||
)
|
||||
for item in ports:
|
||||
for hwid in board_hwids:
|
||||
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
|
||||
if hwid_str in item["hwid"]:
|
||||
kwargs["port"] = item["port"]
|
||||
break
|
||||
if kwargs["port"]:
|
||||
break
|
||||
elif kwargs["port"] and (set(["*", "?", "[", "]"]) & set(kwargs["port"])):
|
||||
for item in list_serial_ports():
|
||||
if fnmatch(item["port"], kwargs["port"]):
|
||||
kwargs["port"] = item["port"]
|
||||
break
|
||||
kwargs["port"] = scan_serial_port(
|
||||
initial_port=kwargs["port"],
|
||||
board_config=platform.board_config(project_options.get("board"))
|
||||
if platform and project_options.get("board")
|
||||
else None,
|
||||
upload_protocol=project_options.get("upload_port"),
|
||||
)
|
||||
|
||||
# override system argv with patched options
|
||||
sys.argv = ["monitor"] + project_options_to_monitor_argv(
|
||||
|
@ -1,13 +0,0 @@
|
||||
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
91
platformio/device/serial.py
Normal file
91
platformio/device/serial.py
Normal file
@ -0,0 +1,91 @@
|
||||
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from fnmatch import fnmatch
|
||||
|
||||
import serial
|
||||
|
||||
from platformio.compat import IS_WINDOWS
|
||||
from platformio.device.list import list_serial_ports
|
||||
|
||||
|
||||
def is_pattern_port(port):
|
||||
if not port:
|
||||
return False
|
||||
return set(["*", "?", "[", "]"]) & set(port)
|
||||
|
||||
|
||||
def is_serial_port_ready(port, timeout=1):
|
||||
try:
|
||||
serial.Serial(port, timeout=timeout).close()
|
||||
return True
|
||||
except: # pylint: disable=bare-except
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def scan_serial_port(
|
||||
initial_port, board_config=None, upload_protocol=None, ensure_ready=False
|
||||
):
|
||||
if initial_port:
|
||||
if not is_pattern_port(initial_port):
|
||||
return initial_port
|
||||
return match_serial_port(initial_port)
|
||||
port = None
|
||||
if upload_protocol and upload_protocol.startswith("blackmagic"):
|
||||
port = scan_blackmagic_serial_port()
|
||||
if not port and board_config:
|
||||
port = scan_board_serial_port(board_config)
|
||||
if port:
|
||||
return port
|
||||
|
||||
# pick the first PID:VID USB device
|
||||
for item in list_serial_ports():
|
||||
if ensure_ready and not is_serial_port_ready(item["port"]):
|
||||
continue
|
||||
port = item["port"]
|
||||
if "VID:PID" in item["hwid"]:
|
||||
return port
|
||||
|
||||
return port
|
||||
|
||||
|
||||
def match_serial_port(pattern):
|
||||
for item in list_serial_ports():
|
||||
if fnmatch(item["port"], pattern):
|
||||
return item["port"]
|
||||
return None
|
||||
|
||||
|
||||
def scan_blackmagic_serial_port():
|
||||
for item in list_serial_ports():
|
||||
port = item["port"]
|
||||
if IS_WINDOWS and port.startswith("COM") and len(port) > 4:
|
||||
port = "\\\\.\\%s" % port
|
||||
if "GDB" in item["description"]:
|
||||
return port
|
||||
return None
|
||||
|
||||
|
||||
def scan_board_serial_port(board_config):
|
||||
board_hwids = board_config.get("build.hwids", [])
|
||||
if not board_hwids:
|
||||
return None
|
||||
for item in list_serial_ports(filter_hwid=True):
|
||||
port = item["port"]
|
||||
for hwid in board_hwids:
|
||||
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
|
||||
if hwid_str in item["hwid"]:
|
||||
return port
|
||||
return None
|
@ -17,7 +17,7 @@ from time import sleep
|
||||
import click
|
||||
import serial
|
||||
|
||||
from platformio.device.list import list_serial_ports
|
||||
from platformio.device.serial import scan_serial_port
|
||||
from platformio.exception import UserSideException
|
||||
|
||||
|
||||
@ -37,7 +37,7 @@ class SerialTestOutputReader:
|
||||
|
||||
try:
|
||||
ser = serial.serial_for_url(
|
||||
self.test_runner.get_test_port() or self.autodetect_test_port(),
|
||||
self.resolve_test_port(),
|
||||
do_not_open=True,
|
||||
baudrate=self.test_runner.get_test_speed(),
|
||||
timeout=self.SERIAL_TIMEOUT,
|
||||
@ -62,48 +62,28 @@ class SerialTestOutputReader:
|
||||
self.test_runner.on_testing_data_output(ser.read(ser.in_waiting or 1))
|
||||
ser.close()
|
||||
|
||||
def autodetect_test_port(self):
|
||||
board = self.test_runner.project_config.get(
|
||||
f"env:{self.test_runner.test_suite.env_name}", "board"
|
||||
def resolve_test_port(self):
|
||||
project_options = self.test_runner.project_config.items(
|
||||
env=self.test_runner.test_suite.env_name, as_dict=True
|
||||
)
|
||||
board_hwids = self.test_runner.platform.board_config(board).get(
|
||||
"build.hwids", []
|
||||
scan_options = dict(
|
||||
initial_port=self.test_runner.get_test_port(),
|
||||
board_config=self.test_runner.platform.board_config(
|
||||
project_options["board"]
|
||||
),
|
||||
upload_protocol=project_options.get("upload_port"),
|
||||
ensure_ready=True,
|
||||
)
|
||||
port = None
|
||||
|
||||
elapsed = 0
|
||||
while elapsed < 5 and not port:
|
||||
for item in list_serial_ports():
|
||||
port = item["port"]
|
||||
for hwid in board_hwids:
|
||||
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
|
||||
if hwid_str in item["hwid"] and self.is_serial_port_ready(port):
|
||||
return port
|
||||
while elapsed < 5:
|
||||
port = scan_serial_port(**scan_options)
|
||||
if port:
|
||||
return port
|
||||
sleep(0.25)
|
||||
elapsed += 0.25
|
||||
|
||||
if port and not self.is_serial_port_ready(port):
|
||||
port = None
|
||||
|
||||
if not port:
|
||||
sleep(0.25)
|
||||
elapsed += 0.25
|
||||
|
||||
if not port:
|
||||
raise UserSideException(
|
||||
"Please specify `test_port` for environment or use "
|
||||
"global `--test-port` option."
|
||||
)
|
||||
return port
|
||||
|
||||
@staticmethod
|
||||
def is_serial_port_ready(port, timeout=3):
|
||||
if not port:
|
||||
return False
|
||||
elapsed = 0
|
||||
while elapsed < timeout:
|
||||
try:
|
||||
serial.Serial(port, timeout=1).close()
|
||||
return True
|
||||
except: # pylint: disable=bare-except
|
||||
pass
|
||||
sleep(1)
|
||||
elapsed += 1
|
||||
return False
|
||||
raise UserSideException(
|
||||
"Please specify `test_port` for environment or use "
|
||||
"global `--test-port` option."
|
||||
)
|
||||
|
Reference in New Issue
Block a user