forked from platformio/platformio-core
Some boards, like ESP32 based boards, give some unintelligible data when connecting to them via Serial. This is sometimes data that is send with the wrong baud rate (hard baked into the boot loader), or something else. It's hard to prevent this from happening. When a build is uploaded to the ESP board for unit testing, the decoding of the incoming stream should not fail the test due to some garbled content. Since the read data is validated on line 95, any garbage is automatically ignored and only outputted to the console.
136 lines
4.5 KiB
Python
136 lines
4.5 KiB
Python
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from time import sleep
|
|
|
|
import click
|
|
import serial
|
|
|
|
from platformio import exception, util
|
|
from platformio.commands.test.processor import TestProcessorBase
|
|
from platformio.managers.platform import PlatformFactory
|
|
|
|
|
|
class EmbeddedTestProcessor(TestProcessorBase):
|
|
|
|
SERIAL_TIMEOUT = 600
|
|
|
|
def process(self):
|
|
if not self.options['without_building']:
|
|
self.print_progress("Building...")
|
|
target = ["__test"]
|
|
if self.options['without_uploading']:
|
|
target.append("checkprogsize")
|
|
if not self.build_or_upload(target):
|
|
return False
|
|
|
|
if not self.options['without_uploading']:
|
|
self.print_progress("Uploading...")
|
|
target = ["upload"]
|
|
if self.options['without_building']:
|
|
target.append("nobuild")
|
|
else:
|
|
target.append("__test")
|
|
if not self.build_or_upload(target):
|
|
return False
|
|
|
|
if self.options['without_testing']:
|
|
return None
|
|
|
|
self.print_progress("Testing...")
|
|
return self.run()
|
|
|
|
def run(self):
|
|
click.echo("If you don't see any output for the first 10 secs, "
|
|
"please reset board (press reset button)")
|
|
click.echo()
|
|
|
|
try:
|
|
ser = serial.Serial(baudrate=self.get_baudrate(),
|
|
timeout=self.SERIAL_TIMEOUT)
|
|
ser.port = self.get_test_port()
|
|
ser.rts = self.options['monitor_rts']
|
|
ser.dtr = self.options['monitor_dtr']
|
|
ser.open()
|
|
except serial.SerialException as e:
|
|
click.secho(str(e), fg="red", err=True)
|
|
return False
|
|
|
|
if not self.options['no_reset']:
|
|
ser.flushInput()
|
|
ser.setDTR(False)
|
|
ser.setRTS(False)
|
|
sleep(0.1)
|
|
ser.setDTR(True)
|
|
ser.setRTS(True)
|
|
sleep(0.1)
|
|
|
|
while True:
|
|
line = ser.readline().strip()
|
|
|
|
# fix non-ascii output from device
|
|
for i, c in enumerate(line[::-1]):
|
|
if not isinstance(c, int):
|
|
c = ord(c)
|
|
if c > 127:
|
|
line = line[-i:]
|
|
break
|
|
|
|
if not line:
|
|
continue
|
|
if isinstance(line, bytes):
|
|
line = line.decode("utf8", "ignore")
|
|
self.on_run_out(line)
|
|
if all([l in line for l in ("Tests", "Failures", "Ignored")]):
|
|
break
|
|
ser.close()
|
|
return not self._run_failed
|
|
|
|
def get_test_port(self):
|
|
# if test port is specified manually or in config
|
|
if self.options.get("test_port"):
|
|
return self.options.get("test_port")
|
|
if self.env_options.get("test_port"):
|
|
return self.env_options.get("test_port")
|
|
|
|
assert set(["platform", "board"]) & set(self.env_options.keys())
|
|
p = PlatformFactory.newPlatform(self.env_options['platform'])
|
|
board_hwids = p.board_config(self.env_options['board']).get(
|
|
"build.hwids", [])
|
|
port = None
|
|
elapsed = 0
|
|
while elapsed < 5 and not port:
|
|
for item in util.get_serialports():
|
|
port = item['port']
|
|
for hwid in board_hwids:
|
|
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
|
|
if hwid_str in item['hwid']:
|
|
return port
|
|
|
|
# check if port is already configured
|
|
try:
|
|
serial.Serial(port, timeout=self.SERIAL_TIMEOUT).close()
|
|
except serial.SerialException:
|
|
port = None
|
|
|
|
if not port:
|
|
sleep(0.25)
|
|
elapsed += 0.25
|
|
|
|
if not port:
|
|
raise exception.PlatformioException(
|
|
"Please specify `test_port` for environment or use "
|
|
"global `--test-port` option.")
|
|
return port
|