Implement buffering for the testing output

This commit is contained in:
Ivan Kravets
2022-05-05 13:02:27 +03:00
parent c0cfbe2ce0
commit 0b317ef04b
6 changed files with 64 additions and 68 deletions

2
docs

Submodule docs updated: e12174e655...f4d64ac55d

View File

@ -66,6 +66,7 @@ class TestRunnerBase:
autoinstall=True, autoinstall=True,
) )
self.cmd_ctx = None self.cmd_ctx = None
self._testing_output_buffer = ""
@property @property
def name(self): def name(self):
@ -184,33 +185,42 @@ class TestRunnerBase:
""" """
return env return env
def on_test_output(self, data): def on_testing_data_output(self, data):
click.echo(data, nl=False) if isinstance(data, bytes):
self.parse_test_cases(data) data = data.decode("utf8", "ignore")
self._testing_output_buffer += data
self._testing_output_buffer = self._testing_output_buffer.replace("\r", "")
while "\n" in self._testing_output_buffer:
nl_pos = self._testing_output_buffer.index("\n")
line = self._testing_output_buffer[: nl_pos + 1]
self._testing_output_buffer = self._testing_output_buffer[nl_pos + 1 :]
self.on_testing_line_output(line)
def parse_test_cases(self, data): def on_testing_line_output(self, line):
click.echo(line, nl=False)
self.parse_test_case(line)
def parse_test_case(self, line):
if not self.TESTCASE_PARSE_RE: if not self.TESTCASE_PARSE_RE:
raise NotImplementedError() raise NotImplementedError()
line = line.strip()
for line in data.split("\n"): if not line:
line = line.strip() return None
if not line: match = self.TESTCASE_PARSE_RE.search(line)
continue if not match:
match = self.TESTCASE_PARSE_RE.search(line) return None
if not match: data = match.groupdict()
continue source = None
data = match.groupdict() if "source_file" in data:
source = None source = TestCaseSource(
if "source_file" in data: file=data["source_file"], line=int(data.get("source_line"))
source = TestCaseSource(
file=data["source_file"], line=int(data.get("source_line"))
)
self.test_suite.add_case(
TestCase(
name=data.get("name"),
status=TestStatus.from_string(data.get("status")),
message=data.get("message"),
stdout=line,
source=source,
)
) )
test_case = TestCase(
name=data.get("name"),
status=TestStatus.from_string(data.get("status")),
message=data.get("message"),
stdout=line,
source=source,
)
self.test_suite.add_case(test_case)
return test_case

View File

@ -32,7 +32,7 @@ class ProgramProcessProtocol(asyncio.SubprocessProtocol):
data = data.decode(get_locale_encoding() or get_filesystem_encoding()) data = data.decode(get_locale_encoding() or get_filesystem_encoding())
except UnicodeDecodeError: except UnicodeDecodeError:
data = data.decode("latin-1") data = data.decode("latin-1")
self.test_runner.on_test_output(data) self.test_runner.on_testing_data_output(data)
if self.test_runner.test_suite.is_finished(): if self.test_runner.test_suite.is_finished():
self._stop_testing() self._stop_testing()

View File

@ -59,21 +59,7 @@ class SerialTestOutputReader:
sleep(0.1) sleep(0.1)
while not self.test_runner.test_suite.is_finished(): while not self.test_runner.test_suite.is_finished():
line = ser.readline().strip() self.test_runner.on_testing_data_output(ser.read(ser.in_waiting or 1))
# fix non-ascii output from device
for i, c in enumerate(line[::-1]):
if not isinstance(c, int):
c = ord(c)
if c > 127:
line = line[-i:]
break
if not line:
continue
if isinstance(line, bytes):
line = line.decode("utf8", "ignore")
self.test_runner.on_test_output(line)
ser.close() ser.close()
def autodetect_test_port(self): def autodetect_test_port(self):

View File

@ -249,33 +249,33 @@ void unityOutputComplete(void) { unittest_uart_end(); }
encoding="utf8", encoding="utf8",
) )
def on_test_output(self, data): def on_testing_line_output(self, line):
data = strip_ansi_codes(data or "") line = strip_ansi_codes(line or "")
if not data.strip(): if not line.strip():
return click.echo(data, nl=False) click.echo(line, nl=False)
return
if all(s in data for s in ("Tests", "Failures", "Ignored")): if all(s in line for s in ("Tests", "Failures", "Ignored")):
self.test_suite.on_finish() self.test_suite.on_finish()
# beautify output # beautify output
for line in data.strip().split("\n"): line = line.strip()
line = line.strip() if line.strip(".").endswith(":PASS"):
if line.strip(".").endswith(":PASS"): click.echo(
click.echo( "%s\t[%s]"
"%s\t[%s]" % (line[: line.rindex(":PASS")], click.style("PASSED", fg="green"))
% (line[: line.rindex(":PASS")], click.style("PASSED", fg="green")) )
elif line.strip(".").endswith(":IGNORE"):
click.echo(
"%s\t[%s]"
% (
line[: line.rindex(":IGNORE")],
click.style("IGNORED", fg="yellow"),
) )
elif line.strip(".").endswith(":IGNORE"): )
click.echo( elif ":FAIL" in line:
"%s\t[%s]" click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red")))
% ( else:
line[: line.rindex(":IGNORE")], click.echo(line)
click.style("IGNORED", fg="yellow"),
)
)
elif ":FAIL" in line:
click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red")))
else:
click.echo(line)
return self.parse_test_cases(data) self.parse_test_case(line)

View File

@ -216,7 +216,7 @@ int main(int argc, char *argv[]) {
@pytest.mark.skipif( @pytest.mark.skipif(
sys.platform == "win32", reason="runs only on Unix (issue with SimAVR)" sys.platform != "darwin", reason="runs only on macOS (issue with SimAVR)"
) )
def test_custom_testing_command(clirunner, validate_cliresult, tmp_path: Path): def test_custom_testing_command(clirunner, validate_cliresult, tmp_path: Path):
project_dir = tmp_path / "project" project_dir = tmp_path / "project"