diff --git a/docs b/docs index e12174e6..f4d64ac5 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit e12174e6554b6eff70135a878f33bd3287ace18a +Subproject commit f4d64ac55d9a567491082687c52844a0fb72c711 diff --git a/platformio/test/runners/base.py b/platformio/test/runners/base.py index d9099a41..746defd7 100644 --- a/platformio/test/runners/base.py +++ b/platformio/test/runners/base.py @@ -66,6 +66,7 @@ class TestRunnerBase: autoinstall=True, ) self.cmd_ctx = None + self._testing_output_buffer = "" @property def name(self): @@ -184,33 +185,42 @@ class TestRunnerBase: """ return env - def on_test_output(self, data): - click.echo(data, nl=False) - self.parse_test_cases(data) + def on_testing_data_output(self, data): + if isinstance(data, bytes): + data = data.decode("utf8", "ignore") + self._testing_output_buffer += data + self._testing_output_buffer = self._testing_output_buffer.replace("\r", "") + while "\n" in self._testing_output_buffer: + nl_pos = self._testing_output_buffer.index("\n") + line = self._testing_output_buffer[: nl_pos + 1] + self._testing_output_buffer = self._testing_output_buffer[nl_pos + 1 :] + self.on_testing_line_output(line) - def parse_test_cases(self, data): + def on_testing_line_output(self, line): + click.echo(line, nl=False) + self.parse_test_case(line) + + def parse_test_case(self, line): if not self.TESTCASE_PARSE_RE: raise NotImplementedError() - - for line in data.split("\n"): - line = line.strip() - if not line: - continue - match = self.TESTCASE_PARSE_RE.search(line) - if not match: - continue - data = match.groupdict() - source = None - if "source_file" in data: - source = TestCaseSource( - file=data["source_file"], line=int(data.get("source_line")) - ) - self.test_suite.add_case( - TestCase( - name=data.get("name"), - status=TestStatus.from_string(data.get("status")), - message=data.get("message"), - stdout=line, - source=source, - ) + line = line.strip() + if not line: + return None + match = self.TESTCASE_PARSE_RE.search(line) + if not match: + return None + data = match.groupdict() + source = None + if "source_file" in data: + source = TestCaseSource( + file=data["source_file"], line=int(data.get("source_line")) ) + test_case = TestCase( + name=data.get("name"), + status=TestStatus.from_string(data.get("status")), + message=data.get("message"), + stdout=line, + source=source, + ) + self.test_suite.add_case(test_case) + return test_case diff --git a/platformio/test/runners/readers/program.py b/platformio/test/runners/readers/program.py index 6a0da1f4..4ceb5c0d 100644 --- a/platformio/test/runners/readers/program.py +++ b/platformio/test/runners/readers/program.py @@ -32,7 +32,7 @@ class ProgramProcessProtocol(asyncio.SubprocessProtocol): data = data.decode(get_locale_encoding() or get_filesystem_encoding()) except UnicodeDecodeError: data = data.decode("latin-1") - self.test_runner.on_test_output(data) + self.test_runner.on_testing_data_output(data) if self.test_runner.test_suite.is_finished(): self._stop_testing() diff --git a/platformio/test/runners/readers/serial.py b/platformio/test/runners/readers/serial.py index 5371eb04..6d96609c 100644 --- a/platformio/test/runners/readers/serial.py +++ b/platformio/test/runners/readers/serial.py @@ -59,21 +59,7 @@ class SerialTestOutputReader: sleep(0.1) while not self.test_runner.test_suite.is_finished(): - line = ser.readline().strip() - - # fix non-ascii output from device - for i, c in enumerate(line[::-1]): - if not isinstance(c, int): - c = ord(c) - if c > 127: - line = line[-i:] - break - - if not line: - continue - if isinstance(line, bytes): - line = line.decode("utf8", "ignore") - self.test_runner.on_test_output(line) + self.test_runner.on_testing_data_output(ser.read(ser.in_waiting or 1)) ser.close() def autodetect_test_port(self): diff --git a/platformio/test/runners/unity.py b/platformio/test/runners/unity.py index 3d17a693..d89d6478 100644 --- a/platformio/test/runners/unity.py +++ b/platformio/test/runners/unity.py @@ -249,33 +249,33 @@ void unityOutputComplete(void) { unittest_uart_end(); } encoding="utf8", ) - def on_test_output(self, data): - data = strip_ansi_codes(data or "") - if not data.strip(): - return click.echo(data, nl=False) + def on_testing_line_output(self, line): + line = strip_ansi_codes(line or "") + if not line.strip(): + click.echo(line, nl=False) + return - if all(s in data for s in ("Tests", "Failures", "Ignored")): + if all(s in line for s in ("Tests", "Failures", "Ignored")): self.test_suite.on_finish() # beautify output - for line in data.strip().split("\n"): - line = line.strip() - if line.strip(".").endswith(":PASS"): - click.echo( - "%s\t[%s]" - % (line[: line.rindex(":PASS")], click.style("PASSED", fg="green")) + line = line.strip() + if line.strip(".").endswith(":PASS"): + click.echo( + "%s\t[%s]" + % (line[: line.rindex(":PASS")], click.style("PASSED", fg="green")) + ) + elif line.strip(".").endswith(":IGNORE"): + click.echo( + "%s\t[%s]" + % ( + line[: line.rindex(":IGNORE")], + click.style("IGNORED", fg="yellow"), ) - elif line.strip(".").endswith(":IGNORE"): - click.echo( - "%s\t[%s]" - % ( - line[: line.rindex(":IGNORE")], - click.style("IGNORED", fg="yellow"), - ) - ) - elif ":FAIL" in line: - click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red"))) - else: - click.echo(line) + ) + elif ":FAIL" in line: + click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red"))) + else: + click.echo(line) - return self.parse_test_cases(data) + self.parse_test_case(line) diff --git a/tests/commands/test_test.py b/tests/commands/test_test.py index b11b243d..a21eaed6 100644 --- a/tests/commands/test_test.py +++ b/tests/commands/test_test.py @@ -216,7 +216,7 @@ int main(int argc, char *argv[]) { @pytest.mark.skipif( - sys.platform == "win32", reason="runs only on Unix (issue with SimAVR)" + sys.platform != "darwin", reason="runs only on macOS (issue with SimAVR)" ) def test_custom_testing_command(clirunner, validate_cliresult, tmp_path: Path): project_dir = tmp_path / "project"