forked from platformio/platformio-core
		
	
		
			
				
	
	
		
			139 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
from time import sleep
 | 
						|
 | 
						|
import click
 | 
						|
import serial
 | 
						|
 | 
						|
from platformio import exception, util
 | 
						|
from platformio.commands.test.processor import TestProcessorBase
 | 
						|
from platformio.managers.platform import PlatformFactory
 | 
						|
 | 
						|
 | 
						|
class EmbeddedTestProcessor(TestProcessorBase):
 | 
						|
 | 
						|
    SERIAL_TIMEOUT = 600
 | 
						|
 | 
						|
    def process(self):
 | 
						|
        if not self.options["without_building"]:
 | 
						|
            self.print_progress("Building...")
 | 
						|
            target = ["__test"]
 | 
						|
            if self.options["without_uploading"]:
 | 
						|
                target.append("checkprogsize")
 | 
						|
            if not self.build_or_upload(target):
 | 
						|
                return False
 | 
						|
 | 
						|
        if not self.options["without_uploading"]:
 | 
						|
            self.print_progress("Uploading...")
 | 
						|
            target = ["upload"]
 | 
						|
            if self.options["without_building"]:
 | 
						|
                target.append("nobuild")
 | 
						|
            else:
 | 
						|
                target.append("__test")
 | 
						|
            if not self.build_or_upload(target):
 | 
						|
                return False
 | 
						|
 | 
						|
        if self.options["without_testing"]:
 | 
						|
            return True
 | 
						|
 | 
						|
        self.print_progress("Testing...")
 | 
						|
        return self.run()
 | 
						|
 | 
						|
    def run(self):
 | 
						|
        click.echo(
 | 
						|
            "If you don't see any output for the first 10 secs, "
 | 
						|
            "please reset board (press reset button)"
 | 
						|
        )
 | 
						|
        click.echo()
 | 
						|
 | 
						|
        try:
 | 
						|
            ser = serial.Serial(
 | 
						|
                baudrate=self.get_baudrate(), timeout=self.SERIAL_TIMEOUT
 | 
						|
            )
 | 
						|
            ser.port = self.get_test_port()
 | 
						|
            ser.rts = self.options["monitor_rts"]
 | 
						|
            ser.dtr = self.options["monitor_dtr"]
 | 
						|
            ser.open()
 | 
						|
        except serial.SerialException as e:
 | 
						|
            click.secho(str(e), fg="red", err=True)
 | 
						|
            return False
 | 
						|
 | 
						|
        if not self.options["no_reset"]:
 | 
						|
            ser.flushInput()
 | 
						|
            ser.setDTR(False)
 | 
						|
            ser.setRTS(False)
 | 
						|
            sleep(0.1)
 | 
						|
            ser.setDTR(True)
 | 
						|
            ser.setRTS(True)
 | 
						|
            sleep(0.1)
 | 
						|
 | 
						|
        while True:
 | 
						|
            line = ser.readline().strip()
 | 
						|
 | 
						|
            # fix non-ascii output from device
 | 
						|
            for i, c in enumerate(line[::-1]):
 | 
						|
                if not isinstance(c, int):
 | 
						|
                    c = ord(c)
 | 
						|
                if c > 127:
 | 
						|
                    line = line[-i:]
 | 
						|
                    break
 | 
						|
 | 
						|
            if not line:
 | 
						|
                continue
 | 
						|
            if isinstance(line, bytes):
 | 
						|
                line = line.decode("utf8", "ignore")
 | 
						|
            self.on_run_out(line)
 | 
						|
            if all([l in line for l in ("Tests", "Failures", "Ignored")]):
 | 
						|
                break
 | 
						|
        ser.close()
 | 
						|
        return not self._run_failed
 | 
						|
 | 
						|
    def get_test_port(self):
 | 
						|
        # if test port is specified manually or in config
 | 
						|
        if self.options.get("test_port"):
 | 
						|
            return self.options.get("test_port")
 | 
						|
        if self.env_options.get("test_port"):
 | 
						|
            return self.env_options.get("test_port")
 | 
						|
 | 
						|
        assert set(["platform", "board"]) & set(self.env_options.keys())
 | 
						|
        p = PlatformFactory.newPlatform(self.env_options["platform"])
 | 
						|
        board_hwids = p.board_config(self.env_options["board"]).get("build.hwids", [])
 | 
						|
        port = None
 | 
						|
        elapsed = 0
 | 
						|
        while elapsed < 5 and not port:
 | 
						|
            for item in util.get_serialports():
 | 
						|
                port = item["port"]
 | 
						|
                for hwid in board_hwids:
 | 
						|
                    hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "")
 | 
						|
                    if hwid_str in item["hwid"]:
 | 
						|
                        return port
 | 
						|
 | 
						|
            # check if port is already configured
 | 
						|
            try:
 | 
						|
                serial.Serial(port, timeout=self.SERIAL_TIMEOUT).close()
 | 
						|
            except serial.SerialException:
 | 
						|
                port = None
 | 
						|
 | 
						|
            if not port:
 | 
						|
                sleep(0.25)
 | 
						|
                elapsed += 0.25
 | 
						|
 | 
						|
        if not port:
 | 
						|
            raise exception.PlatformioException(
 | 
						|
                "Please specify `test_port` for environment or use "
 | 
						|
                "global `--test-port` option."
 | 
						|
            )
 | 
						|
        return port
 |