Merge branch 'bugfix/unit_test_keep_serial_port_open' into 'master'

unit test: Keep serial port open when running esptool

See merge request idf/esp-idf!3889
This commit is contained in:
Angus Gratton
2018-12-19 07:06:17 +08:00
6 changed files with 206 additions and 120 deletions

View File

@@ -2,6 +2,11 @@
"write_flash_args" : [ "--flash_mode", "${ESPFLASHMODE}", "write_flash_args" : [ "--flash_mode", "${ESPFLASHMODE}",
"--flash_size", "${ESPFLASHSIZE}", "--flash_size", "${ESPFLASHSIZE}",
"--flash_freq", "${ESPFLASHFREQ}" ], "--flash_freq", "${ESPFLASHFREQ}" ],
"flash_settings" : {
"flash_mode": "${ESPFLASHMODE}",
"flash_size": "${ESPFLASHSIZE}",
"flash_freq": "${ESPFLASHFREQ}"
},
"flash_files" : { "flash_files" : {
"${BOOTLOADER_OFFSET}" : "bootloader/bootloader.bin", "${BOOTLOADER_OFFSET}" : "bootloader/bootloader.bin",
"${PARTITION_TABLE_OFFSET}" : "partition_table/partition-table.bin", "${PARTITION_TABLE_OFFSET}" : "partition_table/partition-table.bin",

View File

@@ -286,8 +286,8 @@ class BaseDUT(object):
self.record_data_lock = threading.RLock() self.record_data_lock = threading.RLock()
self.receive_thread = None self.receive_thread = None
self.expect_failures = [] self.expect_failures = []
# open and start during init self._port_open()
self.open() self.start_receive()
def __str__(self): def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port)) return "DUT({}: {})".format(self.name, str(self.port))
@@ -392,27 +392,32 @@ class BaseDUT(object):
pass pass
# methods that features raw port methods # methods that features raw port methods
def open(self): def start_receive(self):
""" """
open port and create thread to receive data. Start thread to receive data.
:return: None :return: None
""" """
self._port_open()
self.receive_thread = _RecvThread(self._port_read, self.data_cache, self.receive_thread = _RecvThread(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock) self.recorded_data, self.record_data_lock)
self.receive_thread.start() self.receive_thread.start()
def close(self): def stop_receive(self):
""" """
close receive thread and then close port. stop the receiving thread for the port
:return: None :return: None
""" """
if self.receive_thread: if self.receive_thread:
self.receive_thread.exit() self.receive_thread.exit()
self._port_close()
self.LOG_THREAD.flush_data() self.LOG_THREAD.flush_data()
self.receive_thread = None
def close(self):
"""
permanently close the port
"""
self.stop_receive()
self._port_close()
@staticmethod @staticmethod
def u_to_bytearray(data): def u_to_bytearray(data):

View File

@@ -16,6 +16,7 @@
import subprocess import subprocess
import os import os
import json
import App import App
@@ -26,7 +27,7 @@ class IDFApp(App.BaseApp):
""" """
IDF_DOWNLOAD_CONFIG_FILE = "download.config" IDF_DOWNLOAD_CONFIG_FILE = "download.config"
IDF_FLASH_ARGS_FILE = "flash_project_args" IDF_FLASH_ARGS_FILE = "flasher_args.json"
def __init__(self, app_path): def __init__(self, app_path):
super(IDFApp, self).__init__(app_path) super(IDFApp, self).__init__(app_path)
@@ -43,7 +44,8 @@ class IDFApp(App.BaseApp):
self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE) self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
raise AssertionError(msg) raise AssertionError(msg)
self.esptool, self.partition_tool = self.get_tools() self.flash_files, self.flash_settings = self._parse_flash_download_config()
self.partition_table = self._parse_partition_table()
@classmethod @classmethod
def get_sdk_path(cls): def get_sdk_path(cls):
@@ -52,16 +54,6 @@ class IDFApp(App.BaseApp):
assert os.path.exists(idf_path) assert os.path.exists(idf_path)
return idf_path return idf_path
@classmethod
def get_tools(cls):
idf_path = cls.get_sdk_path()
# get esptool and partition tool for esp-idf
esptool = os.path.join(idf_path, "components",
"esptool_py", "esptool", "esptool.py")
partition_tool = os.path.join(idf_path, "components",
"partition_table", "gen_esp32part.py")
assert os.path.exists(esptool) and os.path.exists(partition_tool)
return esptool, partition_tool
def get_binary_path(self, app_path): def get_binary_path(self, app_path):
""" """
@@ -74,47 +66,64 @@ class IDFApp(App.BaseApp):
""" """
pass pass
def process_arg(self, arg): def _parse_flash_download_config(self):
""" """
process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs. Parse flash download config from build metadata files
"""
if ".bin" in arg:
ret = os.path.join(self.binary_path, arg)
else:
ret = arg
return ret.strip("\r\n ")
def process_app_info(self): Sets self.flash_files, self.flash_settings
"""
get app download config and partition info from a specific app path
:return: download config, partition info (Called from constructor)
Returns (flash_files, flash_settings)
""" """
if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path): if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
# CMake version using build metadata file
with open(os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE), "r") as f: with open(os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE), "r") as f:
configs = [] args = json.load(f)
for line in f: flash_files = [ (offs,file) for (offs,file) in args["flash_files"].items() if offs != "" ]
line = line.strip() flash_settings = args["flash_settings"]
if len(line) > 0:
configs += line.split()
else: else:
# GNU Make version uses download.config arguments file
with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f: with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f:
configs = f.read().split(" ") args = f.readlines()[-1].split(" ")
flash_files = []
flash_settings = {}
for idx in range(0, len(args), 2): # process arguments in pairs
if args[idx].startswith("--"):
# strip the -- from the command line argument
flash_settings[args[idx][2:]] = args[idx+1]
else:
# offs, filename
flash_files.append( (args[idx], args[idx+1]) )
download_configs = ["--chip", "auto", "--before", "default_reset", # make file offsets into integers, make paths absolute
"--after", "hard_reset", "write_flash", "-z"] flash_files = [ (int(offs, 0), os.path.join(self.binary_path, path.strip())) for (offs, path) in flash_files ]
download_configs += [self.process_arg(x) for x in configs]
# handle partition table return (flash_files, flash_settings)
for partition_file in download_configs:
if "partition" in partition_file: def _parse_partition_table(self):
partition_file = os.path.join(self.binary_path, partition_file) """
Parse partition table contents based on app binaries
Returns partition_table data
(Called from constructor)
"""
partition_tool = os.path.join(self.idf_path,
"components",
"partition_table",
"gen_esp32part.py")
assert os.path.exists(partition_tool)
for (_, path) in self.flash_files:
if "partition" in path:
partition_file = os.path.join(self.binary_path, path)
break break
else: else:
raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path)) raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path))
process = subprocess.Popen(["python", self.partition_tool, partition_file], process = subprocess.Popen(["python", partition_tool, partition_file],
stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data = process.stdout.read() raw_data = process.stdout.read()
if isinstance(raw_data, bytes): if isinstance(raw_data, bytes):
@@ -140,7 +149,8 @@ class IDFApp(App.BaseApp):
"size": _size, "size": _size,
"flags": _flags "flags": _flags
} }
return download_configs, partition_table
return partition_table
class Example(IDFApp): class Example(IDFApp):

View File

@@ -14,37 +14,64 @@
""" DUT for IDF applications """ """ DUT for IDF applications """
import os import os
import os.path
import sys import sys
import re import re
import subprocess import subprocess
import functools import functools
import random import random
import tempfile import tempfile
import time
from serial.tools import list_ports from serial.tools import list_ports
from collections import namedtuple
import DUT import DUT
try:
import esptool
except ImportError: # cheat and use IDF's copy of esptool if available
idf_path = os.getenv("IDF_PATH")
if not idf_path or not os.path.exists(idf_path):
raise
sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
import esptool
class IDFToolError(OSError): class IDFToolError(OSError):
pass pass
def _tool_method(func): def _uses_esptool(func):
""" close port, execute tool method and then reopen port """ """ Suspend listener thread, connect with esptool,
call target function with esptool instance,
then resume listening for output
"""
@functools.wraps(func) @functools.wraps(func)
def handler(self, *args, **kwargs): def handler(self, *args, **kwargs):
self.close() self.stop_receive()
ret = func(self, *args, **kwargs)
self.open() settings = self.port_inst.get_settings()
rom = esptool.ESP32ROM(self.port_inst)
rom.connect('hard_reset')
esp = rom.run_stub()
ret = func(self, esp, *args, **kwargs)
self.port_inst.apply_settings(settings)
self.start_receive()
return ret return ret
return handler return handler
class IDFDUT(DUT.SerialDUT): class IDFDUT(DUT.SerialDUT):
""" IDF DUT, extends serial with ESPTool methods """ """ IDF DUT, extends serial with esptool methods
(Becomes aware of IDFApp instance which holds app-specific data)
"""
CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
# /dev/ttyAMA0 port is listed in Raspberry Pi # /dev/ttyAMA0 port is listed in Raspberry Pi
# /dev/tty.Bluetooth-Incoming-Port port is listed in Mac # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth") INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
@@ -52,32 +79,78 @@ class IDFDUT(DUT.SerialDUT):
ERASE_NVS = True ERASE_NVS = True
def __init__(self, name, port, log_file, app, **kwargs): def __init__(self, name, port, log_file, app, **kwargs):
self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs) super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
@classmethod @classmethod
def get_chip(cls, app, port): def get_mac(cls, app, port):
""" """
get chip id via esptool get MAC address via esptool
:param app: application instance (to get tool) :param app: application instance (to get tool)
:param port: comport :param port: serial port as string
:return: chip ID or None :return: MAC address or None
""" """
try: try:
output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"]) esp = esptool.ESP32ROM(port)
except subprocess.CalledProcessError: esp.connect()
output = bytes() return esp.read_mac()
if isinstance(output, bytes): except RuntimeError as e:
output = output.decode() return None
chip_type = cls.CHIP_TYPE_PATTERN.search(output) finally:
return chip_type.group(1) if chip_type else None esp._port.close()
@classmethod @classmethod
def confirm_dut(cls, port, app, **kwargs): def confirm_dut(cls, port, app, **kwargs):
return cls.get_chip(app, port) is not None return cls.get_mac(app, port) is not None
@_uses_esptool
def _try_flash(self, esp, erase_nvs, baud_rate):
"""
Called by start_app() to try flashing at a particular baud rate.
Structured this way so @_uses_esptool will reconnect each time
"""
try:
# note: opening here prevents us from having to seek back to 0 each time
flash_files = [ (offs, open(path, "rb")) for (offs, path) in self.app.flash_files ]
if erase_nvs:
address = self.app.partition_table["nvs"]["offset"]
size = self.app.partition_table["nvs"]["size"]
nvs_file = tempfile.TemporaryFile()
nvs_file.write(b'\xff' * size)
nvs_file.seek(0)
flash_files.append( (int(address, 0), nvs_file) )
# fake flasher args object, this is a hack until
# esptool Python API is improved
Flash_Args = namedtuple('write_flash_args',
['flash_size',
'flash_mode',
'flash_freq',
'addr_filename',
'no_stub',
'compress',
'verify',
'encrypt'])
flash_args = Flash_Args(
self.app.flash_settings["flash_size"],
self.app.flash_settings["flash_mode"],
self.app.flash_settings["flash_freq"],
flash_files,
False,
True,
False,
False
)
esp.change_baud(baud_rate)
esptool.write_flash(esp, flash_args)
finally:
for (_,f) in flash_files:
f.close()
@_tool_method
def start_app(self, erase_nvs=ERASE_NVS): def start_app(self, erase_nvs=ERASE_NVS):
""" """
download and start app. download and start app.
@@ -85,55 +158,39 @@ class IDFDUT(DUT.SerialDUT):
:param: erase_nvs: whether erase NVS partition during flash :param: erase_nvs: whether erase NVS partition during flash
:return: None :return: None
""" """
if erase_nvs: for baud_rate in [ 921600, 115200 ]:
address = self.partition_table["nvs"]["offset"]
size = self.partition_table["nvs"]["size"]
nvs_file = tempfile.NamedTemporaryFile()
nvs_file.write(b'\xff' * size)
nvs_file.flush()
download_config = self.download_config + [address, nvs_file.name]
else:
download_config = self.download_config
retry_baud_rates = ["921600", "115200"]
error = IDFToolError()
try: try:
for baud_rate in retry_baud_rates: self._try_flash(erase_nvs, baud_rate)
try:
subprocess.check_output(["python", self.app.esptool,
"--port", self.port, "--baud", baud_rate]
+ download_config)
break break
except subprocess.CalledProcessError as error: except RuntimeError:
continue continue
else: else:
raise error raise IDFToolError()
finally:
if erase_nvs:
nvs_file.close()
@_tool_method @_uses_esptool
def reset(self): def reset(self, esp):
""" """
reset DUT with esptool hard reset DUT
:return: None :return: None
""" """
subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"]) esp.hard_reset()
@_tool_method @_uses_esptool
def erase_partition(self, partition): def erase_partition(self, esp, partition):
""" """
:param partition: partition name to erase :param partition: partition name to erase
:return: None :return: None
""" """
address = self.partition_table[partition]["offset"] raise NotImplementedError() # TODO: implement this
size = self.partition_table[partition]["size"] address = self.app.partition_table[partition]["offset"]
size = self.app.partition_table[partition]["size"]
# TODO can use esp.erase_region() instead of this, I think
with open(".erase_partition.tmp", "wb") as f: with open(".erase_partition.tmp", "wb") as f:
f.write(chr(0xFF) * size) f.write(chr(0xFF) * size)
@_tool_method @_uses_esptool
def dump_flush(self, output_file, **kwargs): def dump_flush(self, esp, output_file, **kwargs):
""" """
dump flush dump flush
@@ -147,7 +204,7 @@ class IDFDUT(DUT.SerialDUT):
if os.path.isabs(output_file) is False: if os.path.isabs(output_file) is False:
output_file = os.path.relpath(output_file, self.app.get_log_folder()) output_file = os.path.relpath(output_file, self.app.get_log_folder())
if "partition" in kwargs: if "partition" in kwargs:
partition = self.partition_table[kwargs["partition"]] partition = self.app.partition_table[kwargs["partition"]]
_address = partition["offset"] _address = partition["offset"]
_size = partition["size"] _size = partition["size"]
elif "address" in kwargs and "size" in kwargs: elif "address" in kwargs and "size" in kwargs:
@@ -155,11 +212,10 @@ class IDFDUT(DUT.SerialDUT):
_size = kwargs["size"] _size = kwargs["size"]
else: else:
raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash") raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
subprocess.check_output(
["python", self.app.esptool, "--port", self.port, "--baud", "921600", content = esp.read_flash(_address, _size)
"--before", "default_reset", "--after", "hard_reset", "read_flash", with open(output_file, "wb") as f:
_address, _size, output_file] f.write(content)
)
@classmethod @classmethod
def list_available_ports(cls): def list_available_ports(cls):

View File

@@ -122,7 +122,8 @@ Class Diagram
{method} expect_all {method} expect_all
{method} read {method} read
{method} write {method} write
{method} open {method} start_receive
{method} stop_receive
{method} close {method} close
} }
class SerialDUT { class SerialDUT {
@@ -137,12 +138,12 @@ Class Diagram
} }
class BaseApp { class BaseApp {
{method} get_sdk_path {method} get_sdk_path
{method} get_tools
{method} process_app_info
{method} get_log_folder {method} get_log_folder
} }
class IDFApp { class IDFApp {
{method} process_app_info {field} flash_files
{field} flash_settings
{field} partition_table
} }
class Example { class Example {
{method} get_binary_path {method} get_binary_path

View File

@@ -25,13 +25,22 @@ import time
import argparse import argparse
import threading import threading
try:
import TinyFW
except ImportError:
# if we want to run test case outside `tiny-test-fw` folder, # if we want to run test case outside `tiny-test-fw` folder,
# we need to insert tiny-test-fw path into sys path # we need to insert tiny-test-fw path into sys path
test_fw_path = os.getenv("TEST_FW_PATH") test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path: if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path) sys.path.insert(0, test_fw_path)
else:
# or try the copy in IDF
idf_path = os.getenv("IDF_PATH")
tiny_test_path = idf_path + "/tools/tiny-test-fw"
if os.path.exists(tiny_test_path):
sys.path.insert(0, tiny_test_path)
import TinyFW import TinyFW
import IDF import IDF
import Utility import Utility
import Env import Env