forked from platformio/platformio-core
Initial commit of PIO Unit Testing
This commit is contained in:
133
platformio/commands/test/embedded.py
Normal file
133
platformio/commands/test/embedded.py
Normal file
@@ -0,0 +1,133 @@
|
||||
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from time import sleep
|
||||
|
||||
import click
|
||||
import serial
|
||||
|
||||
from platformio import exception, util
|
||||
from platformio.commands.test.processor import TestProcessorBase
|
||||
from platformio.managers.platform import PlatformFactory
|
||||
|
||||
|
||||
class EmbeddedTestProcessor(TestProcessorBase):
|
||||
|
||||
SERIAL_TIMEOUT = 600
|
||||
|
||||
def process(self):
|
||||
if not self.options['without_building']:
|
||||
self.print_progress("Building... (1/3)")
|
||||
target = ["__test"]
|
||||
if self.options['without_uploading']:
|
||||
target.append("checkprogsize")
|
||||
self.build_or_upload(target)
|
||||
|
||||
if not self.options['without_uploading']:
|
||||
self.print_progress("Uploading... (2/3)")
|
||||
target = ["upload"]
|
||||
if self.options['without_building']:
|
||||
target.append("nobuild")
|
||||
else:
|
||||
target.append("__test")
|
||||
self.build_or_upload(target)
|
||||
|
||||
if self.options['without_testing']:
|
||||
return None
|
||||
|
||||
self.print_progress("Testing... (3/3)")
|
||||
return self.run()
|
||||
|
||||
def run(self):
|
||||
click.echo("If you don't see any output for the first 10 secs, "
|
||||
"please reset board (press reset button)")
|
||||
click.echo()
|
||||
|
||||
try:
|
||||
ser = serial.Serial(
|
||||
baudrate=self.get_baudrate(), timeout=self.SERIAL_TIMEOUT)
|
||||
ser.port = self.get_test_port()
|
||||
ser.rts = self.options['monitor_rts']
|
||||
ser.dtr = self.options['monitor_dtr']
|
||||
ser.open()
|
||||
except serial.SerialException as e:
|
||||
click.secho(str(e), fg="red", err=True)
|
||||
return False
|
||||
|
||||
if not self.options['no_reset']:
|
||||
ser.flushInput()
|
||||
ser.setDTR(False)
|
||||
ser.setRTS(False)
|
||||
sleep(0.1)
|
||||
ser.setDTR(True)
|
||||
ser.setRTS(True)
|
||||
sleep(0.1)
|
||||
|
||||
while True:
|
||||
line = ser.readline().strip()
|
||||
|
||||
# fix non-ascii output from device
|
||||
for i, c in enumerate(line[::-1]):
|
||||
if not isinstance(c, int):
|
||||
c = ord(c)
|
||||
if c > 127:
|
||||
line = line[-i:]
|
||||
break
|
||||
|
||||
if not line:
|
||||
continue
|
||||
if isinstance(line, bytes):
|
||||
line = line.decode("utf8")
|
||||
self.on_run_out(line)
|
||||
if all([l in line for l in ("Tests", "Failures", "Ignored")]):
|
||||
break
|
||||
ser.close()
|
||||
return not self._run_failed
|
||||
|
||||
def get_test_port(self):
|
||||
# if test port is specified manually or in config
|
||||
if self.options.get("test_port"):
|
||||
return self.options.get("test_port")
|
||||
if self.env_options.get("test_port"):
|
||||
return self.env_options.get("test_port")
|
||||
|
||||
assert set(["platform", "board"]) & set(self.env_options.keys())
|
||||
p = PlatformFactory.newPlatform(self.env_options['platform'])
|
||||
board_hwids = p.board_config(self.env_options['board']).get(
|
||||
"build.hwids", [])
|
||||
port = None
|
||||
elapsed = 0
|
||||
while elapsed < 5 and not port:
|
||||
for item in util.get_serialports():
|
||||
port = item['port']
|
||||
for hwid in board_hwids:
|
||||
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
|
||||
if hwid_str in item['hwid']:
|
||||
return port
|
||||
|
||||
# check if port is already configured
|
||||
try:
|
||||
serial.Serial(port, timeout=self.SERIAL_TIMEOUT).close()
|
||||
except serial.SerialException:
|
||||
port = None
|
||||
|
||||
if not port:
|
||||
sleep(0.25)
|
||||
elapsed += 0.25
|
||||
|
||||
if not port:
|
||||
raise exception.PlatformioException(
|
||||
"Please specify `test_port` for environment or use "
|
||||
"global `--test-port` option.")
|
||||
return port
|
||||
Reference in New Issue
Block a user