Regroup device command

This commit is contained in:
Ivan Kravets
2022-05-14 18:21:44 +03:00
parent 19853b0b66
commit d22b479bd3
24 changed files with 614 additions and 514 deletions

2
docs

Submodule docs updated: def7ca7a23...1bf2eb97b3

View File

@ -25,8 +25,9 @@ from time import sleep
from SCons.Script import ARGUMENTS # pylint: disable=import-error
from serial import Serial, SerialException
from platformio import exception, fs, util
from platformio import exception, fs
from platformio.compat import IS_WINDOWS
from platformio.device.list import list_logical_devices, list_serial_ports
from platformio.proc import exec_command
# pylint: disable=unused-argument
@ -62,7 +63,7 @@ def WaitForNewSerialPort(env, before):
elapsed = 0
before = [p["port"] for p in before]
while elapsed < 5 and new_port is None:
now = [p["port"] for p in util.get_serial_ports()]
now = [p["port"] for p in list_serial_ports()]
for p in now:
if p not in before:
new_port = p
@ -113,7 +114,7 @@ def AutodetectUploadPort(*args, **kwargs):
def _look_for_mbed_disk():
msdlabels = ("mbed", "nucleo", "frdm", "microbit")
for item in util.get_logical_devices():
for item in list_logical_devices():
if item["path"].startswith("/net") or not _is_match_pattern(item["path"]):
continue
mbed_pages = [join(item["path"], n) for n in ("mbed.htm", "mbed.html")]
@ -129,7 +130,7 @@ def AutodetectUploadPort(*args, **kwargs):
upload_protocol = env.subst("$UPLOAD_PROTOCOL")
if "BOARD" in env and "build.hwids" in env.BoardConfig():
board_hwids = env.BoardConfig().get("build.hwids")
for item in util.get_serial_ports(filter_hwid=True):
for item in list_serial_ports(filter_hwid=True):
if not _is_match_pattern(item["port"]):
continue
port = item["port"]

View File

@ -12,4 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.commands.device.filters.base import DeviceMonitorFilter
# pylint: disable=unused-import
from platformio.device.filters.base import (
DeviceMonitorFilterBase as DeviceMonitorFilter,
)

View File

@ -12,225 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import sys
from fnmatch import fnmatch
import click
from serial.tools import miniterm
from platformio import exception, fs, util
from platformio.commands.device import helpers as device_helpers
from platformio.platform.factory import PlatformFactory
from platformio.project.exception import NotPlatformIOProjectError
from platformio.device.commands.list import device_list_cmd
from platformio.device.commands.monitor import device_monitor_cmd
@click.group(short_help="Device manager & serial/socket monitor")
@click.group(
"device",
commands=[
device_list_cmd,
device_monitor_cmd,
],
short_help="Device manager & Serial/Socket monitor",
)
def cli():
pass
@cli.command("list", short_help="List devices")
@click.option("--serial", is_flag=True, help="List serial ports, default")
@click.option("--logical", is_flag=True, help="List logical devices")
@click.option("--mdns", is_flag=True, help="List multicast DNS services")
@click.option("--json-output", is_flag=True)
def device_list( # pylint: disable=too-many-branches
serial, logical, mdns, json_output
):
if not logical and not mdns:
serial = True
data = {}
if serial:
data["serial"] = util.get_serial_ports()
if logical:
data["logical"] = util.get_logical_devices()
if mdns:
data["mdns"] = util.get_mdns_services()
single_key = list(data)[0] if len(list(data)) == 1 else None
if json_output:
return click.echo(json.dumps(data[single_key] if single_key else data))
titles = {
"serial": "Serial Ports",
"logical": "Logical Devices",
"mdns": "Multicast DNS Services",
}
for key, value in data.items():
if not single_key:
click.secho(titles[key], bold=True)
click.echo("=" * len(titles[key]))
if key == "serial":
for item in value:
click.secho(item["port"], fg="cyan")
click.echo("-" * len(item["port"]))
click.echo("Hardware ID: %s" % item["hwid"])
click.echo("Description: %s" % item["description"])
click.echo("")
if key == "logical":
for item in value:
click.secho(item["path"], fg="cyan")
click.echo("-" * len(item["path"]))
click.echo("Name: %s" % item["name"])
click.echo("")
if key == "mdns":
for item in value:
click.secho(item["name"], fg="cyan")
click.echo("-" * len(item["name"]))
click.echo("Type: %s" % item["type"])
click.echo("IP: %s" % item["ip"])
click.echo("Port: %s" % item["port"])
if item["properties"]:
click.echo(
"Properties: %s"
% (
"; ".join(
[
"%s=%s" % (k, v)
for k, v in item["properties"].items()
]
)
)
)
click.echo("")
if single_key:
click.echo("")
return True
@cli.command("monitor", short_help="Monitor device (Serial)")
@click.option("--port", "-p", help="Port, a number or a device name")
@click.option("--baud", "-b", type=int, help="Set baud rate, default=9600")
@click.option(
"--parity",
default="N",
type=click.Choice(["N", "E", "O", "S", "M"]),
help="Set parity, default=N",
)
@click.option("--rtscts", is_flag=True, help="Enable RTS/CTS flow control, default=Off")
@click.option(
"--xonxoff", is_flag=True, help="Enable software flow control, default=Off"
)
@click.option(
"--rts", default=None, type=click.IntRange(0, 1), help="Set initial RTS line state"
)
@click.option(
"--dtr", default=None, type=click.IntRange(0, 1), help="Set initial DTR line state"
)
@click.option("--echo", is_flag=True, help="Enable local echo, default=Off")
@click.option(
"--encoding",
default="UTF-8",
help="Set the encoding for the serial port (e.g. hexlify, "
"Latin1, UTF-8), default: UTF-8",
)
@click.option("--filter", "-f", multiple=True, help="Add filters/text transformations")
@click.option(
"--eol",
default="CRLF",
type=click.Choice(["CR", "LF", "CRLF"]),
help="End of line mode, default=CRLF",
)
@click.option("--raw", is_flag=True, help="Do not apply any encodings/transformations")
@click.option(
"--exit-char",
type=int,
default=3,
help="ASCII code of special character that is used to exit "
"the application, default=3 (Ctrl+C)",
)
@click.option(
"--menu-char",
type=int,
default=20,
help="ASCII code of special character that is used to "
"control miniterm (menu), default=20 (DEC)",
)
@click.option(
"--quiet",
is_flag=True,
help="Diagnostics: suppress non-error messages, default=Off",
)
@click.option(
"-d",
"--project-dir",
default=os.getcwd,
type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True),
)
@click.option(
"-e",
"--environment",
help="Load configuration from `platformio.ini` and specified environment",
)
def device_monitor(**kwargs): # pylint: disable=too-many-branches
project_options = {}
platform = None
try:
with fs.cd(kwargs["project_dir"]):
project_options = device_helpers.get_project_options(kwargs["environment"])
kwargs = device_helpers.apply_project_monitor_options(
kwargs, project_options
)
if "platform" in project_options:
platform = PlatformFactory.new(project_options["platform"])
except NotPlatformIOProjectError:
pass
with fs.cd(kwargs["project_dir"]):
device_helpers.register_filters(platform=platform, options=kwargs)
if not kwargs["port"]:
ports = util.get_serial_ports(filter_hwid=True)
if len(ports) == 1:
kwargs["port"] = ports[0]["port"]
elif "platform" in project_options and "board" in project_options:
board_hwids = device_helpers.get_board_hwids(
kwargs["project_dir"],
platform,
project_options["board"],
)
for item in ports:
for hwid in board_hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item["hwid"]:
kwargs["port"] = item["port"]
break
if kwargs["port"]:
break
elif kwargs["port"] and (set(["*", "?", "[", "]"]) & set(kwargs["port"])):
for item in util.get_serial_ports():
if fnmatch(item["port"], kwargs["port"]):
kwargs["port"] = item["port"]
break
# override system argv with patched options
sys.argv = ["monitor"] + device_helpers.options_to_argv(
kwargs,
project_options,
ignore=("port", "baud", "rts", "dtr", "environment", "project_dir"),
)
if not kwargs["quiet"]:
click.echo(
"--- Available filters and text transformations: %s"
% ", ".join(sorted(miniterm.TRANSFORMATIONS.keys()))
)
click.echo("--- More details at https://bit.ly/pio-monitor-filters")
try:
miniterm.main(
default_port=kwargs["port"],
default_baudrate=kwargs["baud"] or 9600,
default_rts=kwargs["rts"],
default_dtr=kwargs["dtr"],
)
except Exception as e:
raise exception.MinitermException(e)

View File

@ -1,43 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from serial.tools import miniterm
from platformio.project.config import ProjectConfig
class DeviceMonitorFilter(miniterm.Transform):
def __init__(self, options=None):
"""Called by PlatformIO to pass context"""
miniterm.Transform.__init__(self)
self.options = options or {}
self.project_dir = self.options.get("project_dir")
self.environment = self.options.get("environment")
self.config = ProjectConfig.get_instance()
if not self.environment:
default_envs = self.config.default_envs()
if default_envs:
self.environment = default_envs[0]
elif self.config.envs():
self.environment = self.config.envs()[0]
def __call__(self):
"""Called by the miniterm library when the filter is actually used"""
return self
@property
def NAME(self):
raise NotImplementedError("Please declare NAME attribute for the filter class")

View File

@ -22,10 +22,11 @@ from functools import cmp_to_key
import click
from platformio import __default_requests_timeout__, fs, util
from platformio import __default_requests_timeout__, fs
from platformio.cache import ContentCache
from platformio.clients.http import ensure_internet_on
from platformio.commands.home import helpers
from platformio.device.list import list_logical_devices
class OSRPC:
@ -154,7 +155,7 @@ class OSRPC:
@staticmethod
def get_logical_devices():
items = []
for item in util.get_logical_devices():
for item in list_logical_devices():
if item["name"]:
item["name"] = item["name"]
items.append(item)

View File

@ -17,11 +17,12 @@ import os
from twisted.logger import LogLevel # pylint: disable=import-error
from twisted.spread import pb # pylint: disable=import-error
from platformio import proc, util
from platformio import proc
from platformio.commands.remote.ac.process import ProcessAsyncCmd
from platformio.commands.remote.ac.psync import ProjectSyncAsyncCmd
from platformio.commands.remote.ac.serial import SerialPortAsyncCmd
from platformio.commands.remote.client.base import RemoteClientBase
from platformio.device.list import list_serial_ports
from platformio.project.config import ProjectConfig
from platformio.project.exception import NotPlatformIOProjectError
@ -84,11 +85,11 @@ class RemoteAgentService(RemoteClientBase):
return (self.id, ac.id)
def _process_cmd_device_list(self, _):
return (self.name, util.get_serialports())
return (self.name, list_serial_ports())
def _process_cmd_device_monitor(self, options):
if not options["port"]:
for item in util.get_serialports():
for item in list_serial_ports():
if "VID:PID" in item["hwid"]:
options["port"] = item["port"]
break

View File

@ -24,11 +24,16 @@ from time import sleep
import click
from platformio import fs, proc
from platformio.commands.device import helpers as device_helpers
from platformio.commands.device.command import device_monitor as cmd_device_monitor
from platformio.commands.run.command import cli as cmd_run
from platformio.device.commands.monitor import (
apply_project_monitor_options,
device_monitor_cmd,
get_project_options,
project_options_to_monitor_argv,
)
from platformio.package.manager.core import inject_contrib_pysite
from platformio.project.exception import NotPlatformIOProjectError
from platformio.project.options import ProjectOptions
from platformio.test.command import test_cmd
@ -265,7 +270,12 @@ def device_list(agents, json_output):
@remote_device.command("monitor", short_help="Monitor remote device")
@click.option("--port", "-p", help="Port, a number or a device name")
@click.option("--baud", "-b", type=int, help="Set baud rate, default=9600")
@click.option(
"--baud",
"-b",
type=int,
help="Set baud rate, default=%d" % ProjectOptions["env.monitor_speed"].default,
)
@click.option(
"--parity",
default="N",
@ -344,19 +354,19 @@ def device_monitor(ctx, agents, **kwargs):
project_options = {}
try:
with fs.cd(kwargs["project_dir"]):
project_options = device_helpers.get_project_options(kwargs["environment"])
kwargs = device_helpers.apply_project_monitor_options(kwargs, project_options)
project_options = get_project_options(kwargs["environment"])
kwargs = apply_project_monitor_options(kwargs, project_options)
except NotPlatformIOProjectError:
pass
kwargs["baud"] = kwargs["baud"] or 9600
kwargs["baud"] = kwargs["baud"] or ProjectOptions["env.monitor_speed"].default
def _tx_target(sock_dir):
subcmd_argv = ["remote"]
for agent in agents:
subcmd_argv.extend(["--agent", agent])
subcmd_argv.extend(["device", "monitor"])
subcmd_argv.extend(device_helpers.options_to_argv(kwargs, project_options))
subcmd_argv.extend(project_options_to_monitor_argv(kwargs, project_options))
subcmd_argv.extend(["--sock", sock_dir])
subprocess.call([proc.where_is_program("platformio")] + subcmd_argv)
@ -371,7 +381,7 @@ def device_monitor(ctx, agents, **kwargs):
return
with open(sock_file, encoding="utf8") as fp:
kwargs["port"] = fp.read()
ctx.invoke(cmd_device_monitor, **kwargs)
ctx.invoke(device_monitor_cmd, **kwargs)
t.join(2)
finally:
fs.rmtree(sock_dir)

View File

@ -22,9 +22,9 @@ import click
from tabulate import tabulate
from platformio import app, exception, fs, util
from platformio.commands.device.command import device_monitor as cmd_device_monitor
from platformio.commands.run.helpers import clean_build_dir, handle_legacy_libdeps
from platformio.commands.run.processor import EnvironmentProcessor
from platformio.device.commands.monitor import device_monitor_cmd
from platformio.project.config import ProjectConfig
from platformio.project.helpers import find_project_dir_above, load_build_metadata
from platformio.test.runners.base import CTX_META_TEST_IS_RUNNING
@ -207,7 +207,7 @@ def process_env(
and "nobuild" not in ep.get_build_targets()
):
ctx.invoke(
cmd_device_monitor, environment=environments[0] if environments else None
device_monitor_cmd, environment=environments[0] if environments else None
)
return result

View File

@ -12,20 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import sys
import time
from fnmatch import fnmatch
from hashlib import sha1
from io import BytesIO
from os.path import isfile
from platformio import util
from platformio.commands import PlatformioCLI
from platformio.commands.run.command import cli as cmd_run
from platformio.commands.run.command import print_processing_header
from platformio.compat import IS_WINDOWS, is_bytes
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.device.list import list_serial_ports
from platformio.test.helpers import list_test_names
from platformio.test.result import TestSuite
from platformio.test.runners.base import TestRunnerOptions
@ -116,7 +116,7 @@ def predebug_project(
def has_debug_symbols(prog_path):
if not isfile(prog_path):
if not os.path.isfile(prog_path):
return False
matched = {
b".debug_info": False,
@ -142,7 +142,7 @@ def has_debug_symbols(prog_path):
def is_prog_obsolete(prog_path):
prog_hash_path = prog_path + ".sha1"
if not isfile(prog_path):
if not os.path.isfile(prog_path):
return True
shasum = sha1()
with open(prog_path, "rb") as fp:
@ -153,7 +153,7 @@ def is_prog_obsolete(prog_path):
shasum.update(data)
new_digest = shasum.hexdigest()
old_digest = None
if isfile(prog_hash_path):
if os.path.isfile(prog_hash_path):
with open(prog_hash_path, encoding="utf8") as fp:
old_digest = fp.read()
if new_digest == old_digest:
@ -178,7 +178,7 @@ def reveal_debug_port(env_debug_port, tool_name, tool_settings):
return fnmatch(port, pattern)
def _look_for_serial_port(hwids):
for item in util.get_serialports(filter_hwid=True):
for item in list_serial_ports(filter_hwid=True):
if not _is_match_pattern(item["port"]):
continue
port = item["port"]

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,99 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import click
from platformio.device.list import (
list_logical_devices,
list_mdns_services,
list_serial_ports,
)
@click.command("list", short_help="List devices")
@click.option("--serial", is_flag=True, help="List serial ports, default")
@click.option("--logical", is_flag=True, help="List logical devices")
@click.option("--mdns", is_flag=True, help="List multicast DNS services")
@click.option("--json-output", is_flag=True)
def device_list_cmd( # pylint: disable=too-many-branches
serial, logical, mdns, json_output
):
if not logical and not mdns:
serial = True
data = {}
if serial:
data["serial"] = list_serial_ports()
if logical:
data["logical"] = list_logical_devices()
if mdns:
data["mdns"] = list_mdns_services()
single_key = list(data)[0] if len(list(data)) == 1 else None
if json_output:
return click.echo(json.dumps(data[single_key] if single_key else data))
titles = {
"serial": "Serial Ports",
"logical": "Logical Devices",
"mdns": "Multicast DNS Services",
}
for key, value in data.items():
if not single_key:
click.secho(titles[key], bold=True)
click.echo("=" * len(titles[key]))
if key == "serial":
for item in value:
click.secho(item["port"], fg="cyan")
click.echo("-" * len(item["port"]))
click.echo("Hardware ID: %s" % item["hwid"])
click.echo("Description: %s" % item["description"])
click.echo("")
if key == "logical":
for item in value:
click.secho(item["path"], fg="cyan")
click.echo("-" * len(item["path"]))
click.echo("Name: %s" % item["name"])
click.echo("")
if key == "mdns":
for item in value:
click.secho(item["name"], fg="cyan")
click.echo("-" * len(item["name"]))
click.echo("Type: %s" % item["type"])
click.echo("IP: %s" % item["ip"])
click.echo("Port: %s" % item["port"])
if item["properties"]:
click.echo(
"Properties: %s"
% (
"; ".join(
[
"%s=%s" % (k, v)
for k, v in item["properties"].items()
]
)
)
)
click.echo("")
if single_key:
click.echo("")
return True

View File

@ -0,0 +1,203 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from fnmatch import fnmatch
import click
from serial.tools import miniterm
from platformio import exception, fs
from platformio.device.filters.base import register_filters
from platformio.device.list import list_serial_ports
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
from platformio.project.exception import NotPlatformIOProjectError
from platformio.project.options import ProjectOptions
@click.command("monitor", short_help="Monitor device (Serial/Socket)")
@click.option("--port", "-p", help="Port, a number or a device name")
@click.option(
"--baud",
"-b",
type=int,
help="Set baud rate, default=%d" % ProjectOptions["env.monitor_speed"].default,
)
@click.option(
"--parity",
default="N",
type=click.Choice(["N", "E", "O", "S", "M"]),
help="Set parity, default=N",
)
@click.option("--rtscts", is_flag=True, help="Enable RTS/CTS flow control, default=Off")
@click.option(
"--xonxoff", is_flag=True, help="Enable software flow control, default=Off"
)
@click.option(
"--rts", default=None, type=click.IntRange(0, 1), help="Set initial RTS line state"
)
@click.option(
"--dtr", default=None, type=click.IntRange(0, 1), help="Set initial DTR line state"
)
@click.option("--echo", is_flag=True, help="Enable local echo, default=Off")
@click.option(
"--encoding",
default="UTF-8",
help="Set the encoding for the serial port (e.g. hexlify, "
"Latin1, UTF-8), default: UTF-8",
)
@click.option("--filter", "-f", multiple=True, help="Add filters/text transformations")
@click.option(
"--eol",
default="CRLF",
type=click.Choice(["CR", "LF", "CRLF"]),
help="End of line mode, default=CRLF",
)
@click.option("--raw", is_flag=True, help="Do not apply any encodings/transformations")
@click.option(
"--exit-char",
type=int,
default=3,
help="ASCII code of special character that is used to exit "
"the application, default=3 (Ctrl+C)",
)
@click.option(
"--menu-char",
type=int,
default=20,
help="ASCII code of special character that is used to "
"control miniterm (menu), default=20 (DEC)",
)
@click.option(
"--quiet",
is_flag=True,
help="Diagnostics: suppress non-error messages, default=Off",
)
@click.option(
"-d",
"--project-dir",
default=os.getcwd,
type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True),
)
@click.option(
"-e",
"--environment",
help="Load configuration from `platformio.ini` and specified environment",
)
def device_monitor_cmd(**kwargs): # pylint: disable=too-many-branches
project_options = {}
platform = None
try:
with fs.cd(kwargs["project_dir"]):
project_options = get_project_options(kwargs["environment"])
kwargs = apply_project_monitor_options(kwargs, project_options)
if "platform" in project_options:
platform = PlatformFactory.new(project_options["platform"])
except NotPlatformIOProjectError:
pass
with fs.cd(kwargs["project_dir"]):
register_filters(platform=platform, options=kwargs)
if not kwargs["port"]:
ports = list_serial_ports(filter_hwid=True)
if len(ports) == 1:
kwargs["port"] = ports[0]["port"]
elif "platform" in project_options and "board" in project_options:
with fs.cd(kwargs["project_dir"]):
board_hwids = platform.board_config(project_options["board"]).get(
"build.hwids", []
)
for item in ports:
for hwid in board_hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item["hwid"]:
kwargs["port"] = item["port"]
break
if kwargs["port"]:
break
elif kwargs["port"] and (set(["*", "?", "[", "]"]) & set(kwargs["port"])):
for item in list_serial_ports():
if fnmatch(item["port"], kwargs["port"]):
kwargs["port"] = item["port"]
break
# override system argv with patched options
sys.argv = ["monitor"] + project_options_to_monitor_argv(
kwargs,
project_options,
ignore=("port", "baud", "rts", "dtr", "environment", "project_dir"),
)
if not kwargs["quiet"]:
click.echo(
"--- Available filters and text transformations: %s"
% ", ".join(sorted(miniterm.TRANSFORMATIONS.keys()))
)
click.echo("--- More details at https://bit.ly/pio-monitor-filters")
try:
miniterm.main(
default_port=kwargs["port"],
default_baudrate=kwargs["baud"]
or ProjectOptions["env.monitor_speed"].default,
default_rts=kwargs["rts"],
default_dtr=kwargs["dtr"],
)
except Exception as e:
raise exception.MinitermException(e)
def get_project_options(environment=None):
config = ProjectConfig.get_instance()
config.validate(envs=[environment] if environment else None)
environment = environment or config.get_default_env()
return config.items(env=environment, as_dict=True)
def apply_project_monitor_options(cli_options, project_options):
for k in ("port", "speed", "rts", "dtr"):
k2 = "monitor_%s" % k
if k == "speed":
k = "baud"
if cli_options[k] is None and k2 in project_options:
cli_options[k] = project_options[k2]
if k != "port":
cli_options[k] = int(cli_options[k])
return cli_options
def project_options_to_monitor_argv(cli_options, project_options, ignore=None):
confmon_flags = project_options.get("monitor_flags", [])
result = confmon_flags[::]
for f in project_options.get("monitor_filters", []):
result.extend(["--filter", f])
for k, v in cli_options.items():
if v is None or (ignore and k in ignore):
continue
k = "--" + k.replace("_", "-")
if k in confmon_flags:
continue
if isinstance(v, bool):
if v:
result.append(k)
elif isinstance(v, tuple):
for i in v:
result.extend([k, i])
else:
result.extend([k, str(v)])
return result

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -18,91 +18,35 @@ import os
from serial.tools import miniterm
from platformio import fs
from platformio.commands.device import DeviceMonitorFilter
from platformio.compat import get_object_members, load_python_module
from platformio.package.manager.tool import ToolPackageManager
from platformio.project.config import ProjectConfig
def apply_project_monitor_options(cli_options, project_options):
for k in ("port", "speed", "rts", "dtr"):
k2 = "monitor_%s" % k
if k == "speed":
k = "baud"
if cli_options[k] is None and k2 in project_options:
cli_options[k] = project_options[k2]
if k != "port":
cli_options[k] = int(cli_options[k])
return cli_options
class DeviceMonitorFilterBase(miniterm.Transform):
def __init__(self, options=None):
"""Called by PlatformIO to pass context"""
miniterm.Transform.__init__(self)
self.options = options or {}
self.project_dir = self.options.get("project_dir")
self.environment = self.options.get("environment")
def options_to_argv(cli_options, project_options, ignore=None):
confmon_flags = project_options.get("monitor_flags", [])
result = confmon_flags[::]
self.config = ProjectConfig.get_instance()
if not self.environment:
default_envs = self.config.default_envs()
if default_envs:
self.environment = default_envs[0]
elif self.config.envs():
self.environment = self.config.envs()[0]
for f in project_options.get("monitor_filters", []):
result.extend(["--filter", f])
def __call__(self):
"""Called by the miniterm library when the filter is actually used"""
return self
for k, v in cli_options.items():
if v is None or (ignore and k in ignore):
continue
k = "--" + k.replace("_", "-")
if k in confmon_flags:
continue
if isinstance(v, bool):
if v:
result.append(k)
elif isinstance(v, tuple):
for i in v:
result.extend([k, i])
else:
result.extend([k, str(v)])
return result
def get_project_options(environment=None):
config = ProjectConfig.get_instance()
config.validate(envs=[environment] if environment else None)
if not environment:
default_envs = config.default_envs()
if default_envs:
environment = default_envs[0]
else:
environment = config.envs()[0]
return config.items(env=environment, as_dict=True)
def get_board_hwids(project_dir, platform, board):
with fs.cd(project_dir):
return platform.board_config(board).get("build.hwids", [])
def load_monitor_filter(path, options=None):
name = os.path.basename(path)
name = name[: name.find(".")]
module = load_python_module("platformio.commands.device.filters.%s" % name, path)
for cls in get_object_members(module).values():
if (
not inspect.isclass(cls)
or not issubclass(cls, DeviceMonitorFilter)
or cls == DeviceMonitorFilter
):
continue
obj = cls(options)
miniterm.TRANSFORMATIONS[obj.NAME] = obj
return True
def load_monitor_filters(monitor_dir, prefix=None, options=None):
if not os.path.isdir(monitor_dir):
return
for name in os.listdir(monitor_dir):
if (prefix and not name.startswith(prefix)) or not name.endswith(".py"):
continue
path = os.path.join(monitor_dir, name)
if not os.path.isfile(path):
continue
load_monitor_filter(path, options)
@property
def NAME(self):
raise NotImplementedError("Please declare NAME attribute for the filter class")
def register_filters(platform=None, options=None):
@ -130,3 +74,31 @@ def register_filters(platform=None, options=None):
os.path.join(fs.get_source_dir(), "commands", "device", "filters"),
options=options,
)
def load_monitor_filters(monitor_dir, prefix=None, options=None):
if not os.path.isdir(monitor_dir):
return
for name in os.listdir(monitor_dir):
if (prefix and not name.startswith(prefix)) or not name.endswith(".py"):
continue
path = os.path.join(monitor_dir, name)
if not os.path.isfile(path):
continue
load_monitor_filter(path, options)
def load_monitor_filter(path, options=None):
name = os.path.basename(path)
name = name[: name.find(".")]
module = load_python_module("platformio.device.filters.%s" % name, path)
for cls in get_object_members(module).values():
if (
not inspect.isclass(cls)
or not issubclass(cls, DeviceMonitorFilterBase)
or cls == DeviceMonitorFilterBase
):
continue
obj = cls(options)
miniterm.TRANSFORMATIONS[obj.NAME] = obj
return True

View File

@ -14,10 +14,10 @@
import serial
from platformio.commands.device import DeviceMonitorFilter
from platformio.device.filters.base import DeviceMonitorFilterBase
class Hexlify(DeviceMonitorFilter):
class Hexlify(DeviceMonitorFilterBase):
NAME = "hexlify"
def __init__(self, *args, **kwargs):

View File

@ -16,10 +16,10 @@ import io
import os.path
from datetime import datetime
from platformio.commands.device import DeviceMonitorFilter
from platformio.device.filters.base import DeviceMonitorFilterBase
class LogToFile(DeviceMonitorFilter):
class LogToFile(DeviceMonitorFilterBase):
NAME = "log2file"
def __init__(self, *args, **kwargs):

View File

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.commands.device import DeviceMonitorFilter
from platformio.device.filters.base import DeviceMonitorFilterBase
class SendOnEnter(DeviceMonitorFilter):
class SendOnEnter(DeviceMonitorFilterBase):
NAME = "send_on_enter"
def __init__(self, *args, **kwargs):

View File

@ -14,10 +14,10 @@
from datetime import datetime
from platformio.commands.device import DeviceMonitorFilter
from platformio.device.filters.base import DeviceMonitorFilterBase
class Timestamp(DeviceMonitorFilter):
class Timestamp(DeviceMonitorFilterBase):
NAME = "time"
def __init__(self, *args, **kwargs):

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

154
platformio/device/list.py Normal file
View File

@ -0,0 +1,154 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
import time
from glob import glob
import zeroconf
from platformio import __version__, exception, proc
from platformio.compat import IS_MACOS, IS_WINDOWS
def list_serial_ports(filter_hwid=False):
try:
# pylint: disable=import-outside-toplevel
from serial.tools.list_ports import comports
except ImportError:
raise exception.GetSerialPortsError(os.name)
result = []
for p, d, h in comports():
if not p:
continue
if not filter_hwid or "VID:PID" in h:
result.append({"port": p, "description": d, "hwid": h})
if filter_hwid:
return result
# fix for PySerial
if not result and IS_MACOS:
for p in glob("/dev/tty.*"):
result.append({"port": p, "description": "n/a", "hwid": "n/a"})
return result
def list_logical_devices():
items = []
if IS_WINDOWS:
try:
result = proc.exec_command(
["wmic", "logicaldisk", "get", "name,VolumeName"]
).get("out", "")
devicenamere = re.compile(r"^([A-Z]{1}\:)\s*(\S+)?")
for line in result.split("\n"):
match = devicenamere.match(line.strip())
if not match:
continue
items.append({"path": match.group(1) + "\\", "name": match.group(2)})
return items
except WindowsError: # pylint: disable=undefined-variable
pass
# try "fsutil"
result = proc.exec_command(["fsutil", "fsinfo", "drives"]).get("out", "")
for device in re.findall(r"[A-Z]:\\", result):
items.append({"path": device, "name": None})
return items
result = proc.exec_command(["df"]).get("out")
devicenamere = re.compile(r"^/.+\d+\%\s+([a-z\d\-_/]+)$", flags=re.I)
for line in result.split("\n"):
match = devicenamere.match(line.strip())
if not match:
continue
items.append({"path": match.group(1), "name": os.path.basename(match.group(1))})
return items
def list_mdns_services():
class mDNSListener(object):
def __init__(self):
self._zc = zeroconf.Zeroconf(interfaces=zeroconf.InterfaceChoice.All)
self._found_types = []
self._found_services = []
def __enter__(self):
zeroconf.ServiceBrowser(
self._zc,
[
"_http._tcp.local.",
"_hap._tcp.local.",
"_services._dns-sd._udp.local.",
],
self,
)
return self
def __exit__(self, etype, value, traceback):
self._zc.close()
def add_service(self, zc, type_, name):
try:
assert zeroconf.service_type_name(name)
assert str(name)
except (AssertionError, UnicodeError, zeroconf.BadTypeInNameException):
return
if name not in self._found_types:
self._found_types.append(name)
zeroconf.ServiceBrowser(self._zc, name, self)
if type_ in self._found_types:
s = zc.get_service_info(type_, name)
if s:
self._found_services.append(s)
def remove_service(self, zc, type_, name):
pass
def update_service(self, zc, type_, name):
pass
def get_services(self):
return self._found_services
items = []
with mDNSListener() as mdns:
time.sleep(3)
for service in mdns.get_services():
properties = None
if service.properties:
try:
properties = {
k.decode("utf8"): v.decode("utf8")
if isinstance(v, bytes)
else v
for k, v in service.properties.items()
}
json.dumps(properties)
except UnicodeDecodeError:
properties = None
items.append(
{
"type": service.type,
"name": service.name,
"ip": ", ".join(service.parsed_addresses()),
"port": service.port,
"properties": properties,
}
)
return items

View File

@ -17,7 +17,7 @@ from time import sleep
import click
import serial
from platformio import util
from platformio.device.list import list_serial_ports
from platformio.exception import UserSideException
@ -72,7 +72,7 @@ class SerialTestOutputReader:
port = None
elapsed = 0
while elapsed < 5 and not port:
for item in util.get_serialports():
for item in list_serial_ports():
port = item["port"]
for hwid in board_hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")

View File

@ -15,23 +15,23 @@
from __future__ import absolute_import
import functools
import json
import math
import os
import platform
import re
import shutil
import time
from datetime import datetime
from glob import glob
import click
import zeroconf
from platformio import __version__, exception, proc
from platformio.compat import IS_MACOS, IS_WINDOWS
from platformio.fs import cd, load_json # pylint: disable=unused-import
from platformio.proc import exec_command # pylint: disable=unused-import
from platformio import __version__
# pylint: disable=unused-import
from platformio.device.list import list_serial_ports as get_serial_ports
from platformio.fs import cd, load_json
from platformio.proc import exec_command
# pylint: enable=unused-import
class memoized(object):
@ -98,140 +98,6 @@ def get_systype():
return "%s_%s" % (type_, arch) if arch else type_
def get_serial_ports(filter_hwid=False):
try:
# pylint: disable=import-outside-toplevel
from serial.tools.list_ports import comports
except ImportError:
raise exception.GetSerialPortsError(os.name)
result = []
for p, d, h in comports():
if not p:
continue
if not filter_hwid or "VID:PID" in h:
result.append({"port": p, "description": d, "hwid": h})
if filter_hwid:
return result
# fix for PySerial
if not result and IS_MACOS:
for p in glob("/dev/tty.*"):
result.append({"port": p, "description": "n/a", "hwid": "n/a"})
return result
# Backward compatibility for PIO Core <3.5
get_serialports = get_serial_ports
def get_logical_devices():
items = []
if IS_WINDOWS:
try:
result = proc.exec_command(
["wmic", "logicaldisk", "get", "name,VolumeName"]
).get("out", "")
devicenamere = re.compile(r"^([A-Z]{1}\:)\s*(\S+)?")
for line in result.split("\n"):
match = devicenamere.match(line.strip())
if not match:
continue
items.append({"path": match.group(1) + "\\", "name": match.group(2)})
return items
except WindowsError: # pylint: disable=undefined-variable
pass
# try "fsutil"
result = proc.exec_command(["fsutil", "fsinfo", "drives"]).get("out", "")
for device in re.findall(r"[A-Z]:\\", result):
items.append({"path": device, "name": None})
return items
result = proc.exec_command(["df"]).get("out")
devicenamere = re.compile(r"^/.+\d+\%\s+([a-z\d\-_/]+)$", flags=re.I)
for line in result.split("\n"):
match = devicenamere.match(line.strip())
if not match:
continue
items.append({"path": match.group(1), "name": os.path.basename(match.group(1))})
return items
def get_mdns_services():
class mDNSListener(object):
def __init__(self):
self._zc = zeroconf.Zeroconf(interfaces=zeroconf.InterfaceChoice.All)
self._found_types = []
self._found_services = []
def __enter__(self):
zeroconf.ServiceBrowser(
self._zc,
[
"_http._tcp.local.",
"_hap._tcp.local.",
"_services._dns-sd._udp.local.",
],
self,
)
return self
def __exit__(self, etype, value, traceback):
self._zc.close()
def add_service(self, zc, type_, name):
try:
assert zeroconf.service_type_name(name)
assert str(name)
except (AssertionError, UnicodeError, zeroconf.BadTypeInNameException):
return
if name not in self._found_types:
self._found_types.append(name)
zeroconf.ServiceBrowser(self._zc, name, self)
if type_ in self._found_types:
s = zc.get_service_info(type_, name)
if s:
self._found_services.append(s)
def remove_service(self, zc, type_, name):
pass
def update_service(self, zc, type_, name):
pass
def get_services(self):
return self._found_services
items = []
with mDNSListener() as mdns:
time.sleep(3)
for service in mdns.get_services():
properties = None
if service.properties:
try:
properties = {
k.decode("utf8"): v.decode("utf8")
if isinstance(v, bytes)
else v
for k, v in service.properties.items()
}
json.dumps(properties)
except UnicodeDecodeError:
properties = None
items.append(
{
"type": service.type,
"name": service.name,
"ip": ", ".join(service.parsed_addresses()),
"port": service.port,
"properties": properties,
}
)
return items
def pioversion_to_intstr():
"""Legacy for framework-zephyr/scripts/platformio/platformio-build-pre.py"""
vermatch = re.match(r"^([\d\.]+)", __version__)