forked from platformio/platformio-core
Extend "pio device list" command with new options to list logical devices and multicast DNS services // Resolve #463
This commit is contained in:
@@ -32,6 +32,7 @@ from time import sleep, time
|
||||
|
||||
import click
|
||||
import requests
|
||||
import zeroconf
|
||||
|
||||
from platformio import __apiurl__, __version__, exception
|
||||
|
||||
@@ -417,7 +418,7 @@ def copy_pythonpath_to_osenv():
|
||||
os.environ['PYTHONPATH'] = os.pathsep.join(_PYTHONPATH)
|
||||
|
||||
|
||||
def get_serialports(filter_hwid=False):
|
||||
def get_serial_ports(filter_hwid=False):
|
||||
try:
|
||||
from serial.tools.list_ports import comports
|
||||
except ImportError:
|
||||
@@ -445,42 +446,106 @@ def get_serialports(filter_hwid=False):
|
||||
return result
|
||||
|
||||
|
||||
def get_logicaldisks():
|
||||
disks = []
|
||||
def get_logical_devices():
|
||||
items = []
|
||||
if platform.system() == "Windows":
|
||||
try:
|
||||
result = exec_command(
|
||||
["wmic", "logicaldisk", "get", "name,VolumeName"]).get(
|
||||
"out", "")
|
||||
disknamere = re.compile(r"^([A-Z]{1}\:)\s*(\S+)?")
|
||||
devicenamere = re.compile(r"^([A-Z]{1}\:)\s*(\S+)?")
|
||||
for line in result.split("\n"):
|
||||
match = disknamere.match(line.strip())
|
||||
match = devicenamere.match(line.strip())
|
||||
if not match:
|
||||
continue
|
||||
disks.append({
|
||||
"disk": match.group(1) + "\\",
|
||||
items.append({
|
||||
"device": match.group(1) + "\\",
|
||||
"name": match.group(2)
|
||||
})
|
||||
return disks
|
||||
return items
|
||||
except WindowsError: # pylint: disable=undefined-variable
|
||||
pass
|
||||
# try "fsutil"
|
||||
result = exec_command(["fsutil", "fsinfo", "drives"]).get("out", "")
|
||||
for disk in re.findall(r"[A-Z]:\\", result):
|
||||
disks.append({"disk": disk, "name": None})
|
||||
return disks
|
||||
for device in re.findall(r"[A-Z]:\\", result):
|
||||
items.append({"device": device, "name": None})
|
||||
return items
|
||||
else:
|
||||
result = exec_command(["df"]).get("out")
|
||||
disknamere = re.compile(r"^/.+\d+\%\s+([a-z\d\-_/]+)$", flags=re.I)
|
||||
devicenamere = re.compile(r"^/.+\d+\%\s+([a-z\d\-_/]+)$", flags=re.I)
|
||||
for line in result.split("\n"):
|
||||
match = disknamere.match(line.strip())
|
||||
match = devicenamere.match(line.strip())
|
||||
if not match:
|
||||
continue
|
||||
disks.append({
|
||||
"disk": match.group(1),
|
||||
items.append({
|
||||
"device": match.group(1),
|
||||
"name": basename(match.group(1))
|
||||
})
|
||||
return disks
|
||||
return items
|
||||
|
||||
|
||||
### Backward compatibility for PIO Core <3.5
|
||||
get_serialports = get_serial_ports
|
||||
get_logicaldisks = lambda: [{
|
||||
"disk": d['device'],
|
||||
"name": d['name']
|
||||
} for d in get_logical_devices()]
|
||||
|
||||
|
||||
def get_mdns_services():
|
||||
|
||||
class mDNSListener(object):
|
||||
|
||||
def __init__(self):
|
||||
self._zc = zeroconf.Zeroconf(
|
||||
interfaces=zeroconf.InterfaceChoice.All)
|
||||
self._found_types = []
|
||||
self._found_services = []
|
||||
|
||||
def __enter__(self):
|
||||
zeroconf.ServiceBrowser(self._zc, "_services._dns-sd._udp.local.",
|
||||
self)
|
||||
return self
|
||||
|
||||
def __exit__(self, etype, value, traceback):
|
||||
self._zc.close()
|
||||
|
||||
def remove_service(self, zc, type_, name):
|
||||
pass
|
||||
|
||||
def add_service(self, zc, type_, name):
|
||||
try:
|
||||
zeroconf.service_type_name(name)
|
||||
except zeroconf.BadTypeInNameException:
|
||||
return
|
||||
if name not in self._found_types:
|
||||
self._found_types.append(name)
|
||||
zeroconf.ServiceBrowser(self._zc, name, self)
|
||||
if type_ in self._found_types:
|
||||
s = zc.get_service_info(type_, name)
|
||||
if s:
|
||||
self._found_services.append(s)
|
||||
|
||||
def get_services(self):
|
||||
return self._found_services
|
||||
|
||||
items = []
|
||||
with mDNSListener() as mdns:
|
||||
sleep(5)
|
||||
for service in mdns.get_services():
|
||||
items.append({
|
||||
"type":
|
||||
service.type,
|
||||
"name":
|
||||
service.name,
|
||||
"ip":
|
||||
".".join([str(ord(c)) for c in service.address]),
|
||||
"port":
|
||||
service.port,
|
||||
"properties":
|
||||
service.properties
|
||||
})
|
||||
return items
|
||||
|
||||
|
||||
def get_request_defheaders():
|
||||
|
Reference in New Issue
Block a user