diff --git a/tools/ci/python_packages/tiny_test_fw/Utility/CIAssignTest.py b/tools/ci/python_packages/tiny_test_fw/Utility/CIAssignTest.py index 367de54dc5..6277f07819 100644 --- a/tools/ci/python_packages/tiny_test_fw/Utility/CIAssignTest.py +++ b/tools/ci/python_packages/tiny_test_fw/Utility/CIAssignTest.py @@ -44,6 +44,7 @@ import re import json import yaml + try: from yaml import CLoader as Loader except ImportError: @@ -67,7 +68,7 @@ class Group(object): self.case_list = [case] self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS])) # we use ci_job_match_keys to match CI job tags. It's a set of required tags. - self.ci_job_match_keys = set([self._get_case_attr(case, x) for x in self.CI_JOB_MATCH_KEYS]) + self.ci_job_match_keys = self._get_match_keys(case) @staticmethod def _get_case_attr(case, attr): @@ -75,6 +76,16 @@ class Group(object): # this method will do get attribute form cases return case.case_info[attr] + def _get_match_keys(self, case): + keys = [] + for attr in self.CI_JOB_MATCH_KEYS: + val = self._get_case_attr(case, attr) + if isinstance(val, list): + keys.extend(val) + else: + keys.append(val) + return set(keys) + def accept_new_case(self): """ check if allowed to add any case to this group @@ -143,6 +154,7 @@ class AssignTest(object): DEFAULT_FILTER = { "category": "function", "ignore": False, + "supported_in_ci": True, } def __init__(self, test_case_path, ci_config_file, case_group=Group): @@ -234,12 +246,11 @@ class AssignTest(object): :return: filter for search test cases """ - bot_filter = os.getenv("BOT_CASE_FILTER") - if bot_filter: - bot_filter = json.loads(bot_filter) - else: - bot_filter = dict() - return bot_filter + res = dict() + for bot_filter in [os.getenv('BOT_CASE_FILTER'), os.getenv('BOT_TARGET_FILTER')]: + if bot_filter: + res.update(json.loads(bot_filter)) + return res def _apply_bot_test_count(self): """ diff --git a/tools/ci/python_packages/tiny_test_fw/Utility/CaseConfig.py b/tools/ci/python_packages/tiny_test_fw/Utility/CaseConfig.py index 8f373bce9f..1334ab3ea1 100644 --- a/tools/ci/python_packages/tiny_test_fw/Utility/CaseConfig.py +++ b/tools/ci/python_packages/tiny_test_fw/Utility/CaseConfig.py @@ -45,6 +45,7 @@ Template Config File:: import importlib import yaml + try: from yaml import CLoader as Loader except ImportError: @@ -127,7 +128,6 @@ def filter_test_cases(test_methods, case_filter): * user case filter is ``chip: ["esp32", "esp32c"]``, case attribute is ``chip: "esp32"`` * user case filter is ``chip: "esp32"``, case attribute is ``chip: "esp32"`` - :param test_methods: a list of test methods functions :param case_filter: case filter :return: filtered test methods @@ -190,8 +190,29 @@ class Parser(object): _overwrite = cls.handle_overwrite_args(_config.pop("overwrite", dict())) _extra_data = _config.pop("extra_data", None) _filter.update(_config) + + # Try get target from yml + try: + _target = _filter['target'] + except KeyError: + _target = None + else: + _overwrite.update({'target': _target}) + for test_method in test_methods: if _filter_one_case(test_method, _filter): + try: + dut_dict = test_method.case_info['dut_dict'] + except (AttributeError, KeyError): + dut_dict = None + + if dut_dict and _target: + dut = test_method.case_info.get('dut') + if _target.upper() in dut_dict: + if dut and dut in dut_dict.values(): # don't overwrite special cases + _overwrite.update({'dut': dut_dict[_target.upper()]}) + else: + raise ValueError('target {} is not in the specified dut_dict'.format(_target)) test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite)) return test_case_list diff --git a/tools/ci/python_packages/tiny_test_fw/Utility/SearchCases.py b/tools/ci/python_packages/tiny_test_fw/Utility/SearchCases.py index 984083a143..699bb57ece 100644 --- a/tools/ci/python_packages/tiny_test_fw/Utility/SearchCases.py +++ b/tools/ci/python_packages/tiny_test_fw/Utility/SearchCases.py @@ -23,6 +23,7 @@ from . import load_source class Search(object): TEST_CASE_FILE_PATTERN = "*_test.py" + SUPPORT_REPLICATE_CASES_KEY = ['target'] @classmethod def _search_cases_from_file(cls, file_name): @@ -42,9 +43,14 @@ class Search(object): continue except ImportError as e: print("ImportError: \r\n\tFile:" + file_name + "\r\n\tError:" + str(e)) - for i, test_function in enumerate(test_functions): + + test_functions_out = [] + for case in test_functions: + test_functions_out += cls.replicate_case(case) + + for i, test_function in enumerate(test_functions_out): print("\t{}. ".format(i + 1) + test_function.case_info["name"]) - return test_functions + return test_functions_out @classmethod def _search_test_case_files(cls, test_case, file_pattern): @@ -74,21 +80,41 @@ class Search(object): """ replicate_config = [] for key in case.case_info: + if key == 'ci_target': # ci_target is used to filter target, should not be duplicated. + continue if isinstance(case.case_info[key], (list, tuple)): replicate_config.append(key) - def _replicate_for_key(case_list, replicate_key, replicate_list): + def _replicate_for_key(cases, replicate_key, replicate_list): + def deepcopy_func(f, name=None): + fn = types.FunctionType(f.__code__, f.__globals__, name if name else f.__name__, + f.__defaults__, f.__closure__) + fn.__dict__.update(copy.deepcopy(f.__dict__)) + return fn + case_out = [] - for _case in case_list: + for inner_case in cases: for value in replicate_list: - new_case = copy.deepcopy(_case) + new_case = deepcopy_func(inner_case) new_case.case_info[replicate_key] = value case_out.append(new_case) return case_out replicated_cases = [case] - for key in replicate_config: - replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key]) + while replicate_config: + if not replicate_config: + break + key = replicate_config.pop() + if key in cls.SUPPORT_REPLICATE_CASES_KEY: + replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key]) + + # mark the cases with targets not in ci_target + for case in replicated_cases: + ci_target = case.case_info['ci_target'] + if not ci_target or case.case_info['target'] in ci_target: + case.case_info['supported_in_ci'] = True + else: + case.case_info['supported_in_ci'] = False return replicated_cases @@ -104,8 +130,4 @@ class Search(object): test_cases = [] for test_case_file in test_case_files: test_cases += cls._search_cases_from_file(test_case_file) - # handle replicate cases - test_case_out = [] - for case in test_cases: - test_case_out += cls.replicate_case(case) - return test_case_out + return test_cases diff --git a/tools/ci/python_packages/ttfw_idf/CIAssignExampleTest.py b/tools/ci/python_packages/ttfw_idf/CIAssignExampleTest.py index d887ae1d0e..3acdd78d06 100644 --- a/tools/ci/python_packages/ttfw_idf/CIAssignExampleTest.py +++ b/tools/ci/python_packages/ttfw_idf/CIAssignExampleTest.py @@ -29,7 +29,7 @@ IDF_PATH_FROM_ENV = os.getenv("IDF_PATH") class ExampleGroup(CIAssignTest.Group): - SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "chip"] + SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "target"] BUILD_LOCAL_DIR = "build_examples" BUILD_JOB_NAMES = ["build_examples_cmake_esp32", "build_examples_cmake_esp32s2"] @@ -61,7 +61,7 @@ def create_artifact_index_file(project_id=None, pipeline_id=None, case_group=Exa artifact_index_list = [] def format_build_log_path(): - parallel = job_info["parallel_num"] # Could be None if "parallel_num" not defined for the job + parallel = job_info["parallel_num"] # Could be None if "parallel_num" not defined for the job return "{}/list_job_{}.json".format(case_group.BUILD_LOCAL_DIR, parallel or 1) for build_job_name in case_group.BUILD_JOB_NAMES: @@ -99,7 +99,7 @@ if __name__ == '__main__': help="file name pattern used to find Python test case files") parser.add_argument('--custom-group', help='select custom-group for the test cases, if other than ExampleTest', - choices=['example','test-apps'], default='example') + choices=['example', 'test-apps'], default='example') args = parser.parse_args() diff --git a/tools/ci/python_packages/ttfw_idf/__init__.py b/tools/ci/python_packages/ttfw_idf/__init__.py index efbde25e75..23114ef0c8 100644 --- a/tools/ci/python_packages/ttfw_idf/__init__.py +++ b/tools/ci/python_packages/ttfw_idf/__init__.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import functools +import json +import logging import os import re @@ -19,19 +22,100 @@ from .IDFApp import IDFApp, Example, LoadableElfTestApp, UT, TestApp # noqa: ex from .IDFDUT import IDFDUT, ESP32DUT, ESP32S2DUT, ESP8266DUT, ESP32QEMUDUT # noqa: export DUTs for users from .DebugUtils import OCDProcess, GDBProcess, TelnetProcess, CustomProcess # noqa: export DebugUtils for users - -def format_case_id(chip, case_name): - return "{}.{}".format(chip, case_name) +# pass TARGET_DUT_CLS_DICT to Env.py to avoid circular dependency issue. +TARGET_DUT_CLS_DICT = { + 'ESP32': ESP32DUT, + 'ESP32S2': ESP32S2DUT, +} -def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32", module="examples", execution_time=1, +def format_case_id(target, case_name): + return "{}.{}".format(target, case_name) + + +try: + string_type = basestring +except NameError: + string_type = str + + +def upper_list_or_str(text): + """ + Return the uppercase of list of string or string. Return itself for other + data types + :param text: list or string, other instance will be returned immediately + :return: uppercase of list of string + """ + if isinstance(text, string_type): + return [text.upper()] + elif isinstance(text, list): + return [item.upper() for item in text] + else: + return text + + +def local_test_check(decorator_target): + # Try to get the sdkconfig.json to read the IDF_TARGET value. + # If not set, will set to ESP32. + # For CI jobs, this is a fake procedure, the true target and dut will be + # overwritten by the job config YAML file. + idf_target = 'ESP32' # default if sdkconfig not found or not readable + if os.getenv('CI_JOB_ID'): # Only auto-detect target when running locally + return idf_target + + expected_json_path = os.path.join('build', 'config', 'sdkconfig.json') + if os.path.exists(expected_json_path): + sdkconfig = json.load(open(expected_json_path)) + try: + idf_target = sdkconfig['IDF_TARGET'].upper() + except KeyError: + logging.warning('IDF_TARGET not in {}. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path))) + else: + logging.info('IDF_TARGET: {}'.format(idf_target)) + else: + logging.warning('{} not found. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path))) + + if isinstance(decorator_target, list): + if idf_target not in decorator_target: + raise ValueError('IDF_TARGET set to {}, not in decorator target value'.format(idf_target)) + else: + if idf_target != decorator_target: + raise ValueError('IDF_TARGET set to {}, not equal to decorator target value'.format(idf_target)) + return idf_target + + +def get_dut_class(target, erase_nvs=None): + if target not in TARGET_DUT_CLS_DICT: + raise Exception('target can only be {%s} (case insensitive)' % ', '.join(TARGET_DUT_CLS_DICT.keys())) + + dut = TARGET_DUT_CLS_DICT[target.upper()] + if erase_nvs: + dut.ERASE_NVS = 'erase_nvs' + return dut + + +def ci_target_check(func): + @functools.wraps(func) + def wrapper(**kwargs): + target = upper_list_or_str(kwargs.get('target', [])) + ci_target = upper_list_or_str(kwargs.get('ci_target', [])) + if not set(ci_target).issubset(set(target)): + raise ValueError('ci_target must be a subset of target') + + return func(**kwargs) + + return wrapper + + +@ci_target_check +def idf_example_test(app=Example, target="ESP32", ci_target=None, module="examples", execution_time=1, level="example", erase_nvs=True, config_name=None, **kwargs): """ decorator for testing idf examples (with default values for some keyword args). :param app: test application class - :param dut: dut class - :param chip: chip supported, string or tuple + :param target: target supported, string or list + :param ci_target: target auto run in CI, if None than all target will be tested, None, string or list :param module: module, string :param execution_time: execution time in minutes, int :param level: test level, could be used to filter test cases, string @@ -40,31 +124,31 @@ def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32", module="examples", e :param kwargs: other keyword args :return: test method """ - try: - # try to config the default behavior of erase nvs - dut.ERASE_NVS = erase_nvs - except AttributeError: - pass - - original_method = TinyFW.test_method(app=app, dut=dut, chip=chip, module=module, - execution_time=execution_time, level=level, **kwargs) def test(func): + test_target = local_test_check(target) + dut = get_dut_class(test_target, erase_nvs) + original_method = TinyFW.test_method( + app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target), + module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs, + dut_dict=TARGET_DUT_CLS_DICT, **kwargs + ) test_func = original_method(func) - test_func.case_info["ID"] = format_case_id(chip, test_func.case_info["name"]) + test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"]) return test_func return test -def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", execution_time=1, +@ci_target_check +def idf_unit_test(app=UT, target="ESP32", ci_target=None, module="unit-test", execution_time=1, level="unit", erase_nvs=True, **kwargs): """ decorator for testing idf unit tests (with default values for some keyword args). :param app: test application class - :param dut: dut class - :param chip: chip supported, string or tuple + :param target: target supported, string or list + :param ci_target: target auto run in CI, if None than all target will be tested, None, string or list :param module: module, string :param execution_time: execution time in minutes, int :param level: test level, could be used to filter test cases, string @@ -72,32 +156,31 @@ def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", executio :param kwargs: other keyword args :return: test method """ - try: - # try to config the default behavior of erase nvs - dut.ERASE_NVS = erase_nvs - except AttributeError: - pass - - original_method = TinyFW.test_method(app=app, dut=dut, chip=chip, module=module, - execution_time=execution_time, level=level, **kwargs) def test(func): + test_target = local_test_check(target) + dut = get_dut_class(test_target, erase_nvs) + original_method = TinyFW.test_method( + app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target), + module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs, + dut_dict=TARGET_DUT_CLS_DICT, **kwargs + ) test_func = original_method(func) - test_func.case_info["ID"] = format_case_id(chip, test_func.case_info["name"]) + test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"]) return test_func return test -def idf_custom_test(app=TestApp, dut=IDFDUT, chip="ESP32", module="misc", execution_time=1, +@ci_target_check +def idf_custom_test(app=TestApp, target="ESP32", ci_target=None, module="misc", execution_time=1, level="integration", erase_nvs=True, config_name=None, group="test-apps", **kwargs): - """ decorator for idf custom tests (with default values for some keyword args). :param app: test application class - :param dut: dut class - :param chip: chip supported, string or tuple + :param target: target supported, string or list + :param ci_target: target auto run in CI, if None than all target will be tested, None, string or list :param module: module, string :param execution_time: execution time in minutes, int :param level: test level, could be used to filter test cases, string @@ -107,18 +190,20 @@ def idf_custom_test(app=TestApp, dut=IDFDUT, chip="ESP32", module="misc", execut :param kwargs: other keyword args :return: test method """ - try: - # try to config the default behavior of erase nvs - dut.ERASE_NVS = erase_nvs - except AttributeError: - pass - - original_method = TinyFW.test_method(app=app, dut=dut, chip=chip, module=module, - execution_time=execution_time, level=level, **kwargs) def test(func): + test_target = local_test_check(target) + dut = get_dut_class(test_target, erase_nvs) + if 'dut' in kwargs: # panic_test() will inject dut, resolve conflicts here + dut = kwargs['dut'] + del kwargs['dut'] + original_method = TinyFW.test_method( + app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target), + module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs, + dut_dict=TARGET_DUT_CLS_DICT, **kwargs + ) test_func = original_method(func) - test_func.case_info["ID"] = format_case_id(chip, test_func.case_info["name"]) + test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"]) return test_func return test diff --git a/tools/unit-test-app/unit_test.py b/tools/unit-test-app/unit_test.py index 564f8e22c3..8afcfb6e62 100755 --- a/tools/unit-test-app/unit_test.py +++ b/tools/unit-test-app/unit_test.py @@ -26,7 +26,6 @@ import threading from tiny_test_fw import TinyFW, Utility, Env, DUT import ttfw_idf - UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests." # matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)" @@ -188,12 +187,11 @@ def reset_dut(dut): def log_test_case(description, test_case, ut_config): - Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange") - Utility.console_log("Tags: %s" % ", ".join("%s=%s" % (k,v) for (k,v) in test_case.items() if k != "name" and v is not None), color="orange") + Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange") + Utility.console_log("Tags: %s" % ", ".join("%s=%s" % (k, v) for (k, v) in test_case.items() if k != "name" and v is not None), color="orange") def run_one_normal_case(dut, one_case, junit_test_case): - reset_dut(dut) dut.start_capture_raw_data() @@ -330,7 +328,6 @@ def run_unit_test_cases(env, extra_data): class Handler(threading.Thread): - WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!') SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!') FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored") @@ -691,7 +688,6 @@ def run_multiple_stage_cases(env, extra_data): def detect_update_unit_test_info(env, extra_data, app_bin): - case_config = format_test_case_config(extra_data) for ut_config in case_config: @@ -785,7 +781,7 @@ if __name__ == '__main__': if len(test_item) == 0: continue pair = test_item.split(r':', 1) - if len(pair) == 1 or pair[0] is 'name': + if len(pair) == 1 or pair[0] == 'name': test_dict['name'] = pair[0] elif len(pair) == 2: if pair[0] == 'timeout' or pair[0] == 'child case num':