Introduce PlatformIO Plus

This commit is contained in:
Ivan Kravets
2016-08-31 16:07:35 +03:00
parent 86794d2cf1
commit 3be35f9987
7 changed files with 73 additions and 343 deletions

View File

@ -14,46 +14,7 @@
from __future__ import absolute_import
import atexit
import sys
from os import remove
from os.path import isdir, isfile, join, sep
from string import Template
FRAMEWORK_PARAMETERS = {
"arduino": {
"framework": "Arduino.h",
"serial_obj": "",
"serial_putc": "Serial.write(a)",
"serial_flush": "Serial.flush()",
"serial_begin": "Serial.begin(9600)",
"serial_end": "Serial.end()"
},
"mbed": {
"framework": "mbed.h",
"serial_obj": "Serial pc(USBTX, USBRX);",
"serial_putc": "pc.putc(a)",
"serial_flush": "",
"serial_begin": "pc.baud(9600)",
"serial_end": ""
},
"energia": {
"framework": "Energia.h",
"serial_obj": "",
"serial_putc": "Serial.write(a)",
"serial_flush": "Serial.flush()",
"serial_begin": "Serial.begin(9600)",
"serial_end": "Serial.end()"
},
"native": {
"framework": "stdio.h",
"serial_obj": "",
"serial_putc": "putchar(a)",
"serial_flush": "fflush(stdout)",
"serial_begin": "",
"serial_end": ""
}
}
from os.path import join, sep
def ProcessTest(env):
@ -69,79 +30,16 @@ def ProcessTest(env):
env.PioPlatform().get_package_dir("tool-unity"))
env.Prepend(LIBS=[unitylib])
test_dir = env.subst("$PROJECTTEST_DIR")
env.GenerateOutputReplacement(test_dir)
src_filter = None
if "PIOTEST" in env:
src_filter = "+<output_export.cpp>"
src_filter += " +<%s%s>" % (env['PIOTEST'], sep)
return env.CollectBuildFiles(
"$BUILDTEST_DIR", test_dir, src_filter=src_filter, duplicate=False)
def GenerateOutputReplacement(env, destination_dir):
if not isdir(env.subst(destination_dir)):
sys.stderr.write(
"Error: Test folder does not exist. Please put your test suite "
"to \"test\" folder in project's root directory.\n")
env.Exit(1)
TEMPLATECPP = """
# include <$framework>
# include <output_export.h>
$serial_obj
void output_char(int a)
{
$serial_putc;
}
void output_flush(void)
{
$serial_flush;
}
void output_start(unsigned int baudrate)
{
$serial_begin;
}
void output_complete(void)
{
$serial_end;
}
"""
def delete_tmptest_file(file_):
try:
remove(file_)
except: # pylint: disable=bare-except
if isfile(file_):
print("Warning: Could not remove temporary file '%s'. "
"Please remove it manually." % file_)
if env['PIOPLATFORM'] == "native":
framework = "native"
else:
framework = env.subst("$PIOFRAMEWORK").lower()
if framework not in FRAMEWORK_PARAMETERS:
sys.stderr.write(
"Error: %s framework doesn't support testing feature!\n" %
framework)
env.Exit(1)
else:
data = Template(TEMPLATECPP).substitute(FRAMEWORK_PARAMETERS[
framework])
tmp_file = join(destination_dir, "output_export.cpp")
with open(tmp_file, "w") as f:
f.write(data)
atexit.register(delete_tmptest_file, tmp_file)
"$BUILDTEST_DIR",
"$PROJECTTEST_DIR",
src_filter=src_filter,
duplicate=False)
def exists(_):
@ -150,5 +48,4 @@ def exists(_):
def generate(env):
env.AddMethod(ProcessTest)
env.AddMethod(GenerateOutputReplacement)
return env

View File

@ -12,20 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=R0913,R0914
from fnmatch import fnmatch
from os import getcwd, listdir
from os.path import isdir, join
from time import sleep, time
import sys
from os import getcwd
import click
import serial
from platformio import exception, util
from platformio.commands.run import cli as cmd_run
from platformio.commands.run import check_project_envs, print_header
from platformio.managers.platform import PlatformFactory
from platformio.pioplus import pioplus_call
@click.command("test", short_help="Unit Testing")
@ -43,220 +35,5 @@ from platformio.managers.platform import PlatformFactory
writable=True,
resolve_path=True))
@click.option("--verbose", "-v", is_flag=True)
@click.pass_context
def cli(ctx, environment, ignore, upload_port, project_dir, verbose):
with util.cd(project_dir):
test_dir = util.get_projecttest_dir()
if not isdir(test_dir):
raise exception.TestDirEmpty(test_dir)
test_names = get_test_names(test_dir)
projectconf = util.load_project_config()
assert check_project_envs(projectconf, environment)
click.echo("Verbose mode can be enabled via `-v, --verbose` option")
click.echo("Collected %d items" % len(test_names))
start_time = time()
results = []
for testname in test_names:
for section in projectconf.sections():
if not section.startswith("env:"):
continue
envname = section[4:]
if environment and envname not in environment:
continue
# check ignore patterns
_ignore = list(ignore)
if projectconf.has_option(section, "test_ignore"):
_ignore.extend([
p.strip()
for p in projectconf.get(section, "test_ignore").split(",")
if p.strip()
])
if testname != "*" and \
any([fnmatch(testname, p) for p in _ignore]):
results.append((None, testname, envname))
continue
cls = (LocalTestProcessor
if projectconf.get(section, "platform") == "native" else
EmbeddedTestProcessor)
tp = cls(ctx, testname, envname, {
"project_config": projectconf,
"project_dir": project_dir,
"upload_port": upload_port,
"verbose": verbose
})
results.append((tp.process(), testname, envname))
click.echo()
print_header("[%s]" % click.style("TEST SUMMARY"))
passed = True
for result in results:
status, testname, envname = result
status_str = click.style("PASSED", fg="green")
if status is False:
passed = False
status_str = click.style("FAILED", fg="red")
elif status is None:
status_str = click.style("IGNORED", fg="yellow")
click.echo(
"test:%s/env:%s\t[%s]" % (click.style(
testname, fg="yellow"), click.style(
envname, fg="cyan"), status_str),
err=status is False)
print_header(
"[%s] Took %.2f seconds" % ((click.style(
"PASSED", fg="green", bold=True) if passed else click.style(
"FAILED", fg="red", bold=True)), time() - start_time),
is_error=not passed)
if not passed:
raise exception.ReturnErrorCode()
class TestProcessorBase(object):
def __init__(self, cmd_ctx, testname, envname, options):
self.cmd_ctx = cmd_ctx
self.cmd_ctx.meta['piotest_processor'] = True
self.test_name = testname
self.env_name = envname
self.options = options
self._run_failed = False
def print_progress(self, text, is_error=False):
click.echo()
print_header(
"[test::%s] %s" % (click.style(
self.test_name, fg="yellow", bold=True), text),
is_error=is_error)
def build_or_upload(self, target):
if self.test_name != "*":
self.cmd_ctx.meta['piotest'] = self.test_name
return self.cmd_ctx.invoke(
cmd_run,
project_dir=self.options['project_dir'],
upload_port=self.options['upload_port'],
silent=not self.options['verbose'],
environment=[self.env_name],
target=target)
def run(self):
raise NotImplementedError
def on_run_out(self, line):
if line.endswith(":PASS"):
click.echo("%s\t[%s]" % (line[:-5], click.style(
"PASSED", fg="green")))
elif ":FAIL:" in line:
self._run_failed = True
click.echo("%s\t[%s]" % (line, click.style("FAILED", fg="red")))
else:
click.echo(line)
class LocalTestProcessor(TestProcessorBase):
def process(self):
self.print_progress("Building... (1/2)")
self.build_or_upload(["test"])
self.print_progress("Testing... (2/2)")
return self.run()
def run(self):
with util.cd(self.options['project_dir']):
pioenvs_dir = util.get_projectpioenvs_dir()
result = util.exec_command(
[join(pioenvs_dir, self.env_name, "program")],
stdout=util.AsyncPipe(self.on_run_out),
stderr=util.AsyncPipe(self.on_run_out))
assert "returncode" in result
return result['returncode'] == 0 and not self._run_failed
class EmbeddedTestProcessor(TestProcessorBase):
SERIAL_TIMEOUT = 600
SERIAL_BAUDRATE = 9600
def process(self):
self.print_progress("Building... (1/3)")
self.build_or_upload(["test"])
self.print_progress("Uploading... (2/3)")
self.build_or_upload(["test", "upload"])
self.print_progress("Testing... (3/3)")
sleep(1.0) # wait while board is starting...
return self.run()
def run(self):
click.echo("If you don't see any output for the first 10 secs, "
"please reset board (press reset button)")
click.echo()
ser = serial.Serial(
self.get_serial_port(),
self.SERIAL_BAUDRATE,
timeout=self.SERIAL_TIMEOUT)
while True:
line = ser.readline().strip()
# fix non-ascii output from device
for i, c in enumerate(line[::-1]):
if ord(c) > 127:
line = line[-i:]
break
if not line:
continue
self.on_run_out(line)
if all([l in line for l in ("Tests", "Failures", "Ignored")]):
break
ser.close()
return not self._run_failed
def get_serial_port(self):
config = self.options['project_config']
envdata = {}
for k, v in config.items("env:" + self.env_name):
envdata[k] = v
# if upload port is specified manually
if self.options.get("upload_port", envdata.get("upload_port")):
return self.options.get("upload_port", envdata.get("upload_port"))
p = PlatformFactory.newPlatform(envdata['platform'])
bconfig = p.board_config(envdata['board'])
port = None
board_hwids = []
if "build.hwids" in bconfig:
board_hwids = bconfig.get("build.hwids")
for item in util.get_serialports():
if "VID:PID" not in item['hwid']:
continue
port = item['port']
for hwid in board_hwids:
hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
if hwid_str in item['hwid']:
return port
if not port:
raise exception.PlatformioException(
"Please specify `upload_port` for environment or use "
"global `--upload-port` option.")
return port
def get_test_names(test_dir):
names = []
for item in sorted(listdir(test_dir)):
if isdir(join(test_dir, item)):
names.append(item)
if not names:
names = ["*"]
return names
def cli(*args, **kwargs): # pylint: disable=unused-argument
pioplus_call(sys.argv[1:])

View File

@ -17,6 +17,7 @@ import click
from platformio.commands.lib import lib_update as cmd_lib_update
from platformio.commands.platform import platform_update as cmd_platform_update
from platformio.managers.lib import LibraryManager
from platformio.pioplus import pioplus_update
@click.command(
@ -31,6 +32,7 @@ def cli(ctx, only_check):
click.echo("Platform Manager")
click.echo("================")
ctx.invoke(cmd_platform_update, only_check=only_check)
pioplus_update()
click.echo()
click.echo("Library Manager")

View File

@ -29,6 +29,7 @@ from platformio.commands.platform import platform_update as cmd_platform_update
from platformio.commands.upgrade import get_latest_version
from platformio.managers.lib import LibraryManager
from platformio.managers.platform import PlatformManager
from platformio.pioplus import pioplus_update
def in_silence(ctx):
@ -140,6 +141,9 @@ def after_upgrade(ctx):
# pm.update(manifest['name'], "^" + manifest['version'])
pm.update(manifest['name'])
# update PlatformIO Plus tool if installed
pioplus_update()
click.secho(
"PlatformIO has been successfully upgraded to %s!\n" %
__version__,

53
platformio/pioplus.py Normal file
View File

@ -0,0 +1,53 @@
# Copyright 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
from os.path import join
from platformio import util
from platformio.managers.package import PackageManager
PACKAGE_PIOPLUS_NAME = "tool-pioplus"
class PioPlusPackageManager(PackageManager):
def __init__(self):
PackageManager.__init__(
self, join(util.get_home_dir(), "packages"),
["https://dl.bintray.com/platformio/dl-packages/manifest.json",
"https://dl.platformio.org/packages/manifest.json"])
def get_pioplusexe_path():
pm = PioPlusPackageManager()
package_dir = pm.get_package_dir(PACKAGE_PIOPLUS_NAME)
if not package_dir:
pm.install(PACKAGE_PIOPLUS_NAME)
package_dir = pm.get_package_dir(PACKAGE_PIOPLUS_NAME)
assert package_dir
return join(package_dir, "pioplus")
def pioplus_update():
pm = PioPlusPackageManager()
if pm.get_package_dir(PACKAGE_PIOPLUS_NAME):
pm.update(PACKAGE_PIOPLUS_NAME)
def pioplus_call(args, **kwargs):
os.environ['PYTHONEXEPATH'] = util.get_pythonexe_path()
util.copy_pythonpath_to_osenv()
subprocess.call([get_pioplusexe_path()] + args, **kwargs)

View File

@ -21,6 +21,6 @@ def test_local_env(clirunner, validate_cliresult):
result = clirunner.invoke(
cli_test,
["-d", join("examples", "unit-testing", "calculator"), "-e", "local"])
result.exit_code == -1
validate_cliresult(result)
assert all(
[s in result.output for s in ("[PASSED]", "[IGNORED]", "[FAILED]")])

View File

@ -13,7 +13,7 @@
# limitations under the License.
[tox]
envlist = py26, py27, docs, lint
envlist = py27, docs, lint
[testenv:develop]
basepython = python2.7
@ -53,10 +53,7 @@ commands =
pylint --rcfile=./.pylintrc ./platformio
[testenv]
basepython =
py26: python2.6
py27: python2.7
usedevelop = True
basepython = python2.7
passenv = *
deps =
pytest
@ -65,8 +62,8 @@ commands =
py.test -v --basetemp="{envtmpdir}" tests
[testenv:coverage]
passenv = *
basepython = python2.7
passenv = *
deps =
pytest
pytest-cov