Introduce a new PlatformIO Unit Testing engine

This commit is contained in:
Ivan Kravets
2022-04-21 18:11:49 +03:00
parent 93bfc57dea
commit ee43b86742
36 changed files with 1296 additions and 815 deletions

View File

@ -49,7 +49,6 @@ __default_requests_timeout__ = (10, None) # (connect, read)
__core_packages__ = {
"contrib-piohome": "~3.4.1",
"contrib-pysite": "~2.%d%d.0" % (sys.version_info.major, sys.version_info.minor),
"tool-unity": "~1.20500.0",
"tool-scons": "~4.40300.0",
"tool-cppcheck": "~1.260.0",
"tool-clangtidy": "~1.120001.0",

View File

@ -54,15 +54,17 @@ DEFAULT_ENV_OPTIONS = dict(
"link",
"pioasm",
"platformio",
"piotarget",
"pioplatform",
"pioproject",
"pioplatform",
"piotest",
"piotarget",
"piomaxlen",
"piolib",
"pioupload",
"piosize",
"pioino",
"piomisc",
"piointegration",
"piosize",
],
toolpath=[os.path.join(fs.get_source_dir(), "builder", "tools")],
variables=clivars,

View File

@ -20,12 +20,11 @@ import os
import SCons.Defaults # pylint: disable=import-error
import SCons.Subst # pylint: disable=import-error
from platformio.package.manager.core import get_core_package_dir
from platformio.proc import exec_command, where_is_program
def DumpIntegrationIncludes(env):
result = dict(build=[], compatlib=[], toolchain=[], unity=[])
result = dict(build=[], compatlib=[], toolchain=[])
result["build"].extend(
[
@ -58,18 +57,6 @@ def DumpIntegrationIncludes(env):
for g in toolchain_incglobs:
result["toolchain"].extend([os.path.abspath(inc) for inc in glob.glob(g)])
# include Unity framework if there are tests in project
auto_install_unity = False
test_dir = env.GetProjectConfig().get("platformio", "test_dir")
if os.path.isdir(test_dir) and os.listdir(test_dir) != ["README"]:
auto_install_unity = True
unity_dir = get_core_package_dir(
"tool-unity",
auto_install=auto_install_unity,
)
if unity_dir:
result["unity"].append(unity_dir)
return result

View File

@ -27,7 +27,6 @@ import sys
import click
import SCons.Scanner # pylint: disable=import-error
from SCons.Script import ARGUMENTS # pylint: disable=import-error
from SCons.Script import COMMAND_LINE_TARGETS # pylint: disable=import-error
from SCons.Script import DefaultEnvironment # pylint: disable=import-error
from platformio import exception, fs, util
@ -57,9 +56,9 @@ class LibBuilderFactory(object):
used_frameworks = LibBuilderFactory.get_used_frameworks(env, path)
common_frameworks = set(env.get("PIOFRAMEWORK", [])) & set(used_frameworks)
if common_frameworks:
clsname = "%sLibBuilder" % list(common_frameworks)[0].title()
clsname = "%sLibBuilder" % list(common_frameworks)[0].capitalize()
elif used_frameworks:
clsname = "%sLibBuilder" % used_frameworks[0].title()
clsname = "%sLibBuilder" % used_frameworks[0].capitalize()
obj = getattr(sys.modules[__name__], clsname)(env, path, verbose=verbose)
@ -877,7 +876,7 @@ class ProjectAsLibBuilder(LibBuilderBase):
# project files
items = LibBuilderBase.get_search_files(self)
# test files
if "__test" in COMMAND_LINE_TARGETS:
if "test" in self.env.GetBuildType():
items.extend(
[
os.path.join("$PROJECT_TEST_DIR", item)
@ -1106,7 +1105,7 @@ def ConfigureProjectLibBuilder(env):
click.echo("%s|-- %s" % (margin, title), nl=False)
if int(ARGUMENTS.get("PIOVERBOSE", 0)):
click.echo(
"(License: %s, " % (_get_lib_license(pkg) or "Unknown"), nl=False
" (License: %s, " % (_get_lib_license(pkg) or "Unknown"), nl=False
)
if pkg.metadata and pkg.metadata.spec.external:
click.echo("URI: %s, " % pkg.metadata.spec.uri, nl=False)

View File

@ -117,22 +117,6 @@ def ConfigureDebugTarget(env):
env.AppendUnique(ASFLAGS=optimization_flags, LINKFLAGS=optimization_flags)
def ConfigureTestTarget(env):
    """Configure the SCons environment for a Unity-based unit-test build."""
    env.Append(
        CPPDEFINES=["UNIT_TEST", "UNITY_INCLUDE_CONFIG_H"],
        CPPPATH=[os.path.join("$BUILD_DIR", "UnityTestLib")],
    )
    # Build the Unity framework (core "tool-unity" package) as a static
    # library and link it ahead of other libraries
    unitylib = env.BuildLibrary(
        os.path.join("$BUILD_DIR", "UnityTestLib"), get_core_package_dir("tool-unity")
    )
    env.Prepend(LIBS=[unitylib])

    # Always build top-level test sources; when a concrete test is running,
    # also pull in sources from that test's subfolder
    src_filter = ["+<*.cpp>", "+<*.c>"]
    if "PIOTEST_RUNNING_NAME" in env:
        src_filter.append("+<%s%s>" % (env["PIOTEST_RUNNING_NAME"], os.path.sep))
    env.Replace(PIOTEST_SRC_FILTER=src_filter)
def GetExtraScripts(env, scope):
items = []
for item in env.GetProjectOption("extra_scripts", []):
@ -146,14 +130,12 @@ def GetExtraScripts(env, scope):
return [os.path.abspath(env.subst(item)) for item in items]
def exists(_):
return True
def generate(env):
env.AddMethod(GetCompilerType)
env.AddMethod(GetActualLDScript)
env.AddMethod(ConfigureDebugFlags)
env.AddMethod(ConfigureTestTarget)
env.AddMethod(ConfigureDebugTarget)
env.AddMethod(GetExtraScripts)
return env
def exists(_):
return True

View File

@ -0,0 +1,48 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import os
from platformio.unittest.result import TestSuite
from platformio.unittest.runners.factory import TestRunnerFactory
def ConfigureTestTarget(env):
    """Configure the SCons environment for a unit-test build.

    Adds test defines, source filters and include paths, then delegates
    framework-specific setup to the test runner resolved for this suite.
    """
    env.Append(
        CPPDEFINES=["UNIT_TEST", "PIO_UNIT_TESTING"],
        PIOTEST_SRC_FILTER=["+<*.cpp>", "+<*.c>"],
    )
    if "PIOTEST_RUNNING_NAME" in env:
        # A concrete test is running: also build/include its subfolder
        env.Append(
            PIOTEST_SRC_FILTER=[f"+<$PIOTEST_RUNNING_NAME{os.path.sep}>"],
            CPPPATH=[os.path.join("$PROJECT_TEST_DIR", "$PIOTEST_RUNNING_NAME")],
        )
    env.Append(CPPPATH=["$PROJECT_TEST_DIR"])
    # Let the configured test runner (e.g. the Unity runner) adjust the
    # build environment for its framework
    test_runner = TestRunnerFactory.new(
        TestSuite(env["PIOENV"], env.get("PIOTEST_RUNNING_NAME", "*")),
        env.GetProjectConfig(),
    )
    test_runner.configure_build_env(env)
def generate(env):
    """SCons tool entry point: register ConfigureTestTarget on the env."""
    env.AddMethod(ConfigureTestTarget)
def exists(_):
    """SCons tool hook: this tool is always available."""
    return True

View File

@ -47,14 +47,16 @@ def scons_patched_match_splitext(path, suffixes=None):
def GetBuildType(env):
return (
"debug"
if (
set(["__debug", "sizedata"]) & set(COMMAND_LINE_TARGETS)
or env.GetProjectOption("build_type") == "debug"
)
else "release"
)
modes = []
if (
set(["__debug", "sizedata"]) # sizedata = for memory inspection
& set(COMMAND_LINE_TARGETS)
or env.GetProjectOption("build_type") == "debug"
):
modes.append("debug")
if "__test" in COMMAND_LINE_TARGETS or env.GetProjectOption("build_type") == "test":
modes.append("test")
return "+".join(modes or ["release"])
def BuildProgram(env):
@ -123,20 +125,19 @@ def ProcessProgramDeps(env):
# process framework scripts
env.BuildFrameworks(env.get("PIOFRAMEWORK"))
if env.GetBuildType() == "debug":
env.ConfigureDebugFlags()
if "debug" in env.GetBuildType():
env.ConfigureDebugTarget()
if "test" in env.GetBuildType():
env.ConfigureTestTarget()
# remove specified flags
env.ProcessUnFlags(env.get("BUILD_UNFLAGS"))
if "__test" in COMMAND_LINE_TARGETS:
env.ConfigureTestTarget()
if "compiledb" in COMMAND_LINE_TARGETS and env.get(
"COMPILATIONDB_INCLUDE_TOOLCHAIN"
):
for scope, includes in env.DumpIntegrationIncludes().items():
if scope in ("toolchain", "unity"):
if scope in ("toolchain",):
env.Append(CPPPATH=includes)
@ -161,12 +162,13 @@ def ProcessProjectDeps(env):
# extra build flags from `platformio.ini`
projenv.ProcessFlags(env.get("SRC_BUILD_FLAGS"))
is_test = "__test" in COMMAND_LINE_TARGETS
if is_test:
if "test" in env.GetBuildType():
projenv.BuildSources(
"$BUILD_TEST_DIR", "$PROJECT_TEST_DIR", "$PIOTEST_SRC_FILTER"
)
if not is_test or env.GetProjectOption("test_build_project_src"):
if "test" not in env.GetBuildType() or env.GetProjectOption(
"test_build_project_src"
):
projenv.BuildSources(
"$BUILD_SRC_DIR", "$PROJECT_SRC_DIR", env.get("SRC_FILTER")
)

View File

@ -27,9 +27,9 @@ from platformio import fs, proc
from platformio.commands.device import helpers as device_helpers
from platformio.commands.device.command import device_monitor as cmd_device_monitor
from platformio.commands.run.command import cli as cmd_run
from platformio.commands.test.command import cli as cmd_test
from platformio.package.manager.core import inject_contrib_pysite
from platformio.project.exception import NotPlatformIOProjectError
from platformio.unittest.command import unittest_cmd
@click.group("remote", short_help="Remote Development")
@ -217,7 +217,7 @@ def remote_test(
click.secho("Building project locally", bold=True)
ctx.invoke(
cmd_test,
unittest_cmd,
environment=environment,
ignore=ignore,
project_dir=project_dir,

View File

@ -25,9 +25,9 @@ from platformio import app, exception, fs, util
from platformio.commands.device.command import device_monitor as cmd_device_monitor
from platformio.commands.run.helpers import clean_build_dir, handle_legacy_libdeps
from platformio.commands.run.processor import EnvironmentProcessor
from platformio.commands.test.processor import CTX_META_TEST_IS_RUNNING
from platformio.project.config import ProjectConfig
from platformio.project.helpers import find_project_dir_above, load_project_ide_data
from platformio.unittest.runners.base import CTX_META_TEST_IS_RUNNING
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches

View File

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.commands.test.processor import CTX_META_TEST_RUNNING_NAME
from platformio.package.commands.install import install_project_env_dependencies
from platformio.platform.factory import PlatformFactory
from platformio.project.exception import UndefinedEnvPlatformError
from platformio.unittest.runners.base import CTX_META_TEST_RUNNING_NAME
# pylint: disable=too-many-instance-attributes

View File

@ -12,19 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
# pylint: disable=unused-import
from platformio import exception
def get_test_names(config):
    """Return the sorted list of test-suite folder names in ``test_dir``.

    Raises ``exception.TestDirNotExists`` when the directory is missing;
    returns ``["*"]`` (run everything) when it contains no subfolders.
    """
    test_dir = config.get("platformio", "test_dir")
    if not os.path.isdir(test_dir):
        raise exception.TestDirNotExists(test_dir)
    names = [
        entry
        for entry in sorted(os.listdir(test_dir))
        if os.path.isdir(os.path.join(test_dir, entry))
    ]
    return names or ["*"]
from platformio.unittest.command import unittest_cmd as cli

View File

@ -1,271 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-arguments, too-many-locals, too-many-branches
import fnmatch
import os
import shutil
from time import time
import click
from tabulate import tabulate
from platformio import app, exception, fs, util
from platformio.commands.test.embedded import EmbeddedTestProcessor
from platformio.commands.test.helpers import get_test_names
from platformio.commands.test.native import NativeTestProcessor
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
@click.command("test", short_help="Unit Testing")
@click.option("--environment", "-e", multiple=True, metavar="<environment>")
@click.option(
"--filter",
"-f",
multiple=True,
metavar="<pattern>",
help="Filter tests by a pattern",
)
@click.option(
"--ignore",
"-i",
multiple=True,
metavar="<pattern>",
help="Ignore tests by a pattern",
)
@click.option("--upload-port")
@click.option("--test-port")
@click.option(
"-d",
"--project-dir",
default=os.getcwd,
type=click.Path(
exists=True, file_okay=False, dir_okay=True, writable=True, resolve_path=True
),
)
@click.option(
"-c",
"--project-conf",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True
),
)
@click.option("--without-building", is_flag=True)
@click.option("--without-uploading", is_flag=True)
@click.option("--without-testing", is_flag=True)
@click.option("--no-reset", is_flag=True)
@click.option(
"--monitor-rts",
default=None,
type=click.IntRange(0, 1),
help="Set initial RTS line state for Serial Monitor",
)
@click.option(
"--monitor-dtr",
default=None,
type=click.IntRange(0, 1),
help="Set initial DTR line state for Serial Monitor",
)
@click.option("--verbose", "-v", is_flag=True)
@click.pass_context
def cli( # pylint: disable=redefined-builtin
ctx,
environment,
ignore,
filter,
upload_port,
test_port,
project_dir,
project_conf,
without_building,
without_uploading,
without_testing,
no_reset,
monitor_rts,
monitor_dtr,
verbose,
):
app.set_session_var("custom_project_conf", project_conf)
with fs.cd(project_dir):
config = ProjectConfig.get_instance(project_conf)
config.validate(envs=environment)
test_names = get_test_names(config)
if not verbose:
click.echo("Verbose mode can be enabled via `-v, --verbose` option")
click.secho("Collected %d items" % len(test_names), bold=True)
results = []
default_envs = config.default_envs()
for testname in test_names:
for envname in config.envs():
section = "env:%s" % envname
# filter and ignore patterns
patterns = dict(filter=list(filter), ignore=list(ignore))
for key in patterns:
patterns[key].extend(config.get(section, "test_%s" % key, []))
skip_conditions = [
environment and envname not in environment,
not environment and default_envs and envname not in default_envs,
testname != "*"
and patterns["filter"]
and not any(
fnmatch.fnmatch(testname, p) for p in patterns["filter"]
),
testname != "*"
and any(fnmatch.fnmatch(testname, p) for p in patterns["ignore"]),
]
if any(skip_conditions):
results.append({"env": envname, "test": testname})
continue
click.echo()
print_processing_header(testname, envname)
cls = (
EmbeddedTestProcessor
if config.get(section, "platform")
and PlatformFactory.new(
config.get(section, "platform"), autoinstall=True
).is_embedded()
else NativeTestProcessor
)
tp = cls(
ctx,
testname,
envname,
dict(
project_config=config,
project_dir=project_dir,
upload_port=upload_port,
test_port=test_port,
without_building=without_building,
without_uploading=without_uploading,
without_testing=without_testing,
no_reset=no_reset,
monitor_rts=monitor_rts,
monitor_dtr=monitor_dtr,
verbose=verbose,
silent=not verbose,
),
)
result = {
"env": envname,
"test": testname,
"duration": time(),
"succeeded": tp.process(),
}
result["duration"] = time() - result["duration"]
results.append(result)
print_processing_footer(result)
# Reset custom project config
app.set_session_var("custom_project_conf", None)
if without_testing:
return
print_testing_summary(results, verbose)
command_failed = any(r.get("succeeded") is False for r in results)
if command_failed:
raise exception.ReturnErrorCode(1)
def print_processing_header(test, env):
click.echo(
"Processing %s in %s environment"
% (
click.style(test, fg="yellow", bold=True),
click.style(env, fg="cyan", bold=True),
)
)
terminal_width, _ = shutil.get_terminal_size()
click.secho("-" * terminal_width, bold=True)
def print_processing_footer(result):
    """Print a PASSED/FAILED labeled bar with the run duration."""
    is_failed = not result.get("succeeded")
    if is_failed:
        status = click.style("FAILED", fg="red", bold=True)
    else:
        status = click.style("PASSED", fg="green", bold=True)
    util.print_labeled_bar(
        "[%s] Took %.2f seconds" % (status, result["duration"]),
        is_error=is_failed,
    )
def print_testing_summary(results, verbose=False):
    """Print a tabulated summary of all test runs plus a final labeled bar.

    Results without a "succeeded" key are skipped combinations and are shown
    as IGNORED only in verbose mode.
    """
    click.echo()

    tabular_data = []
    succeeded_nums = 0
    failed_nums = 0
    duration = 0

    for result in results:
        duration += result.get("duration", 0)
        if result.get("succeeded") is False:
            failed_nums += 1
            status_str = click.style("FAILED", fg="red")
        elif result.get("succeeded") is None:
            # skipped / not-tested combination
            if not verbose:
                continue
            status_str = "IGNORED"
        else:
            succeeded_nums += 1
            status_str = click.style("PASSED", fg="green")

        tabular_data.append(
            (
                result["test"],
                click.style(result["env"], fg="cyan"),
                status_str,
                util.humanize_duration_time(result.get("duration")),
            )
        )

    click.echo(
        tabulate(
            tabular_data,
            headers=[
                click.style(s, bold=True)
                for s in ("Test", "Environment", "Status", "Duration")
            ],
        ),
        # route the table to stderr when anything failed
        err=failed_nums,
    )

    util.print_labeled_bar(
        "%s%d succeeded in %s"
        % (
            "%d failed, " % failed_nums if failed_nums else "",
            succeeded_nums,
            util.humanize_duration_time(duration),
        ),
        is_error=failed_nums,
        fg="red" if failed_nums else "green",
    )

View File

@ -1,41 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os.path import join
from platformio import proc
from platformio.commands.test.processor import TestProcessorBase
from platformio.proc import LineBufferedAsyncPipe
class NativeTestProcessor(TestProcessorBase):
    """Test processor that builds and executes the test program on the host
    machine instead of uploading it to an embedded target."""

    def process(self):
        """Build (unless skipped) and run the tests.

        Returns False on build failure, None when testing is skipped,
        otherwise the boolean result of run().
        """
        if not self.options["without_building"]:
            self.print_progress("Building...")
            if not self.build_or_upload(["__test"]):
                return False
        if self.options["without_testing"]:
            return None
        self.print_progress("Testing...")
        return self.run()

    def run(self):
        """Execute the built "program" binary from the env's build dir,
        streaming each output line through on_run_out for PASS/FAIL parsing."""
        build_dir = self.options["project_config"].get("platformio", "build_dir")
        result = proc.exec_command(
            [join(build_dir, self.env_name, "program")],
            stdout=LineBufferedAsyncPipe(self.on_run_out),
            stderr=LineBufferedAsyncPipe(self.on_run_out),
        )
        assert "returncode" in result
        # success requires a clean exit AND no failing test lines observed
        return result["returncode"] == 0 and not self._run_failed

View File

@ -1,235 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import re
from os import listdir, remove
from os.path import isdir, isfile, join
from string import Template
import click
from platformio import exception
# Per-transport code snippets used to render the temporary "output transport"
# source file (see TestProcessorBase.generate_output_file). Each entry provides
# the include line, an optional global object, and the per-char write / flush /
# begin / end statements for that framework's console API. The optional
# "language" key selects the generated file extension (defaults to "c").
TRANSPORT_OPTIONS = {
    "arduino": {
        "include": "#include <Arduino.h>",
        "object": "",
        "putchar": "Serial.write(c);",
        "flush": "Serial.flush();",
        "begin": "Serial.begin($baudrate);",
        "end": "Serial.end();",
        "language": "cpp",
    },
    "mbed": {
        "include": "#include <mbed.h>",
        # Mbed 6 dropped RawSerial in favor of UnbufferedSerial
        "object": (
            "#if MBED_MAJOR_VERSION == 6\nUnbufferedSerial pc(USBTX, USBRX);\n"
            "#else\nRawSerial pc(USBTX, USBRX);\n#endif"
        ),
        "putchar": (
            "#if MBED_MAJOR_VERSION == 6\npc.write(&c, 1);\n"
            "#else\npc.putc(c);\n#endif"
        ),
        "flush": "",
        "begin": "pc.baud($baudrate);",
        "end": "",
        "language": "cpp",
    },
    "espidf": {
        "include": "#include <stdio.h>",
        "object": "",
        "putchar": "putchar(c);",
        "flush": "fflush(stdout);",
        "begin": "",
        "end": "",
    },
    "zephyr": {
        "include": "#include <sys/printk.h>",
        "object": "",
        "putchar": 'printk("%c", c);',
        "flush": "",
        "begin": "",
        "end": "",
    },
    "native": {
        "include": "#include <stdio.h>",
        "object": "",
        "putchar": "putchar(c);",
        "flush": "fflush(stdout);",
        "begin": "",
        "end": "",
    },
    # user-provided transport implemented in the project's own header
    "custom": {
        "include": '#include "unittest_transport.h"',
        "object": "",
        "putchar": "unittest_uart_putchar(c);",
        "flush": "unittest_uart_flush();",
        "begin": "unittest_uart_begin();",
        "end": "unittest_uart_end();",
        "language": "cpp",
    },
}

# click Context.meta keys used to signal other commands that a test run is in
# progress and which concrete test is currently running
CTX_META_TEST_IS_RUNNING = __name__ + ".test_running"
CTX_META_TEST_RUNNING_NAME = __name__ + ".test_running_name"
class TestProcessorBase(object):
    """Base class for processing one (test, environment) combination.

    Subclasses implement process()/run(); this class handles transport
    resolution, building via `pio run`, Unity output parsing, and generation
    of the temporary transport source file.
    """

    # Fallback serial speed when `test_speed` is not set in platformio.ini
    DEFAULT_BAUDRATE = 115200

    def __init__(self, cmd_ctx, testname, envname, options):
        self.cmd_ctx = cmd_ctx
        # flag the click context so nested commands know a test is running
        self.cmd_ctx.meta[CTX_META_TEST_IS_RUNNING] = True
        self.test_name = testname
        self.options = options
        self.env_name = envname
        self.env_options = options["project_config"].items(env=envname, as_dict=True)
        self._run_failed = False
        self._output_file_generated = False

    def get_transport(self):
        """Resolve the output transport name (a TRANSPORT_OPTIONS key).

        Auto-detected from the platform/first framework; the `test_transport`
        env option overrides. Raises PlatformioException for unknown names.
        """
        transport = None
        if self.env_options.get("platform") == "native":
            transport = "native"
        elif "framework" in self.env_options:
            # default to the first framework declared for the environment
            transport = self.env_options.get("framework")[0]
        if "test_transport" in self.env_options:
            transport = self.env_options["test_transport"]
        if transport not in TRANSPORT_OPTIONS:
            raise exception.PlatformioException(
                "Unknown Unit Test transport `%s`. Please check a documentation how "
                "to create an own 'Test Transport':\n"
                "- https://docs.platformio.org/page/plus/unit-testing.html" % transport
            )
        return transport.lower()

    def get_baudrate(self):
        """Return the serial baud rate (env `test_speed` or the default)."""
        return int(self.env_options.get("test_speed", self.DEFAULT_BAUDRATE))

    def print_progress(self, text):
        # bold only in verbose mode
        click.secho(text, bold=self.options.get("verbose"))

    def build_or_upload(self, target):
        """Invoke `pio run` for this environment with the given target list.

        Generates the temporary transport source file once per processor.
        Returns the `pio run` result, or False on ReturnErrorCode.
        """
        if not self._output_file_generated:
            self.generate_output_file(
                self.options["project_config"].get("platformio", "test_dir")
            )
            self._output_file_generated = True

        if self.test_name != "*":
            # expose the running test name to the build pipeline
            self.cmd_ctx.meta[CTX_META_TEST_RUNNING_NAME] = self.test_name

        try:
            # pylint: disable=import-outside-toplevel
            from platformio.commands.run.command import cli as cmd_run

            return self.cmd_ctx.invoke(
                cmd_run,
                project_dir=self.options["project_dir"],
                project_conf=self.options["project_config"].path,
                upload_port=self.options.get("upload_port"),
                verbose=self.options["verbose"],
                silent=self.options.get("silent"),
                environment=[self.env_name],
                disable_auto_clean="nobuild" in target,
                target=target,
            )
        except exception.ReturnErrorCode:
            return False

    def process(self):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    def on_run_out(self, line):
        """Parse one line of test output and echo it with PASS/FAIL styling.

        Sets _run_failed on a ":FAIL" line or on a Unity summary line
        reporting a non-zero failure count.
        """
        line = line.strip()
        if line.endswith(":PASS"):
            click.echo("%s\t[%s]" % (line[:-5], click.style("PASSED", fg="green")))
        elif ":FAIL" in line:
            self._run_failed = True
            click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red")))
        else:
            if "Failures" in line:
                # Unity summary, e.g. "5 Tests 1 Failures 0 Ignored"
                match = re.match(r"\d+\s+Tests\s+(\d+)\s+Failures", line)
                if match and int(match.group(1)) > 0:
                    self._run_failed = True
            click.echo(line)

    def generate_output_file(self, test_dir):
        """Render the temporary transport source file into ``test_dir``.

        The template implements output_start/output_char/output_flush/
        output_complete using the snippets for the resolved transport, with
        the baud rate substituted in a second Template pass. Old temporary
        files are removed first; cleanup is registered via atexit.
        """
        assert isdir(test_dir)

        file_tpl = "\n".join(
            [
                "$include",
                "#include <output_export.h>",
                "",
                "$object",
                "",
                "#ifdef __GNUC__",
                "void output_start(unsigned int baudrate __attribute__((unused)))",
                "#else",
                "void output_start(unsigned int baudrate)",
                "#endif",
                "{",
                "    $begin",
                "}",
                "",
                "void output_char(int c)",
                "{",
                "    $putchar",
                "}",
                "",
                "void output_flush(void)",
                "{",
                "    $flush",
                "}",
                "",
                "void output_complete(void)",
                "{",
                "    $end",
                "}",
            ]
        )

        tmp_file_prefix = "tmp_pio_test_transport"

        def delete_tmptest_files(test_dir):
            # best-effort removal of previously generated transport files
            for item in listdir(test_dir):
                if item.startswith(tmp_file_prefix) and isfile(join(test_dir, item)):
                    try:
                        remove(join(test_dir, item))
                    except:  # pylint: disable=bare-except
                        click.secho(
                            "Warning: Could not remove temporary file '%s'. "
                            "Please remove it manually." % join(test_dir, item),
                            fg="yellow",
                        )

        transport_options = TRANSPORT_OPTIONS[self.get_transport()]
        # first pass: transport snippets; second pass: baud rate
        tpl = Template(file_tpl).substitute(transport_options)
        data = Template(tpl).substitute(baudrate=self.get_baudrate())

        delete_tmptest_files(test_dir)
        tmp_file = join(
            test_dir,
            "%s.%s" % (tmp_file_prefix, transport_options.get("language", "c")),
        )
        with open(tmp_file, mode="w", encoding="utf8") as fp:
            fp.write(data)

        atexit.register(delete_tmptest_files, test_dir)

View File

@ -81,7 +81,29 @@ def debug_cmd(
project_dir = os.getenv(name)
with fs.cd(project_dir):
project_config = ProjectConfig.get_instance(project_conf)
return _debug_in_project_dir(
ctx,
project_dir,
project_conf,
environment,
load_mode,
verbose,
interface,
__unprocessed,
)
def _debug_in_project_dir(
ctx,
project_dir,
project_conf,
environment,
load_mode,
verbose,
interface,
__unprocessed,
):
project_config = ProjectConfig.get_instance(project_conf)
project_config.validate(envs=[environment] if environment else None)
env_name = environment or helpers.get_default_debug_env(project_config)
@ -94,12 +116,11 @@ def debug_cmd(
if "platform" not in env_options:
raise ProjectEnvsNotAvailableError()
with fs.cd(project_dir):
debug_config = DebugConfigFactory.new(
PlatformFactory.new(env_options["platform"], autoinstall=True),
project_config,
env_name,
)
debug_config = DebugConfigFactory.new(
PlatformFactory.new(env_options["platform"], autoinstall=True),
project_config,
env_name,
)
if "--version" in __unprocessed:
return subprocess.run(
@ -165,19 +186,18 @@ def debug_cmd(
loop = asyncio.ProactorEventLoop() if IS_WINDOWS else asyncio.get_event_loop()
asyncio.set_event_loop(loop)
with fs.cd(project_dir):
client = GDBClientProcess(project_dir, debug_config)
coro = client.run(__unprocessed)
try:
signal.signal(signal.SIGINT, signal.SIG_IGN)
loop.run_until_complete(coro)
if IS_WINDOWS:
client.close()
# an issue with `asyncio` executor and STDIN,
# it cannot be closed gracefully
proc.force_exit()
finally:
client = GDBClientProcess(project_dir, debug_config)
coro = client.run(__unprocessed)
try:
signal.signal(signal.SIGINT, signal.SIG_IGN)
loop.run_until_complete(coro)
if IS_WINDOWS:
client.close()
loop.close()
# an issue with `asyncio` executor and STDIN,
# it cannot be closed gracefully
proc.force_exit()
finally:
client.close()
loop.close()
return True

View File

@ -24,10 +24,12 @@ from platformio import util
from platformio.commands import PlatformioCLI
from platformio.commands.run.command import cli as cmd_run
from platformio.commands.run.command import print_processing_header
from platformio.commands.test.helpers import get_test_names
from platformio.commands.test.processor import TestProcessorBase
from platformio.compat import IS_WINDOWS, is_bytes
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.unittest.command import get_test_names
from platformio.unittest.result import TestSuite
from platformio.unittest.runners.base import TestRunnerOptions
from platformio.unittest.runners.factory import TestRunnerFactory
class GDBMIConsoleStream(BytesIO): # pylint: disable=too-few-public-methods
@ -87,20 +89,18 @@ def predebug_project(
% (debug_testname, ", ".join(test_names))
)
print_processing_header(env_name, project_config, verbose)
tp = TestProcessorBase(
ctx,
debug_testname,
env_name,
dict(
project_config=project_config,
project_dir=project_dir,
test_runner = TestRunnerFactory.new(
TestSuite(env_name, debug_testname),
project_config,
TestRunnerOptions(
verbose=verbose,
without_building=False,
without_uploading=True,
without_debugging=False,
without_uploading=not preload,
without_testing=True,
verbose=False,
),
)
tp.build_or_upload(["__debug", "__test"] + (["upload"] if preload else []))
test_runner.start(ctx)
else:
ctx.invoke(
cmd_run,

View File

@ -48,7 +48,7 @@ class AbortedByUser(UserSideException):
#
class InvalidUdevRules(PlatformioException):
class InvalidUdevRules(UserSideException):
pass
@ -135,14 +135,3 @@ class CygwinEnvDetected(PlatformioException):
"PlatformIO does not work within Cygwin environment. "
"Use native Terminal instead."
)
class TestDirNotExists(UserSideException):
MESSAGE = (
"A test folder '{0}' does not exist.\nPlease create 'test' "
"directory in project's root and put a test set.\n"
"More details about Unit "
"Testing: https://docs.platformio.org/page/plus/"
"unit-testing.html"
)

View File

@ -26,6 +26,8 @@ from platformio.package.manager.tool import ToolPackageManager
from platformio.package.meta import PackageSpec
from platformio.project.config import ProjectConfig
from platformio.project.savedeps import pkg_to_save_spec, save_project_dependencies
from platformio.unittest.result import TestSuite
from platformio.unittest.runners.factory import TestRunnerFactory
@click.command(
@ -211,7 +213,13 @@ def _install_project_env_libraries(project_env, options):
if options.get("silent"):
env_lm.set_log_level(logging.WARN)
private_lm.set_log_level(logging.WARN)
for library in config.get(f"env:{project_env}", "lib_deps"):
lib_deps = config.get(f"env:{project_env}", "lib_deps")
if "__test" in options.get("project_targets", []):
test_runner = TestRunnerFactory.new(TestSuite(project_env, "*"), config)
lib_deps.extend(test_runner.EXTRA_LIB_DEPS or [])
for library in lib_deps:
spec = PackageSpec(library)
# skip built-in dependencies
if not spec.external and not spec.owner:
@ -223,9 +231,11 @@ def _install_project_env_libraries(project_env, options):
skip_dependencies=options.get("skip_dependencies"),
force=options.get("force"),
)
# install dependencies from the private libraries
for pkg in private_lm.get_installed():
_install_project_private_library_deps(pkg, private_lm, env_lm, options)
return not already_up_to_date

View File

@ -403,7 +403,7 @@ ProjectOptions = OrderedDict(
group="build",
name="build_type",
description="Project build configuration",
type=click.Choice(["release", "debug"]),
type=click.Choice(["release", "test", "debug"]),
default="release",
),
ConfigEnvOption(
@ -646,6 +646,13 @@ ProjectOptions = OrderedDict(
default=False,
),
# Test
ConfigEnvOption(
group="test",
name="test_framework",
description="A unit testing framework",
type=click.Choice(["unity", "custom"]),
default="unity",
),
ConfigEnvOption(
group="test",
name="test_filter",
@ -668,11 +675,7 @@ ProjectOptions = OrderedDict(
name="test_speed",
description="A connection speed (baud rate) to communicate with a target device",
type=click.INT,
),
ConfigEnvOption(
group="test",
name="test_transport",
description="A transport to communicate with a target device",
default=115200,
),
ConfigEnvOption(
group="test",

View File

@ -0,0 +1,209 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fnmatch
import os
import shutil
import click
from platformio import app, exception, fs, util
from platformio.project.config import ProjectConfig
from platformio.unittest.exception import TestDirNotExistsError
from platformio.unittest.reports.base import TestReportFactory
from platformio.unittest.result import TestStatus, TestSuite, TestSummary
from platformio.unittest.runners.base import TestRunnerOptions
from platformio.unittest.runners.factory import TestRunnerFactory
@click.command("test", short_help="Unit Testing")
@click.option("--environment", "-e", multiple=True, metavar="<environment>")
@click.option(
    "--filter",
    "-f",
    multiple=True,
    metavar="<pattern>",
    help="Filter tests by a pattern",
)
@click.option(
    "--ignore",
    "-i",
    multiple=True,
    metavar="<pattern>",
    help="Ignore tests by a pattern",
)
@click.option("--upload-port")
@click.option("--test-port")
@click.option(
    "-d",
    "--project-dir",
    default=os.getcwd,
    type=click.Path(
        exists=True, file_okay=False, dir_okay=True, writable=True, resolve_path=True
    ),
)
@click.option(
    "-c",
    "--project-conf",
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True
    ),
)
@click.option("--without-building", is_flag=True)
@click.option("--without-uploading", is_flag=True)
@click.option("--without-testing", is_flag=True)
@click.option("--no-reset", is_flag=True)
@click.option(
    "--monitor-rts",
    default=None,
    type=click.IntRange(0, 1),
    help="Set initial RTS line state for Serial Monitor",
)
@click.option(
    "--monitor-dtr",
    default=None,
    type=click.IntRange(0, 1),
    help="Set initial DTR line state for Serial Monitor",
)
@click.option("--verbose", "-v", is_flag=True)
@click.pass_context
# NOTE: no function docstring on purpose -- click would surface it in
# `pio test --help` and change the CLI output; `short_help` covers the listing
def unittest_cmd(  # pylint: disable=too-many-arguments,too-many-locals,redefined-builtin
    ctx,
    environment,
    ignore,
    filter,
    upload_port,
    test_port,
    project_dir,
    project_conf,
    without_building,
    without_uploading,
    without_testing,
    no_reset,
    monitor_rts,
    monitor_dtr,
    verbose,
):
    # Remember a custom project config path for the whole session so nested
    # commands (e.g. `pio run` invoked by runners) pick it up
    app.set_session_var("custom_project_conf", project_conf)
    with fs.cd(project_dir):
        config = ProjectConfig.get_instance(project_conf)
        config.validate(envs=environment)
        test_names = get_test_names(config)
        if not verbose:
            click.echo("Verbose mode can be enabled via `-v, --verbose` option")
        click.secho("Collected %d test suites" % len(test_names), bold=True)
        test_summary = TestSummary(os.path.basename(project_dir))
        default_envs = config.default_envs()
        # Run every test suite in every environment (cartesian product)
        for env_name in config.envs():
            for test_name in test_names:
                # The suite is registered BEFORE the skip check so filtered
                # suites still appear in the final report as SKIPPED
                test_suite = TestSuite(env_name, test_name)
                test_summary.add_suite(test_suite)
                # filter and ignore patterns: CLI options merged with the
                # env's `test_filter` / `test_ignore` config options
                patterns = dict(filter=list(filter), ignore=list(ignore))
                for key in patterns:
                    patterns[key].extend(
                        config.get(f"env:{env_name}", f"test_{key}", [])
                    )
                skip_conditions = [
                    # env not selected via `-e`
                    environment and env_name not in environment,
                    # env not among `default_envs` when no `-e` given
                    not environment and default_envs and env_name not in default_envs,
                    # suite does not match any filter pattern
                    test_name != "*"
                    and patterns["filter"]
                    and not any(
                        fnmatch.fnmatch(test_name, p) for p in patterns["filter"]
                    ),
                    # suite matches an ignore pattern
                    test_name != "*"
                    and any(fnmatch.fnmatch(test_name, p) for p in patterns["ignore"]),
                ]
                if any(skip_conditions):
                    continue
                runner = TestRunnerFactory.new(
                    test_suite,
                    config,
                    TestRunnerOptions(
                        verbose=verbose,
                        without_building=without_building,
                        without_uploading=without_uploading,
                        without_testing=without_testing,
                        upload_port=upload_port,
                        test_port=test_port,
                        no_reset=no_reset,
                        monitor_rts=monitor_rts,
                        monitor_dtr=monitor_dtr,
                    ),
                )
                click.echo()
                print_suite_header(test_suite)
                runner.start(ctx)
                print_suite_footer(test_suite)

    # Reset custom project config
    app.set_session_var("custom_project_conf", None)

    stdout_report = TestReportFactory.new("stdout", test_summary)
    stdout_report.generate(verbose=verbose)

    # Non-zero exit code when any suite errored or any case failed
    if test_summary.is_errored or test_summary.get_status_nums(TestStatus.FAILED):
        raise exception.ReturnErrorCode(1)
def get_test_names(config):
    """Return the test suite names: one per sub-folder of ``test_dir``.

    Falls back to the wildcard suite ``"*"`` when the folder contains no
    sub-folders; raises :class:`TestDirNotExistsError` when it is missing.
    """
    root = config.get("platformio", "test_dir")
    if not os.path.isdir(root):
        raise TestDirNotExistsError(root)
    suites = [
        entry
        for entry in sorted(os.listdir(root))
        if os.path.isdir(os.path.join(root, entry))
    ]
    return suites or ["*"]
def print_suite_header(test_suite):
    """Announce the suite being processed and draw a full-width divider."""
    styled_test = click.style(test_suite.test_name, fg="yellow", bold=True)
    styled_env = click.style(test_suite.env_name, fg="cyan", bold=True)
    click.echo("Processing %s in %s environment" % (styled_test, styled_env))
    width = shutil.get_terminal_size()[0]
    click.secho("-" * width, bold=True)
def print_suite_footer(test_suite):
    """Print a labeled result bar for a finished suite (red on failure)."""
    failed = test_suite.status in (TestStatus.FAILED, TestStatus.ERRORED)
    suite_id = click.style(
        "%s:%s" % (test_suite.env_name, test_suite.test_name), bold=True
    )
    if failed:
        status_label = click.style(test_suite.status.name, fg="red", bold=True)
    else:
        status_label = click.style("PASSED", fg="green", bold=True)
    util.print_labeled_bar(
        "%s [%s] Took %.2f seconds" % (suite_id, status_label, test_suite.duration),
        is_error=failed,
        sep="-",
    )

View File

@ -0,0 +1,34 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.exception import PlatformioException, UserSideException
class UnitTestError(PlatformioException):
    """Base class for all unit testing related errors."""

    pass
class TestDirNotExistsError(UnitTestError, UserSideException):
    """Raised when the project's configured `test_dir` folder is missing."""

    # `{0}` is substituted with the missing directory path
    MESSAGE = (
        "A test folder '{0}' does not exist.\nPlease create 'test' "
        "directory in the project root and put a test set.\n"
        "More details about Unit "
        "Testing: https://docs.platformio.org/page/plus/"
        "unit-testing.html"
    )
class UnitTestSuiteError(UnitTestError):
    """Raised for errors while processing a single test suite
    (e.g. build/upload failures, missing framework configuration)."""

    pass

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,36 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
from platformio.unittest.result import TestSummary
class TestReportBase:
    """Abstract base for test report generators."""

    def __init__(self, test_summary):
        # Summary with all executed test suites, rendered by `generate()`
        self.test_summary = test_summary

    def generate(self):
        """Render the report; concrete report classes must override this."""
        raise NotImplementedError()
class TestReportFactory:
    """Instantiate a report generator by its output format name."""

    @staticmethod
    def new(  # pylint: disable=redefined-builtin
        format, test_summary
    ) -> TestReportBase:
        # Reports live in `platformio.unittest.reports.<format>` and expose a
        # `<Format>TestReport` class (e.g. "stdout" -> StdoutTestReport)
        assert isinstance(test_summary, TestSummary)
        module = importlib.import_module(f"platformio.unittest.reports.{format}")
        class_name = "%sTestReport" % format.lower().capitalize()
        return getattr(module, class_name)(test_summary)

View File

@ -0,0 +1,73 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
from tabulate import tabulate
from platformio import util
from platformio.unittest.reports.base import TestReportBase
from platformio.unittest.result import TestStatus
class StdoutTestReport(TestReportBase):
    """Renders the final test summary as a colorized table on the terminal."""

    def generate(self, verbose=False):
        """Print a per-suite result table followed by a totals bar.

        :param verbose: also list suites that were skipped entirely
        """
        click.echo()
        tabular_data = []
        failed_nums = self.test_summary.get_status_nums(TestStatus.FAILED)
        skipped_nums = self.test_summary.get_status_nums(TestStatus.SKIPPED)
        # the whole run counts as an "error" if any case failed or any
        # suite errored out
        is_error = failed_nums > 0 or self.test_summary.is_errored
        for test_suite in self.test_summary.suites:
            if not verbose and test_suite.status == TestStatus.SKIPPED:
                continue
            status_str = test_suite.status.name
            if test_suite.status in (TestStatus.FAILED, TestStatus.ERRORED):
                status_str = click.style(status_str, fg="red")
            elif test_suite.status == TestStatus.PASSED:
                status_str = click.style(status_str, fg="green")
            tabular_data.append(
                (
                    click.style(test_suite.env_name, fg="cyan"),
                    test_suite.test_name,
                    status_str,
                    util.humanize_duration_time(test_suite.duration or None),
                )
            )
        if tabular_data:
            click.echo(
                tabulate(
                    tabular_data,
                    headers=[
                        click.style(s, bold=True)
                        for s in ("Environment", "Test", "Status", "Duration")
                    ],
                ),
                err=is_error,  # route the table to stderr when the run failed
            )
        util.print_labeled_bar(
            "%d test cases: %s%s%d succeeded in %s"
            % (
                self.test_summary.case_nums,
                ("%d failed, " % failed_nums) if failed_nums else "",
                ("%d skipped, " % skipped_nums) if skipped_nums else "",
                self.test_summary.get_status_nums(TestStatus.PASSED),
                util.humanize_duration_time(self.test_summary.duration),
            ),
            is_error=is_error,
            fg="red" if is_error else "green",
        )

View File

@ -0,0 +1,132 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import functools
import operator
import time
class TestStatus(enum.Enum):
    """Outcome of a single test case (or of a whole suite)."""

    PASSED = enum.auto()
    FAILED = enum.auto()
    SKIPPED = enum.auto()
    ERRORED = enum.auto()

    @classmethod
    def from_string(cls, value: str):
        """Map a framework-reported status word to a `TestStatus` member.

        Matching is case-insensitive and prefix-based (e.g. "PASS",
        "passed", "IGNORE", "skip", "FAIL:"); raises ``ValueError`` for
        anything unrecognized.
        """
        normalized = value.lower()
        prefix_map = (
            ("pass", cls.PASSED),
            ("ignore", cls.SKIPPED),
            ("skip", cls.SKIPPED),
            ("fail", cls.FAILED),
        )
        for prefix, status in prefix_map:
            if normalized.startswith(prefix):
                return status
        raise ValueError(f"Unknown test status `{normalized}`")
class TestCaseSource:
    """Location (file and optional line) where a test case is defined."""

    def __init__(self, file, line=None):
        self.file = file  # source file path as reported by the test runner
        self.line = line  # line number, may be None when not reported
class TestCase:
    """A single executed test case parsed from the runner output."""

    def __init__(  # pylint: disable=too-many-arguments
        self, name, status, message=None, stdout=None, source=None
    ):
        assert isinstance(status, TestStatus)
        self.name = name.strip()
        self.status = status
        self.message = self._normalize(message)
        self.stdout = self._normalize(stdout)
        self.source = source  # optional TestCaseSource location

    @staticmethod
    def _normalize(text):
        # Strip surrounding whitespace; collapse falsy values to None
        return text.strip() if text else None
class TestSuite:
    """Aggregated result of running one test folder in one environment."""

    def __init__(self, env_name, test_name):
        self.env_name = env_name
        self.test_name = test_name
        self.duration = 0  # seconds between on_start() and on_finish()
        self._cases = []
        self._start_timestamp = 0
        self._finished = False
        self._error = None

    @property
    def cases(self):
        """All `TestCase` objects collected so far."""
        return self._cases

    def get_status_nums(self, status):
        """Count the collected cases that have the given status."""
        return len([True for case in self._cases if case.status == status])

    @property
    def status(self):
        """Overall suite status; priority: ERRORED > FAILED > PASSED > SKIPPED."""
        if self._error:
            return TestStatus.ERRORED
        if self.get_status_nums(TestStatus.FAILED):
            return TestStatus.FAILED
        has_passed = self._cases and any(
            case.status == TestStatus.PASSED for case in self._cases
        )
        return TestStatus.PASSED if has_passed else TestStatus.SKIPPED

    def add_case(self, case: TestCase):
        assert isinstance(case, TestCase)
        self._cases.append(case)

    def is_finished(self):
        """True once `on_finish()` has been called."""
        return self._finished

    def on_start(self):
        # Remember when the suite began so on_finish() can compute duration
        self._start_timestamp = time.time()

    def on_error(self, exc):
        # Remember the exception; `status` reports ERRORED from now on
        self._error = exc

    def on_finish(self):
        # Idempotent: only the first call stops the clock
        if self.is_finished():
            return
        self._finished = True
        self.duration = time.time() - self._start_timestamp
class TestSummary:
    """Top-level aggregation of all `TestSuite` results for a project run."""

    def __init__(self, name):
        self.name = name  # usually the project folder base name
        self._suites = []

    @property
    def suites(self):
        """All registered `TestSuite` objects."""
        return self._suites

    def add_suite(self, suite):
        assert isinstance(suite, TestSuite)
        self._suites.append(suite)

    @property
    def duration(self):
        """Total duration (seconds) of all suites; 0 when no suites exist.

        NOTE: the previous `functools.reduce(operator.add, ...)` raised
        `TypeError` on an empty suite list; `sum()` safely returns 0.
        """
        return sum(suite.duration for suite in self._suites)

    @property
    def case_nums(self):
        """Total number of collected test cases across all suites."""
        return sum(len(suite.cases) for suite in self._suites)

    @property
    def is_errored(self):
        """True when at least one suite finished with ERRORED status."""
        return any(suite.status == TestStatus.ERRORED for suite in self._suites)

    def get_status_nums(self, status):
        """Total number of cases with the given status across all suites."""
        return sum(suite.get_status_nums(status) for suite in self._suites)

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,190 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
from platformio.platform.factory import PlatformFactory
from platformio.unittest.exception import UnitTestSuiteError
from platformio.unittest.result import TestCase, TestCaseSource, TestStatus
from platformio.unittest.runners.mixins.embedded import TestRunnerEmbeddedMixin
from platformio.unittest.runners.mixins.native import TestRunnerNativeMixin
CTX_META_TEST_IS_RUNNING = __name__ + ".test_running"
CTX_META_TEST_RUNNING_NAME = __name__ + ".test_running_name"
class TestRunnerOptions:  # pylint: disable=too-many-instance-attributes
    """Bag of user-facing options controlling a test runner's behavior."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        verbose=False,
        without_building=False,
        without_uploading=False,
        without_testing=False,
        without_debugging=True,
        upload_port=None,
        test_port=None,
        no_reset=False,
        monitor_rts=None,
        monitor_dtr=None,
    ):
        # Stage toggles (skip build/upload/test stages entirely)
        self.verbose = verbose
        self.without_building = without_building
        self.without_uploading = without_uploading
        self.without_testing = without_testing
        self.without_debugging = without_debugging
        # Device/port configuration
        self.upload_port = upload_port
        self.test_port = test_port
        self.no_reset = no_reset
        # Initial RTS/DTR serial line states; mirror the CLI
        # `--monitor-rts` / `--monitor-dtr` options (0, 1 or None)
        self.monitor_rts = monitor_rts
        self.monitor_dtr = monitor_dtr
class TestRunnerBase(TestRunnerNativeMixin, TestRunnerEmbeddedMixin):
    """Base class orchestrating the build -> upload -> run test pipeline.

    Framework-specific runners (e.g. UnityTestRunner) subclass this and
    override NAME, EXTRA_LIB_DEPS, TESTCASE_PARSE_RE and the hook methods
    (`setup`, `teardown`, `configure_build_env`, `on_run_output`).
    """

    NAME = None
    EXTRA_LIB_DEPS = None
    # Regex with named groups (name/status, optionally source_file,
    # source_line, message) used by `parse_testcases`
    TESTCASE_PARSE_RE = None

    def __init__(self, test_suite, project_config, options=None):
        self.test_suite = test_suite
        self.options = options
        self.project_config = project_config
        # Resolve (and auto-install if needed) the env's dev-platform
        self.platform = PlatformFactory.new(
            self.project_config.get(f"env:{self.test_suite.env_name}", "platform"),
            autoinstall=True,
        )
        self.cmd_ctx = None  # click context, assigned in start()

    @property
    def name(self):
        # Derived from the class name: "UnityTestRunner" -> "unity"
        return self.__class__.__name__.replace("TestRunner", "").lower()

    def get_test_speed(self):
        # Serial speed (baudrate) from the env's `test_speed` option
        return int(
            self.project_config.get(f"env:{self.test_suite.env_name}", "test_speed")
        )

    def start(self, cmd_ctx):
        """Run the whole pipeline; errors mark the suite as ERRORED."""
        # setup command context so nested commands (e.g. `pio run`) know a
        # test is in progress and which suite is running
        self.cmd_ctx = cmd_ctx
        self.cmd_ctx.meta[CTX_META_TEST_IS_RUNNING] = True
        if self.test_suite.test_name != "*":
            self.cmd_ctx.meta[CTX_META_TEST_RUNNING_NAME] = self.test_suite.test_name
        self.test_suite.on_start()
        try:
            self.setup()
            for stage in ("build", "upload", "run"):
                getattr(self, f"stage_{stage}")()
        except Exception as exc:  # pylint: disable=broad-except
            click.secho(str(exc), fg="red", err=True)
            self.test_suite.on_error(exc)
        finally:
            self.test_suite.on_finish()
            self.teardown()

    def setup(self):
        # Hook for subclasses; called before any stage
        pass

    def stage_build(self):
        """Build the test firmware/program (skipped by --without-building)."""
        if self.options.without_building:
            return None
        click.secho("Building...", bold=self.options.verbose)
        targets = ["__test"]
        if not self.options.without_debugging:
            targets.append("__debug")
        if self.platform.is_embedded():
            targets.append("checkprogsize")
        return self.run_project_targets(targets)

    def stage_upload(self):
        """Upload to the device; only applies to embedded platforms."""
        if self.options.without_uploading or not self.platform.is_embedded():
            return None
        click.secho("Uploading...", bold=self.options.verbose)
        targets = ["upload"]
        if self.options.without_building:
            # reuse the existing build artifacts
            targets.append("nobuild")
        else:
            targets.append("__test")
        if not self.options.without_debugging:
            targets.append("__debug")
        return self.run_project_targets(targets)

    def stage_run(self):
        """Execute the tests on the target device or on the host."""
        if self.options.without_testing:
            return None
        click.secho("Running...", bold=self.options.verbose)
        if self.platform.is_embedded():
            return self.stage_run_on_target()
        return self.stage_run_on_host()

    def teardown(self):
        # Hook for subclasses; called after all stages, even on error
        pass

    def run_project_targets(self, targets):
        """Invoke `pio run` for this env with the given SCons targets.

        :raises UnitTestSuiteError: wrapping any failure from the run command
        """
        # pylint: disable=import-outside-toplevel
        from platformio.commands.run.command import cli as run_cmd

        assert self.cmd_ctx
        try:
            return self.cmd_ctx.invoke(
                run_cmd,
                project_conf=self.project_config.path,
                upload_port=self.options.upload_port,
                verbose=self.options.verbose,
                silent=not self.options.verbose,
                environment=[self.test_suite.env_name],
                disable_auto_clean="nobuild" in targets,
                target=targets,
            )
        except Exception as exc:
            # NOTE(review): consider `raise UnitTestSuiteError(exc) from exc`
            # to preserve the original traceback
            raise UnitTestSuiteError(exc)

    def configure_build_env(self, env):  # pylint: disable=no-self-use
        """
        Configure SCons build environment

        Called in "builder/tools/piotest" tool
        """
        return env

    def on_run_output(self, data):
        # Default output handler: echo verbatim, then extract test cases
        click.echo(data, nl=False)
        self.parse_testcases(data)

    def parse_testcases(self, data):
        """Scan runner output line-by-line and record matching test cases."""
        if not self.TESTCASE_PARSE_RE:
            raise NotImplementedError()
        for line in data.split("\n"):
            line = line.strip()
            if not line:
                continue
            match = self.TESTCASE_PARSE_RE.search(line)
            if not match:
                continue
            # NOTE: rebinding `data` to the match's group dict (shadows the
            # method argument for the rest of this iteration)
            data = match.groupdict()
            source = None
            if "source_file" in data:
                source = TestCaseSource(
                    file=data["source_file"], line=data.get("source_line")
                )
            self.test_suite.add_case(
                TestCase(
                    name=data.get("name"),
                    status=TestStatus.from_string(data.get("status")),
                    message=data.get("message"),
                    stdout=line,
                    source=source,
                )
            )

View File

@ -0,0 +1,57 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import os
import re
from platformio.compat import load_python_module
from platformio.exception import UserSideException
from platformio.project.config import ProjectConfig
from platformio.unittest.result import TestSuite
from platformio.unittest.runners.base import TestRunnerBase, TestRunnerOptions
class TestRunnerFactory:
    """Create a framework-specific test runner for a test suite."""

    @staticmethod
    def get_clsname(name):
        """Convert a framework name to its runner class name.

        Example: "unity" -> "UnityTestRunner". Characters outside
        [0-9A-Za-z_-] are removed before conversion.
        """
        name = re.sub(r"[^\da-z\_\-]+", "", name, flags=re.I)
        return "%sTestRunner" % name.lower().capitalize()

    @classmethod
    def new(cls, test_suite, project_config, options=None) -> TestRunnerBase:
        """Instantiate the runner declared by the env's `test_framework`.

        `test_framework = custom` loads `<test_dir>/custom_runner.py` from
        the project; any other value maps to a built-in module under
        `platformio.unittest.runners`.

        :raises UserSideException: when the custom runner file is missing
        """
        assert isinstance(test_suite, TestSuite)
        assert isinstance(project_config, ProjectConfig)
        if options:
            assert isinstance(options, TestRunnerOptions)
        test_framework = project_config.get(
            f"env:{test_suite.env_name}", "test_framework"
        )
        module_name = f"platformio.unittest.runners.{test_framework}"
        if test_framework == "custom":
            custom_runner_path = os.path.join(
                project_config.get("platformio", "test_dir"), "custom_runner.py"
            )
            try:
                mod = load_python_module(module_name, custom_runner_path)
            except ImportError as exc:
                # chain the original ImportError for easier debugging
                raise UserSideException(
                    "Could not find custom unit testing runner "
                    f"by this path -> {custom_runner_path}"
                ) from exc
        else:
            mod = importlib.import_module(module_name)
        runner_cls = getattr(mod, cls.get_clsname(test_framework))
        return runner_cls(test_suite, project_config, options)

View File

@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -17,41 +17,15 @@ from time import sleep
import click
import serial
from platformio import exception, util
from platformio.commands.test.processor import TestProcessorBase
from platformio.platform.factory import PlatformFactory
from platformio import util
from platformio.exception import UserSideException
class EmbeddedTestProcessor(TestProcessorBase):
class TestRunnerEmbeddedMixin:
SERIAL_TIMEOUT = 600
def process(self):
if not self.options["without_building"]:
self.print_progress("Building...")
target = ["__test"]
if self.options["without_uploading"]:
target.append("checkprogsize")
if not self.build_or_upload(target):
return False
if not self.options["without_uploading"]:
self.print_progress("Uploading...")
target = ["upload"]
if self.options["without_building"]:
target.append("nobuild")
else:
target.append("__test")
if not self.build_or_upload(target):
return False
if self.options["without_testing"]:
return True
self.print_progress("Testing...")
return self.run()
def run(self):
def stage_run_on_target(self):
click.echo(
"If you don't see any output for the first 10 secs, "
"please reset board (press reset button)"
@ -60,17 +34,17 @@ class EmbeddedTestProcessor(TestProcessorBase):
try:
ser = serial.Serial(
baudrate=self.get_baudrate(), timeout=self.SERIAL_TIMEOUT
baudrate=self.get_test_speed(), timeout=self.SERIAL_TIMEOUT
)
ser.port = self.get_test_port()
ser.rts = self.options["monitor_rts"]
ser.dtr = self.options["monitor_dtr"]
ser.rts = self.options.monitor_rts
ser.dtr = self.options.monitor_dtr
ser.open()
except serial.SerialException as e:
click.secho(str(e), fg="red", err=True)
return False
return None
if not self.options["no_reset"]:
if not self.options.no_reset:
ser.flushInput()
ser.setDTR(False)
ser.setRTS(False)
@ -79,7 +53,7 @@ class EmbeddedTestProcessor(TestProcessorBase):
ser.setRTS(True)
sleep(0.1)
while True:
while not self.test_suite.is_finished():
line = ser.readline().strip()
# fix non-ascii output from device
@ -94,22 +68,19 @@ class EmbeddedTestProcessor(TestProcessorBase):
continue
if isinstance(line, bytes):
line = line.decode("utf8", "ignore")
self.on_run_out(line)
if all(l in line for l in ("Tests", "Failures", "Ignored")):
break
self.on_run_output(line)
ser.close()
return not self._run_failed
def get_test_port(self):
# if test port is specified manually or in config
if self.options.get("test_port"):
return self.options.get("test_port")
if self.env_options.get("test_port"):
return self.env_options.get("test_port")
port = self.options.test_port or self.project_config.get(
f"env:{self.test_suite.env_name}", "test_port"
)
if port:
return port
assert set(["platform", "board"]) & set(self.env_options.keys())
p = PlatformFactory.new(self.env_options["platform"])
board_hwids = p.board_config(self.env_options["board"]).get("build.hwids", [])
board = self.project_config.get(f"env:{self.test_suite.env_name}", "board")
board_hwids = self.platform.board_config(board).get("build.hwids", [])
port = None
elapsed = 0
while elapsed < 5 and not port:
@ -128,7 +99,7 @@ class EmbeddedTestProcessor(TestProcessorBase):
elapsed += 0.25
if not port:
raise exception.PlatformioException(
raise UserSideException(
"Please specify `test_port` for environment or use "
"global `--test-port` option."
)

View File

@ -0,0 +1,29 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from platformio import proc
class TestRunnerNativeMixin:
    """Runs the natively-built test binary on the host machine."""

    def stage_run_on_host(self):
        # Execute the compiled "program" binary from the env's build folder
        # and stream its stdout/stderr line-by-line into `on_run_output()`
        # so test cases can be parsed as they are reported.
        build_dir = self.project_config.get("platformio", "build_dir")
        result = proc.exec_command(
            [os.path.join(build_dir, self.test_suite.env_name, "program")],
            stdout=proc.LineBufferedAsyncPipe(self.on_run_output),
            stderr=proc.LineBufferedAsyncPipe(self.on_run_output),
        )
        assert "returncode" in result
        # True when the test program exited successfully
        return result["returncode"] == 0

View File

@ -0,0 +1,252 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import string
from pathlib import Path
import click
from platformio.unittest.exception import UnitTestSuiteError
from platformio.unittest.runners.base import TestRunnerBase
class UnityTestRunner(TestRunnerBase):
    """Test runner for the ThrowTheSwitch Unity framework.

    Injects a generated `unity_config.h` / `unity_config.c(pp)` pair into
    the build so Unity's output is routed through the framework-appropriate
    transport (stdio, Arduino Serial, mbed serial, Zephyr printk, or a
    legacy custom `unittest_transport.h`).
    """

    EXTRA_LIB_DEPS = ["throwtheswitch/Unity@^2.5.2"]

    # example
    # test/test_foo.cpp:44:test_function_foo:FAIL: Expected 32 Was 33
    TESTCASE_PARSE_RE = re.compile(
        r"(?P<source_file>[^:]+):(?P<source_line>\d+):(?P<name>[^:]+):"
        r"(?P<status>PASS|IGNORE|FAIL)(?:(?P<message>.+)$)?"
    )

    # `$baudrate` is substituted with the env's `test_speed` value
    UNITY_CONFIG_H = """
#ifndef UNITY_CONFIG_H
#define UNITY_CONFIG_H

#ifndef NULL
#ifndef __cplusplus
#define NULL (void*)0
#else
#define NULL 0
#endif
#endif

#ifdef __cplusplus
extern "C"
{
#endif

void unityOutputStart(unsigned int);
void unityOutputChar(unsigned int);
void unityOutputFlush();
void unityOutputComplete();

#define UNITY_OUTPUT_START() unityOutputStart($baudrate)
#define UNITY_OUTPUT_CHAR(a) unityOutputChar(a)
#define UNITY_OUTPUT_FLUSH() unityOutputFlush()
#define UNITY_OUTPUT_COMPLETE() unityOutputComplete()

#ifdef __cplusplus
}
#endif /* extern "C" */

#endif /* UNITY_CONFIG_H */
"""

    # `$framework_config_code` is substituted with the matching `code`
    # snippet from UNITY_FRAMEWORK_CONFIG below
    UNITY_CONFIG_C = """
#include <unity_config.h>

#if !defined(UNITY_WEAK_ATTRIBUTE) && !defined(UNITY_WEAK_PRAGMA)
#   define UNITY_WEAK_ATTRIBUTE __attribute__((weak))
#endif

#ifdef __cplusplus
extern "C"
{
#endif

#ifdef UNITY_WEAK_ATTRIBUTE
    UNITY_WEAK_ATTRIBUTE void setUp(void) { }
    UNITY_WEAK_ATTRIBUTE void tearDown(void) { }
    UNITY_WEAK_ATTRIBUTE void suiteSetUp(void) { }
    UNITY_WEAK_ATTRIBUTE int suiteTearDown(int num_failures) { return num_failures; }
#elif defined(UNITY_WEAK_PRAGMA)
#   pragma weak setUp
    void setUp(void) { }
#   pragma weak tearDown
    void tearDown(void) { }
#   pragma weak suiteSetUp
    void suiteSetUp(void) { }
#   pragma weak suiteTearDown
    int suiteTearDown(int num_failures) { return num_failures; }
#endif

#ifdef __cplusplus
}
#endif /* extern "C" */

$framework_config_code
"""

    # Per-framework transport implementations for Unity's output hooks
    UNITY_FRAMEWORK_CONFIG = dict(
        native=dict(
            code="""
#include <stdio.h>
void unityOutputStart(unsigned int baudrate) { }
void unityOutputChar(unsigned int c) { putchar(c); }
void unityOutputFlush(void) { fflush(stdout); }
void unityOutputComplete(void) { }
""",
            language="c",
        ),
        arduino=dict(
            code="""
#include <Arduino.h>
void unityOutputStart(unsigned int baudrate) { Serial.begin(baudrate); }
void unityOutputChar(unsigned int c) { Serial.write(c); }
void unityOutputFlush(void) { Serial.flush(); }
void unityOutputComplete(void) { Serial.end(); }
""",
            language="cpp",
        ),
        mbed=dict(
            code="""
#include <mbed.h>
#if MBED_MAJOR_VERSION == 6
UnbufferedSerial pc(USBTX, USBRX);
#else
RawSerial pc(USBTX, USBRX);
#endif
void unityOutputStart(unsigned int baudrate) { pc.baud(baudrate); }
void unityOutputChar(unsigned int c) {
#if MBED_MAJOR_VERSION == 6
    pc.write(&c, 1);
#else
    pc.putc(c);
#endif
}
void unityOutputFlush(void) { }
void unityOutputComplete(void) { }
""",
            language="cpp",
        ),
        espidf=dict(
            code="""
#include <stdio.h>
void unityOutputStart(unsigned int baudrate) { }
void unityOutputChar(unsigned int c) { putchar(c); }
void unityOutputFlush(void) { fflush(stdout); }
void unityOutputComplete(void) { }
""",
            language="c",
        ),
        zephyr=dict(
            code="""
#include <sys/printk.h>
void unityOutputStart(unsigned int baudrate) { }
void unityOutputChar(unsigned int c) { printk("%c", c); }
void unityOutputFlush(void) { }
void unityOutputComplete(void) { }
""",
            language="c",
        ),
        legacy_custom_transport=dict(
            code="""
#include <unittest_transport.h>
void unityOutputStart(unsigned int baudrate) { unittest_uart_begin(); }
void unityOutputChar(unsigned int c) { unittest_uart_putchar(c); }
void unityOutputFlush(void) { unittest_uart_flush(); }
void unityOutputComplete(void) { unittest_uart_end(); }
""",
            language="cpp",
        ),
    )

    def get_unity_framework_config(self):
        """Pick the transport config matching the env's framework.

        :raises UnitTestSuiteError: when no built-in configuration exists
        """
        if not self.platform.is_embedded():
            return self.UNITY_FRAMEWORK_CONFIG["native"]
        if (
            self.project_config.get(f"env:{self.test_suite.env_name}", "test_transport")
            == "custom"
        ):
            framework = "legacy_custom_transport"
        else:
            # first framework declared for the env (or None)
            framework = (
                self.project_config.get(f"env:{self.test_suite.env_name}", "framework")
                or [None]
            )[0]
        if framework and framework in self.UNITY_FRAMEWORK_CONFIG:
            return self.UNITY_FRAMEWORK_CONFIG[framework]
        raise UnitTestSuiteError(
            f"Could not find Unity configuration for the `{framework}` framework.\n"
            # FIX: trailing space was missing, producing "athttps://..."
            "Learn how to create a custom Unity configuration at "
            "https://docs.platformio.org/page/plus/unit-testing.html"
        )

    def configure_build_env(self, env):
        """Wire the generated Unity config files into the SCons build."""
        env.Append(CPPDEFINES=["UNITY_INCLUDE_CONFIG_H"])
        env.Replace(
            UNITY_CONFIG_DIR=os.path.join("$BUILD_DIR", "unity_config"),
            BUILD_UNITY_CONFIG_DIR=os.path.join("$BUILD_DIR", "unity_config_build"),
        )
        env.Append(CPPPATH=["$UNITY_CONFIG_DIR"])
        self.generate_unity_extras(env.subst("$UNITY_CONFIG_DIR"))
        env.BuildSources("$BUILD_UNITY_CONFIG_DIR", "$UNITY_CONFIG_DIR")

    def generate_unity_extras(self, dst_dir):
        """Write `unity_config.h` and the transport source into `dst_dir`.

        Existing files are kept so a user can customize them between runs.
        """
        dst_dir = Path(dst_dir)
        dst_dir.mkdir(parents=True, exist_ok=True)
        unity_h = dst_dir / "unity_config.h"
        if not unity_h.is_file():
            unity_h.write_text(
                string.Template(self.UNITY_CONFIG_H).substitute(
                    baudrate=self.get_test_speed()
                )
            )
        framework_config = self.get_unity_framework_config()
        # file extension follows the transport's implementation language
        unity_c = dst_dir / ("unity_config.%s" % framework_config.get("language", "c"))
        if not unity_c.is_file():
            unity_c.write_text(
                string.Template(self.UNITY_CONFIG_C).substitute(
                    framework_config_code=framework_config["code"]
                )
            )

    def on_run_output(self, data):
        """Echo Unity output with colorized verdicts and collect cases."""
        if not data.strip():
            return click.echo(data, nl=False)
        # Unity ends a run with a "N Tests M Failures K Ignored" summary line
        if all(s in data for s in ("Tests", "Failures", "Ignored")):
            self.test_suite.on_finish()
        # beautify output
        for line in data.strip().split("\n"):
            line = line.strip()
            if line.endswith(":PASS"):
                click.echo("%s\t[%s]" % (line[:-5], click.style("PASSED", fg="green")))
            elif line.endswith(":IGNORE"):
                click.echo(
                    "%s\t[%s]" % (line[:-7], click.style("IGNORED", fg="yellow"))
                )
            elif ":FAIL" in line:
                click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red")))
            else:
                click.echo(line)
        return self.parse_testcases(data)

View File

@ -270,10 +270,10 @@ def merge_dicts(d1, d2, path=None):
return d1
def print_labeled_bar(label, is_error=False, fg=None):
def print_labeled_bar(label, is_error=False, fg=None, sep="="):
terminal_width, _ = shutil.get_terminal_size()
width = len(click.unstyle(label))
half_line = "=" * int((terminal_width - width - 2) / 2)
half_line = sep * int((terminal_width - width - 2) / 2)
click.secho("%s %s %s" % (half_line, label, half_line), fg=fg, err=is_error)

View File

@ -13,15 +13,13 @@
# limitations under the License.
import os
import subprocess
import pytest
from platformio import proc
from platformio.commands.test.command import cli as cmd_test
from platformio.unittest.command import unittest_cmd
def test_local_env():
result = proc.exec_command(
def test_unity_calculator():
result = subprocess.run( # pylint: disable=subprocess-run-check
[
"platformio",
"test",
@ -29,77 +27,22 @@ def test_local_env():
os.path.join("examples", "unit-testing", "calculator"),
"-e",
"native",
]
],
capture_output=True,
text=True,
)
if result["returncode"] != 1:
pytest.fail(str(result))
# pylint: disable=unsupported-membership-test
assert all(s in result["err"] for s in ("PASSED", "FAILED")), result["out"]
assert result.returncode != 0
assert all(s in str(result) for s in ("PASSED", "FAILED"))
def test_multiple_env_build(clirunner, validate_cliresult, tmpdir):
def test_unity_setup_teardown(clirunner, validate_cliresult, tmpdir):
project_dir = tmpdir.mkdir("project")
project_dir.join("platformio.ini").write(
"""
[env:teensy31]
platform = teensy
framework = arduino
board = teensy31
[env:native]
platform = native
[env:espressif8266]
platform = espressif8266
framework = arduino
board = nodemcuv2
"""
)
project_dir.mkdir("test").join("test_main.cpp").write(
"""
#include <unity.h>
#ifdef ARDUINO
void setup()
#else
int main()
#endif
{
UNITY_BEGIN();
UNITY_END();
}
void loop() {}
"""
)
result = clirunner.invoke(
cmd_test,
["-d", str(project_dir), "--without-testing", "--without-uploading"],
)
validate_cliresult(result)
assert "Multiple ways to build" not in result.output
def test_setup_teardown_are_compilable(clirunner, validate_cliresult, tmpdir):
project_dir = tmpdir.mkdir("project")
project_dir.join("platformio.ini").write(
"""
[env:embedded]
platform = ststm32
framework = stm32cube
board = nucleo_f401re
test_transport = custom
[env:native]
platform = native
"""
)
test_dir = project_dir.mkdir("test")
test_dir.join("test_main.c").write(
"""
@ -124,9 +67,8 @@ int main() {
}
"""
)
native_result = clirunner.invoke(
cmd_test,
result = clirunner.invoke(
unittest_cmd,
["-d", str(project_dir), "-e", "native"],
)
@ -146,25 +88,61 @@ void unittest_uart_end(){}
#endif
"""
)
validate_cliresult(result)
assert all(f in result.output for f in ("setUp called", "tearDown called"))
embedded_result = clirunner.invoke(
cmd_test,
def test_legacy_unity_custom_transport(clirunner, validate_cliresult, tmpdir):
    """Build (without running/uploading) an embedded test suite that uses
    the legacy custom Unity transport (``test_transport = custom`` with a
    user-provided ``unittest_transport.h``) and assert the CLI succeeds.
    """
    project_dir = tmpdir.mkdir("project")
    project_dir.join("platformio.ini").write(
        """
[env:embedded]
platform = ststm32
framework = stm32cube
board = nucleo_f401re
test_transport = custom
"""
    )
    test_dir = project_dir.mkdir("test")
    test_dir.join("test_main.c").write(
        """
#include <unity.h>
void dummy_test(void) {
TEST_ASSERT_EQUAL(1, 1);
}
int main() {
UNITY_BEGIN();
RUN_TEST(dummy_test);
UNITY_END();
}
"""
    )
    # Minimal no-op transport so the custom-transport build path compiles.
    test_dir.join("unittest_transport.h").write(
        """
#ifdef __cplusplus
extern "C" {
#endif
void unittest_uart_begin(){}
void unittest_uart_putchar(char c){}
void unittest_uart_flush(){}
void unittest_uart_end(){}
#ifdef __cplusplus
}
#endif
"""
    )
    result = clirunner.invoke(
        unittest_cmd,
        [
            "-d",
            str(project_dir),
            "--without-testing",
            "--without-uploading",
            "-e",
            "embedded",
        ],
    )
    validate_cliresult(result)