From 1d69302eae81d44243168500a35935eace4661bf Mon Sep 17 00:00:00 2001 From: Fu Hanxi Date: Mon, 31 Jul 2023 12:49:08 +0800 Subject: [PATCH] ci(pytest): move pytest utils into tools/ci/idf_pytest For future usage --- conftest.py | 470 +------------------------ tools/ci/check_build_test_rules.py | 3 +- tools/ci/ci_build_apps.py | 9 +- tools/ci/exclude_check_tools_files.txt | 1 + tools/ci/idf_ci_utils.py | 177 +--------- tools/ci/idf_pytest/__init__.py | 9 + tools/ci/idf_pytest/constants.py | 121 +++++++ tools/ci/idf_pytest/plugin.py | 286 +++++++++++++++ tools/ci/idf_pytest/script.py | 98 ++++++ tools/ci/idf_pytest/utils.py | 131 +++++++ 10 files changed, 674 insertions(+), 631 deletions(-) create mode 100644 tools/ci/idf_pytest/__init__.py create mode 100644 tools/ci/idf_pytest/constants.py create mode 100644 tools/ci/idf_pytest/plugin.py create mode 100644 tools/ci/idf_pytest/script.py create mode 100644 tools/ci/idf_pytest/utils.py diff --git a/conftest.py b/conftest.py index 53af0db382..cb11754fe0 100644 --- a/conftest.py +++ b/conftest.py @@ -19,30 +19,27 @@ import logging import os import re import sys -import xml.etree.ElementTree as ET from datetime import datetime -from fnmatch import fnmatch -from typing import Callable, Dict, List, Optional, Tuple +from typing import Callable, Optional import pytest -from _pytest.config import Config, ExitCode +from _pytest.config import Config from _pytest.fixtures import FixtureRequest -from _pytest.main import Session -from _pytest.nodes import Item -from _pytest.python import Function -from _pytest.reports import TestReport -from _pytest.runner import CallInfo -from _pytest.terminal import TerminalReporter from pytest_embedded.plugin import multi_dut_argument, multi_dut_fixture -from pytest_embedded.utils import find_by_suffix from pytest_embedded_idf.dut import IdfDut try: - from idf_ci_utils import IDF_PATH, to_list + from idf_ci_utils import IDF_PATH + from idf_pytest.constants import DEFAULT_SDKCONFIG, ENV_MARKERS, SPECIAL_MARKERS, TARGET_MARKERS + from idf_pytest.plugin import IDF_PYTEST_EMBEDDED_KEY, IdfPytestEmbedded + from idf_pytest.utils import format_case_id, get_target_marker_from_expr from idf_unity_tester import CaseTester except ImportError: sys.path.append(os.path.join(os.path.dirname(__file__), 'tools', 'ci')) - from idf_ci_utils import IDF_PATH, to_list + from idf_ci_utils import IDF_PATH + from idf_pytest.constants import DEFAULT_SDKCONFIG, ENV_MARKERS, SPECIAL_MARKERS, TARGET_MARKERS + from idf_pytest.plugin import IDF_PYTEST_EMBEDDED_KEY, IdfPytestEmbedded + from idf_pytest.utils import format_case_id, get_target_marker_from_expr from idf_unity_tester import CaseTester try: @@ -51,220 +48,6 @@ except ImportError: sys.path.append(os.path.join(os.path.dirname(__file__), 'tools', 'ci', 'python_packages')) import common_test_methods # noqa: F401 -SUPPORTED_TARGETS = ['esp32', 'esp32s2', 'esp32c3', 'esp32s3', 'esp32c2', 'esp32c6', 'esp32h2'] -PREVIEW_TARGETS: List[str] = [] # this PREVIEW_TARGETS excludes 'linux' target -DEFAULT_SDKCONFIG = 'default' - -TARGET_MARKERS = { - 'esp32': 'support esp32 target', - 'esp32s2': 'support esp32s2 target', - 'esp32s3': 'support esp32s3 target', - 'esp32c3': 'support esp32c3 target', - 'esp32c2': 'support esp32c2 target', - 'esp32c6': 'support esp32c6 target', - 'esp32h2': 'support esp32h2 target', - 'linux': 'support linux target', -} - -SPECIAL_MARKERS = { - 'supported_targets': "support all officially announced supported targets ('esp32', 'esp32s2', 'esp32c3', 'esp32s3', 'esp32c2', 'esp32c6')", - 'preview_targets': "support all preview targets ('none')", - 'all_targets': 'support all targets, including supported ones and preview ones', - 'temp_skip_ci': 'temp skip tests for specified targets only in ci', - 'temp_skip': 'temp skip tests for specified targets both in ci and locally', - 'nightly_run': 'tests should be executed as part of the nightly trigger pipeline', - 'host_test': 'tests which should not be built at the build stage, and instead built in host_test stage', - 'qemu': 'build and test using qemu-system-xtensa, not real target', -} - -ENV_MARKERS = { - # single-dut markers - 'generic': 'tests should be run on generic runners', - 'flash_suspend': 'support flash suspend feature', - 'ip101': 'connected via wired 10/100M ethernet', - 'lan8720': 'connected via LAN8720 ethernet transceiver', - 'quad_psram': 'runners with quad psram', - 'octal_psram': 'runners with octal psram', - 'usb_host': 'usb host runners', - 'usb_host_flash_disk': 'usb host runners with USB flash disk attached', - 'usb_device': 'usb device runners', - 'ethernet_ota': 'ethernet OTA runners', - 'flash_encryption': 'Flash Encryption runners', - 'flash_encryption_f4r8': 'Flash Encryption runners with 4-line flash and 8-line psram', - 'flash_encryption_f8r8': 'Flash Encryption runners with 8-line flash and 8-line psram', - 'flash_multi': 'Multiple flash chips tests', - 'psram': 'Chip has 4-line psram', - 'ir_transceiver': 'runners with a pair of IR transmitter and receiver', - 'twai_transceiver': 'runners with a TWAI PHY transceiver', - 'flash_encryption_wifi_high_traffic': 'Flash Encryption runners with wifi high traffic support', - 'ethernet': 'ethernet runner', - 'ethernet_flash_8m': 'ethernet runner with 8mb flash', - 'ethernet_router': 'both the runner and dut connect to the same router through ethernet NIC', - 'ethernet_vlan': 'ethernet runner GARM-32-SH-1-R16S5N3', - 'wifi_ap': 'a wifi AP in the environment', - 'wifi_router': 'both the runner and dut connect to the same wifi router', - 'wifi_high_traffic': 'wifi high traffic runners', - 'wifi_wlan': 'wifi runner with a wireless NIC', - 'Example_ShieldBox_Basic': 'basic configuration of the AP and ESP DUT placed in shielded box', - 'Example_ShieldBox': 'multiple shielded APs connected to shielded ESP DUT via RF cable with programmable attenuator', - 'xtal_26mhz': 'runner with 26MHz xtal on board', - 'xtal_40mhz': 'runner with 40MHz xtal on board', - 'external_flash': 'external flash memory connected via VSPI (FSPI)', - 'sdcard_sdmode': 'sdcard running in SD mode', - 'sdcard_spimode': 'sdcard running in SPI mode', - 'emmc': 'eMMC card', - 'MSPI_F8R8': 'runner with Octal Flash and Octal PSRAM', - 'MSPI_F4R8': 'runner with Quad Flash and Octal PSRAM', - 'MSPI_F4R4': 'runner with Quad Flash and Quad PSRAM', - 'jtag': 'runner where the chip is accessible through JTAG as well', - 'usb_serial_jtag': 'runner where the chip is accessible through builtin JTAG as well', - 'adc': 'ADC related tests should run on adc runners', - 'xtal32k': 'Runner with external 32k crystal connected', - 'no32kXtal': 'Runner with no external 32k crystal connected', - 'multi_dut_modbus_rs485': 'a pair of runners connected by RS485 bus', - 'psramv0': 'Runner with PSRAM version 0', - 'esp32eco3': 'Runner with esp32 eco3 connected', - 'ecdsa_efuse': 'Runner with test ECDSA private keys programmed in efuse', - 'ccs811': 'Runner with CCS811 connected', - 'eth_w5500': 'SPI Ethernet module with two W5500', - 'nvs_encr_hmac': 'Runner with test HMAC key programmed in efuse', - 'i2c_oled': 'Runner with ssd1306 I2C oled connected', - 'httpbin': 'runner for tests that need to access the httpbin service', - # multi-dut markers - 'ieee802154': 'ieee802154 related tests should run on ieee802154 runners.', - 'openthread_br': 'tests should be used for openthread border router.', - 'openthread_sleep': 'tests should be used for openthread sleepy device.', - 'zigbee_multi_dut': 'zigbee runner which have multiple duts.', - 'wifi_two_dut': 'tests should be run on runners which has two wifi duts connected.', - 'generic_multi_device': 'generic multiple devices whose corresponding gpio pins are connected to each other.', - 'twai_network': 'multiple runners form a TWAI network.', - 'sdio_master_slave': 'Test sdio multi board.', -} - -SUB_JUNIT_FILENAME = 'dut.xml' - - -################## -# Help Functions # -################## -def format_case_id(target: Optional[str], config: Optional[str], case: str, is_qemu: bool = False) -> str: - parts = [] - if target: - parts.append((str(target) + '_qemu') if is_qemu else str(target)) - if config: - parts.append(str(config)) - parts.append(case) - - return '.'.join(parts) - - -def item_marker_names(item: Item) -> List[str]: - return [marker.name for marker in item.iter_markers()] - - -def item_target_marker_names(item: Item) -> List[str]: - res = set() - for marker in item.iter_markers(): - if marker.name in TARGET_MARKERS: - res.add(marker.name) - - return sorted(res) - - -def item_env_marker_names(item: Item) -> List[str]: - res = set() - for marker in item.iter_markers(): - if marker.name in ENV_MARKERS: - res.add(marker.name) - - return sorted(res) - - -def item_skip_targets(item: Item) -> List[str]: - def _get_temp_markers_disabled_targets(marker_name: str) -> List[str]: - temp_marker = item.get_closest_marker(marker_name) - - if not temp_marker: - return [] - - # temp markers should always use keyword arguments `targets` and `reason` - if not temp_marker.kwargs.get('targets') or not temp_marker.kwargs.get('reason'): - raise ValueError( - f'`{marker_name}` should always use keyword arguments `targets` and `reason`. ' - f'For example: ' - f'`@pytest.mark.{marker_name}(targets=["esp32"], reason="IDF-xxxx, will fix it ASAP")`' - ) - - return to_list(temp_marker.kwargs['targets']) # type: ignore - - temp_skip_ci_targets = _get_temp_markers_disabled_targets('temp_skip_ci') - temp_skip_targets = _get_temp_markers_disabled_targets('temp_skip') - - # in CI we skip the union of `temp_skip` and `temp_skip_ci` - if os.getenv('CI_JOB_ID'): - skip_targets = list(set(temp_skip_ci_targets).union(set(temp_skip_targets))) - else: # we use `temp_skip` locally - skip_targets = temp_skip_targets - - return skip_targets - - -def get_target_marker_from_expr(markexpr: str) -> str: - candidates = set() - # we use `-m "esp32 and generic"` in our CI to filter the test cases - # this doesn't cover all use cases, but fit what we do in CI. - for marker in markexpr.split('and'): - marker = marker.strip() - if marker in TARGET_MARKERS: - candidates.add(marker) - - if len(candidates) > 1: - raise ValueError(f'Specified more than one target markers: {candidates}. Please specify no more than one.') - elif len(candidates) == 1: - return candidates.pop() - else: - raise ValueError('Please specify one target marker via "--target [TARGET]" or via "-m [TARGET]"') - - -def merge_junit_files(junit_files: List[str], target_path: str) -> None: - if len(junit_files) <= 1: - return - - merged_testsuite: ET.Element = ET.Element('testsuite') - testcases: Dict[str, ET.Element] = {} - for junit in junit_files: - logging.info(f'Merging {junit} to {target_path}') - tree: ET.ElementTree = ET.parse(junit) - testsuite: ET.Element = tree.getroot() - - for testcase in testsuite.findall('testcase'): - name: str = testcase.get('name') if testcase.get('name') else '' # type: ignore - - if name not in testcases: - testcases[name] = testcase - merged_testsuite.append(testcase) - continue - - existing_testcase = testcases[name] - for element_name in ['failure', 'error']: - for element in testcase.findall(element_name): - existing_element = existing_testcase.find(element_name) - if existing_element is None: - existing_testcase.append(element) - else: - existing_element.attrib.setdefault('message', '') # type: ignore - existing_element.attrib['message'] += '. ' + element.get('message', '') # type: ignore - - os.remove(junit) - - merged_testsuite.set('tests', str(len(merged_testsuite.findall('testcase')))) - merged_testsuite.set('failures', str(len(merged_testsuite.findall('.//testcase/failure')))) - merged_testsuite.set('errors', str(len(merged_testsuite.findall('.//testcase/error')))) - merged_testsuite.set('skipped', str(len(merged_testsuite.findall('.//testcase/skipped')))) - - with open(target_path, 'wb') as fw: - fw.write(ET.tostring(merged_testsuite)) - ############ # Fixtures # @@ -293,7 +76,7 @@ def case_tester(dut: IdfDut, **kwargs): # type: ignore @pytest.fixture @multi_dut_argument def config(request: FixtureRequest) -> str: - return getattr(request, 'param', None) or DEFAULT_SDKCONFIG + return getattr(request, 'param', None) or DEFAULT_SDKCONFIG # type: ignore @pytest.fixture @@ -304,7 +87,7 @@ def test_func_name(request: FixtureRequest) -> str: @pytest.fixture def test_case_name(request: FixtureRequest, target: str, config: str) -> str: is_qemu = request._pyfuncitem.get_closest_marker('qemu') is not None - return format_case_id(target, config, request.node.originalname, is_qemu=is_qemu) + return format_case_id(target, config, request.node.originalname, is_qemu=is_qemu) # type: ignore @pytest.fixture @@ -330,7 +113,7 @@ def build_dir(request: FixtureRequest, app_path: str, target: Optional[str], con check_dirs.append(f'build_{config}') check_dirs.append('build') - idf_pytest_embedded = request.config.stash[_idf_pytest_embedded_key] + idf_pytest_embedded = request.config.stash[IDF_PYTEST_EMBEDDED_KEY] build_dir = None if idf_pytest_embedded.apps_list is not None: @@ -510,11 +293,6 @@ def pytest_addoption(parser: pytest.Parser) -> None: ) -_idf_pytest_embedded_key = pytest.StashKey['IdfPytestEmbedded']() -_item_failed_cases_key = pytest.StashKey[list]() -_item_failed_key = pytest.StashKey[bool]() - - def pytest_configure(config: Config) -> None: # cli option "--target" target = config.getoption('target') or '' @@ -551,234 +329,20 @@ def pytest_configure(config: Config) -> None: ) continue - config.stash[_idf_pytest_embedded_key] = IdfPytestEmbedded( + config.stash[IDF_PYTEST_EMBEDDED_KEY] = IdfPytestEmbedded( target=target, sdkconfig=config.getoption('sdkconfig'), known_failure_cases_file=config.getoption('known_failure_cases_file'), apps_list=apps_list, ) - config.pluginmanager.register(config.stash[_idf_pytest_embedded_key]) + config.pluginmanager.register(config.stash[IDF_PYTEST_EMBEDDED_KEY]) for name, description in {**TARGET_MARKERS, **ENV_MARKERS, **SPECIAL_MARKERS}.items(): config.addinivalue_line('markers', f'{name}: {description}') def pytest_unconfigure(config: Config) -> None: - _pytest_embedded = config.stash.get(_idf_pytest_embedded_key, None) + _pytest_embedded = config.stash.get(IDF_PYTEST_EMBEDDED_KEY, None) if _pytest_embedded: - del config.stash[_idf_pytest_embedded_key] + del config.stash[IDF_PYTEST_EMBEDDED_KEY] config.pluginmanager.unregister(_pytest_embedded) - - -class IdfPytestEmbedded: - def __init__( - self, - target: str, - sdkconfig: Optional[str] = None, - known_failure_cases_file: Optional[str] = None, - apps_list: Optional[List[str]] = None, - ): - # CLI options to filter the test cases - self.target = target.lower() - self.sdkconfig = sdkconfig - self.known_failure_patterns = self._parse_known_failure_cases_file(known_failure_cases_file) - self.apps_list = apps_list - - self._failed_cases: List[Tuple[str, bool, bool]] = [] # (test_case_name, is_known_failure_cases, is_xfail) - - @property - def failed_cases(self) -> List[str]: - return [case for case, is_known, is_xfail in self._failed_cases if not is_known and not is_xfail] - - @property - def known_failure_cases(self) -> List[str]: - return [case for case, is_known, _ in self._failed_cases if is_known] - - @property - def xfail_cases(self) -> List[str]: - return [case for case, _, is_xfail in self._failed_cases if is_xfail] - - @staticmethod - def _parse_known_failure_cases_file( - known_failure_cases_file: Optional[str] = None, - ) -> List[str]: - if not known_failure_cases_file or not os.path.isfile(known_failure_cases_file): - return [] - - patterns = [] - with open(known_failure_cases_file) as fr: - for line in fr.readlines(): - if not line: - continue - if not line.strip(): - continue - without_comments = line.split('#')[0].strip() - if without_comments: - patterns.append(without_comments) - - return patterns - - @pytest.hookimpl(tryfirst=True) - def pytest_sessionstart(self, session: Session) -> None: - # same behavior for vanilla pytest-embedded '--target' - session.config.option.target = self.target - - @pytest.hookimpl(tryfirst=True) - def pytest_collection_modifyitems(self, items: List[Function]) -> None: - # sort by file path and callspec.config - # implement like this since this is a limitation of pytest, couldn't get fixture values while collecting - # https://github.com/pytest-dev/pytest/discussions/9689 - # after sort the test apps, the test may use the app cache to reduce the flash times. - def _get_param_config(_item: Function) -> str: - if hasattr(_item, 'callspec'): - return _item.callspec.params.get('config', DEFAULT_SDKCONFIG) # type: ignore - return DEFAULT_SDKCONFIG - - items.sort(key=lambda x: (os.path.dirname(x.path), _get_param_config(x))) - - # set default timeout 10 minutes for each case - for item in items: - if 'timeout' not in item.keywords: - item.add_marker(pytest.mark.timeout(10 * 60)) - - # add markers for special markers - for item in items: - if 'supported_targets' in item.keywords: - for _target in SUPPORTED_TARGETS: - item.add_marker(_target) - if 'preview_targets' in item.keywords: - for _target in PREVIEW_TARGETS: - item.add_marker(_target) - if 'all_targets' in item.keywords: - for _target in [*SUPPORTED_TARGETS, *PREVIEW_TARGETS]: - item.add_marker(_target) - - # add 'xtal_40mhz' tag as a default tag for esp32c2 target - # only add this marker for esp32c2 cases - if ( - self.target == 'esp32c2' - and 'esp32c2' in item_marker_names(item) - and 'xtal_26mhz' not in item_marker_names(item) - ): - item.add_marker('xtal_40mhz') - - # filter all the test cases with "nightly_run" marker - if os.getenv('INCLUDE_NIGHTLY_RUN') == '1': - # Do not filter nightly_run cases - pass - elif os.getenv('NIGHTLY_RUN') == '1': - items[:] = [item for item in items if 'nightly_run' in item_marker_names(item)] - else: - items[:] = [item for item in items if 'nightly_run' not in item_marker_names(item)] - - # filter all the test cases with target and skip_targets - items[:] = [ - item - for item in items - if self.target in item_marker_names(item) and self.target not in item_skip_targets(item) - ] - - # filter all the test cases with cli option "config" - if self.sdkconfig: - items[:] = [item for item in items if _get_param_config(item) == self.sdkconfig] - - def pytest_runtest_makereport(self, item: Function, call: CallInfo[None]) -> Optional[TestReport]: - report = TestReport.from_item_and_call(item, call) - if item.stash.get(_item_failed_key, None) is None: - item.stash[_item_failed_key] = False - - if report.outcome == 'failed': - # Mark the failed test cases - # - # This hook function would be called in 3 phases, setup, call, teardown. - # the report.outcome is the outcome of the single call of current phase, which is independent - # the call phase outcome is the test result - item.stash[_item_failed_key] = True - - if call.when == 'teardown': - item_failed = item.stash[_item_failed_key] - if item_failed: - # unity real test cases - failed_sub_cases = item.stash.get(_item_failed_cases_key, []) - if failed_sub_cases: - for test_case_name in failed_sub_cases: - self._failed_cases.append((test_case_name, self._is_known_failure(test_case_name), False)) - else: # the case iteself is failing - test_case_name = item.funcargs.get('test_case_name', '') - if test_case_name: - self._failed_cases.append( - ( - test_case_name, - self._is_known_failure(test_case_name), - report.keywords.get('xfail', False), - ) - ) - - return report - - def _is_known_failure(self, case_id: str) -> bool: - for pattern in self.known_failure_patterns: - if case_id == pattern: - return True - if fnmatch(case_id, pattern): - return True - return False - - @pytest.hookimpl(trylast=True) - def pytest_runtest_teardown(self, item: Function) -> None: - """ - Format the test case generated junit reports - """ - tempdir = item.funcargs.get('test_case_tempdir') - if not tempdir: - return - - junits = find_by_suffix('.xml', tempdir) - if not junits: - return - - if len(junits) > 1: - merge_junit_files(junits, os.path.join(tempdir, SUB_JUNIT_FILENAME)) - junits = [os.path.join(tempdir, SUB_JUNIT_FILENAME)] - - is_qemu = item.get_closest_marker('qemu') is not None - failed_sub_cases = [] - target = item.funcargs['target'] - config = item.funcargs['config'] - for junit in junits: - xml = ET.parse(junit) - testcases = xml.findall('.//testcase') - for case in testcases: - # modify the junit files - new_case_name = format_case_id(target, config, case.attrib['name'], is_qemu=is_qemu) - case.attrib['name'] = new_case_name - if 'file' in case.attrib: - case.attrib['file'] = case.attrib['file'].replace('/IDF/', '') # our unity test framework - - # collect real failure cases - if case.find('failure') is not None: - failed_sub_cases.append(new_case_name) - - xml.write(junit) - - item.stash[_item_failed_cases_key] = failed_sub_cases - - def pytest_sessionfinish(self, session: Session, exitstatus: int) -> None: - if exitstatus != 0: - if exitstatus == ExitCode.NO_TESTS_COLLECTED: - session.exitstatus = 0 - elif self.known_failure_cases and not self.failed_cases: - session.exitstatus = 0 - - def pytest_terminal_summary(self, terminalreporter: TerminalReporter) -> None: - if self.known_failure_cases: - terminalreporter.section('Known failure cases', bold=True, yellow=True) - terminalreporter.line('\n'.join(self.known_failure_cases)) - - if self.xfail_cases: - terminalreporter.section('xfail cases', bold=True, yellow=True) - terminalreporter.line('\n'.join(self.xfail_cases)) - - if self.failed_cases: - terminalreporter.section('Failed cases', bold=True, red=True) - terminalreporter.line('\n'.join(self.failed_cases)) diff --git a/tools/ci/check_build_test_rules.py b/tools/ci/check_build_test_rules.py index 886ca50d1d..be7bb9c442 100755 --- a/tools/ci/check_build_test_rules.py +++ b/tools/ci/check_build_test_rules.py @@ -13,7 +13,7 @@ from pathlib import Path from typing import Dict, List, Optional, Tuple import yaml -from idf_ci_utils import IDF_PATH, get_pytest_cases, get_ttfw_cases +from idf_ci_utils import IDF_PATH, get_ttfw_cases YES = u'\u2713' NO = u'\u2717' @@ -215,6 +215,7 @@ def check_test_scripts( ) -> None: from idf_build_apps import App, find_apps from idf_build_apps.constants import SUPPORTED_TARGETS + from idf_pytest.script import get_pytest_cases # takes long time, run only in CI # dict: diff --git a/tools/ci/ci_build_apps.py b/tools/ci/ci_build_apps.py index 24413b5050..956414a130 100644 --- a/tools/ci/ci_build_apps.py +++ b/tools/ci/ci_build_apps.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2021-2023 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 """ @@ -16,7 +16,7 @@ from pathlib import Path import yaml from idf_build_apps import LOGGER, App, build_apps, find_apps, setup_logging from idf_build_apps.constants import SUPPORTED_TARGETS -from idf_ci_utils import IDF_PATH, PytestApp, get_pytest_cases, get_ttfw_app_paths +from idf_ci_utils import IDF_PATH, get_ttfw_app_paths CI_ENV_VARS = { 'EXTRA_CFLAGS': '-Werror -Werror=deprecated-declarations -Werror=unused-variable ' @@ -39,6 +39,8 @@ def get_pytest_apps( modified_files: t.Optional[t.List[str]] = None, ignore_app_dependencies_filepatterns: t.Optional[t.List[str]] = None, ) -> t.List[App]: + from idf_pytest.script import get_pytest_cases + pytest_cases = get_pytest_cases(paths, target, marker_expr, filter_expr) _paths: t.Set[str] = set() @@ -103,6 +105,9 @@ def get_cmake_apps( modified_files: t.Optional[t.List[str]] = None, ignore_app_dependencies_filepatterns: t.Optional[t.List[str]] = None, ) -> t.List[App]: + from idf_pytest.constants import PytestApp + from idf_pytest.script import get_pytest_cases + ttfw_app_dirs = get_ttfw_app_paths(paths, target) apps = find_apps( diff --git a/tools/ci/exclude_check_tools_files.txt b/tools/ci/exclude_check_tools_files.txt index abf3844eb8..eac77c20c9 100644 --- a/tools/ci/exclude_check_tools_files.txt +++ b/tools/ci/exclude_check_tools_files.txt @@ -39,3 +39,4 @@ tools/templates/sample_component/CMakeLists.txt tools/templates/sample_component/include/main.h tools/templates/sample_component/main.c tools/ci/cleanup_ignore_lists.py +tools/ci/idf_pytest/**/* diff --git a/tools/ci/idf_ci_utils.py b/tools/ci/idf_ci_utils.py index c8335a738d..8b78a5051c 100644 --- a/tools/ci/idf_ci_utils.py +++ b/tools/ci/idf_ci_utils.py @@ -1,29 +1,15 @@ # internal use only for CI # some CI related util functions # -# SPDX-FileCopyrightText: 2020-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2020-2023 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 # import contextlib -import io import logging import os import subprocess import sys -from contextlib import redirect_stdout -from dataclasses import dataclass -from pathlib import Path -from typing import TYPE_CHECKING, Any, List, Optional, Set, Union - -try: - from idf_py_actions.constants import PREVIEW_TARGETS, SUPPORTED_TARGETS -except ImportError: - sys.path.append(os.path.join(os.path.dirname(__file__), '..')) - - from idf_py_actions.constants import PREVIEW_TARGETS, SUPPORTED_TARGETS - -if TYPE_CHECKING: - from _pytest.python import Function +from typing import Any, List, Optional, Set, Union IDF_PATH = os.path.abspath(os.getenv('IDF_PATH', os.path.join(os.path.dirname(__file__), '..', '..'))) @@ -127,165 +113,6 @@ def to_list(s: Any) -> List[Any]: return [s] -#################### -# Pytest Utilities # -#################### -@dataclass -class PytestApp: - path: str - target: str - config: str - - def __hash__(self) -> int: - return hash((self.path, self.target, self.config)) - - -@dataclass -class PytestCase: - path: str - name: str - apps: Set[PytestApp] - - nightly_run: bool - - def __hash__(self) -> int: - return hash((self.path, self.name, self.apps, self.nightly_run)) - - -class PytestCollectPlugin: - def __init__(self, target: str) -> None: - self.target = target - self.cases: List[PytestCase] = [] - - @staticmethod - def get_param(item: 'Function', key: str, default: Any = None) -> Any: - if not hasattr(item, 'callspec'): - raise ValueError(f'Function {item} does not have params') - - return item.callspec.params.get(key, default) or default - - def pytest_report_collectionfinish(self, items: List['Function']) -> None: - from pytest_embedded.plugin import parse_multi_dut_args - - for item in items: - count = 1 - case_path = str(item.path) - case_name = item.originalname - target = self.target - # funcargs is not calculated while collection - if hasattr(item, 'callspec'): - count = item.callspec.params.get('count', 1) - app_paths = to_list( - parse_multi_dut_args( - count, - self.get_param(item, 'app_path', os.path.dirname(case_path)), - ) - ) - configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', 'default'))) - targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', target))) - else: - app_paths = [os.path.dirname(case_path)] - configs = ['default'] - targets = [target] - - case_apps = set() - for i in range(count): - case_apps.add(PytestApp(app_paths[i], targets[i], configs[i])) - - self.cases.append( - PytestCase( - case_path, - case_name, - case_apps, - 'nightly_run' in [marker.name for marker in item.iter_markers()], - ) - ) - - -def get_pytest_files(paths: List[str]) -> List[str]: - # this is a workaround to solve pytest collector super slow issue - # benchmark with - # - time pytest -m esp32 --collect-only - # user=15.57s system=1.35s cpu=95% total=17.741 - # - time { find -name 'pytest_*.py'; } | xargs pytest -m esp32 --collect-only - # user=0.11s system=0.63s cpu=36% total=2.044 - # user=1.76s system=0.22s cpu=43% total=4.539 - # use glob.glob would also save a bunch of time - pytest_scripts: Set[str] = set() - for p in paths: - path = Path(p) - pytest_scripts.update(str(_p) for _p in path.glob('**/pytest_*.py') if 'managed_components' not in _p.parts) - - return list(pytest_scripts) - - -def get_pytest_cases( - paths: Union[str, List[str]], - target: str = 'all', - marker_expr: Optional[str] = None, - filter_expr: Optional[str] = None, -) -> List[PytestCase]: - import pytest - from _pytest.config import ExitCode - - if target == 'all': - targets = SUPPORTED_TARGETS + PREVIEW_TARGETS - else: - targets = [target] - - paths = to_list(paths) - - origin_include_nightly_run_env = os.getenv('INCLUDE_NIGHTLY_RUN') - origin_nightly_run_env = os.getenv('NIGHTLY_RUN') - - # disable the env vars to get all test cases - if 'INCLUDE_NIGHTLY_RUN' in os.environ: - os.environ.pop('INCLUDE_NIGHTLY_RUN') - - if 'NIGHTLY_RUN' in os.environ: - os.environ.pop('NIGHTLY_RUN') - - # collect all cases - os.environ['INCLUDE_NIGHTLY_RUN'] = '1' - - cases = [] # type: List[PytestCase] - pytest_scripts = get_pytest_files(paths) - if not pytest_scripts: - print(f'WARNING: no pytest scripts found for target {target} under paths {", ".join(paths)}') - return cases - - for target in targets: - collector = PytestCollectPlugin(target) - - with io.StringIO() as buf: - with redirect_stdout(buf): - cmd = ['--collect-only', *pytest_scripts, '--target', target, '-q'] - if marker_expr: - cmd.extend(['-m', marker_expr]) - if filter_expr: - cmd.extend(['-k', filter_expr]) - res = pytest.main(cmd, plugins=[collector]) - if res.value != ExitCode.OK: - if res.value == ExitCode.NO_TESTS_COLLECTED: - print(f'WARNING: no pytest app found for target {target} under paths {", ".join(paths)}') - else: - print(buf.getvalue()) - raise RuntimeError( - f'pytest collection failed at {", ".join(paths)} with command \"{" ".join(cmd)}\"' - ) - - cases.extend(collector.cases) - - # revert back the env vars - if origin_include_nightly_run_env is not None: - os.environ['INCLUDE_NIGHTLY_RUN'] = origin_include_nightly_run_env - - if origin_nightly_run_env is not None: - os.environ['NIGHTLY_RUN'] = origin_nightly_run_env - - return cases - - ################## # TTFW Utilities # ################## diff --git a/tools/ci/idf_pytest/__init__.py b/tools/ci/idf_pytest/__init__.py new file mode 100644 index 0000000000..185d511444 --- /dev/null +++ b/tools/ci/idf_pytest/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import sys + +tools_dir = os.path.realpath(os.path.join(os.path.dirname(__file__), '..', '..')) +if tools_dir not in sys.path: + sys.path.append(tools_dir) diff --git a/tools/ci/idf_pytest/constants.py b/tools/ci/idf_pytest/constants.py new file mode 100644 index 0000000000..ce8474ca46 --- /dev/null +++ b/tools/ci/idf_pytest/constants.py @@ -0,0 +1,121 @@ +# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +""" +Pytest Related Constants. Don't import third-party packages here. +""" + +import typing as t +from dataclasses import dataclass + +SUPPORTED_TARGETS = ['esp32', 'esp32s2', 'esp32c3', 'esp32s3', 'esp32c2', 'esp32c6', 'esp32h2'] +PREVIEW_TARGETS: t.List[str] = [] # this PREVIEW_TARGETS excludes 'linux' target +DEFAULT_SDKCONFIG = 'default' + +TARGET_MARKERS = { + 'esp32': 'support esp32 target', + 'esp32s2': 'support esp32s2 target', + 'esp32s3': 'support esp32s3 target', + 'esp32c3': 'support esp32c3 target', + 'esp32c2': 'support esp32c2 target', + 'esp32c6': 'support esp32c6 target', + 'esp32h2': 'support esp32h2 target', + 'linux': 'support linux target', +} + +SPECIAL_MARKERS = { + 'supported_targets': "support all officially announced supported targets ('esp32', 'esp32s2', 'esp32c3', 'esp32s3', 'esp32c2', 'esp32c6')", + 'preview_targets': "support all preview targets ('none')", + 'all_targets': 'support all targets, including supported ones and preview ones', + 'temp_skip_ci': 'temp skip tests for specified targets only in ci', + 'temp_skip': 'temp skip tests for specified targets both in ci and locally', + 'nightly_run': 'tests should be executed as part of the nightly trigger pipeline', + 'host_test': 'tests which should not be built at the build stage, and instead built in host_test stage', + 'qemu': 'build and test using qemu-system-xtensa, not real target', +} + +ENV_MARKERS = { + # single-dut markers + 'generic': 'tests should be run on generic runners', + 'flash_suspend': 'support flash suspend feature', + 'ip101': 'connected via wired 10/100M ethernet', + 'lan8720': 'connected via LAN8720 ethernet transceiver', + 'quad_psram': 'runners with quad psram', + 'octal_psram': 'runners with octal psram', + 'usb_host': 'usb host runners', + 'usb_host_flash_disk': 'usb host runners with USB flash disk attached', + 'usb_device': 'usb device runners', + 'ethernet_ota': 'ethernet OTA runners', + 'flash_encryption': 'Flash Encryption runners', + 'flash_encryption_f4r8': 'Flash Encryption runners with 4-line flash and 8-line psram', + 'flash_encryption_f8r8': 'Flash Encryption runners with 8-line flash and 8-line psram', + 'flash_multi': 'Multiple flash chips tests', + 'psram': 'Chip has 4-line psram', + 'ir_transceiver': 'runners with a pair of IR transmitter and receiver', + 'twai_transceiver': 'runners with a TWAI PHY transceiver', + 'flash_encryption_wifi_high_traffic': 'Flash Encryption runners with wifi high traffic support', + 'ethernet': 'ethernet runner', + 'ethernet_flash_8m': 'ethernet runner with 8mb flash', + 'ethernet_router': 'both the runner and dut connect to the same router through ethernet NIC', + 'ethernet_vlan': 'ethernet runner GARM-32-SH-1-R16S5N3', + 'wifi_ap': 'a wifi AP in the environment', + 'wifi_router': 'both the runner and dut connect to the same wifi router', + 'wifi_high_traffic': 'wifi high traffic runners', + 'wifi_wlan': 'wifi runner with a wireless NIC', + 'Example_ShieldBox_Basic': 'basic configuration of the AP and ESP DUT placed in shielded box', + 'Example_ShieldBox': 'multiple shielded APs connected to shielded ESP DUT via RF cable with programmable attenuator', + 'xtal_26mhz': 'runner with 26MHz xtal on board', + 'xtal_40mhz': 'runner with 40MHz xtal on board', + 'external_flash': 'external flash memory connected via VSPI (FSPI)', + 'sdcard_sdmode': 'sdcard running in SD mode', + 'sdcard_spimode': 'sdcard running in SPI mode', + 'emmc': 'eMMC card', + 'MSPI_F8R8': 'runner with Octal Flash and Octal PSRAM', + 'MSPI_F4R8': 'runner with Quad Flash and Octal PSRAM', + 'MSPI_F4R4': 'runner with Quad Flash and Quad PSRAM', + 'jtag': 'runner where the chip is accessible through JTAG as well', + 'usb_serial_jtag': 'runner where the chip is accessible through builtin JTAG as well', + 'adc': 'ADC related tests should run on adc runners', + 'xtal32k': 'Runner with external 32k crystal connected', + 'no32kXtal': 'Runner with no external 32k crystal connected', + 'multi_dut_modbus_rs485': 'a pair of runners connected by RS485 bus', + 'psramv0': 'Runner with PSRAM version 0', + 'esp32eco3': 'Runner with esp32 eco3 connected', + 'ecdsa_efuse': 'Runner with test ECDSA private keys programmed in efuse', + 'ccs811': 'Runner with CCS811 connected', + 'eth_w5500': 'SPI Ethernet module with two W5500', + 'nvs_encr_hmac': 'Runner with test HMAC key programmed in efuse', + 'i2c_oled': 'Runner with ssd1306 I2C oled connected', + 'httpbin': 'runner for tests that need to access the httpbin service', + # multi-dut markers + 'ieee802154': 'ieee802154 related tests should run on ieee802154 runners.', + 'openthread_br': 'tests should be used for openthread border router.', + 'openthread_sleep': 'tests should be used for openthread sleepy device.', + 'zigbee_multi_dut': 'zigbee runner which have multiple duts.', + 'wifi_two_dut': 'tests should be run on runners which has two wifi duts connected.', + 'generic_multi_device': 'generic multiple devices whose corresponding gpio pins are connected to each other.', + 'twai_network': 'multiple runners form a TWAI network.', + 'sdio_master_slave': 'Test sdio multi board.', +} + + +@dataclass +class PytestApp: + path: str + target: str + config: str + + def __hash__(self) -> int: + return hash((self.path, self.target, self.config)) + + +@dataclass +class PytestCase: + path: str + name: str + apps: t.Set[PytestApp] + + nightly_run: bool + + def __hash__(self) -> int: + return hash((self.path, self.name, self.apps, self.nightly_run)) diff --git a/tools/ci/idf_pytest/plugin.py b/tools/ci/idf_pytest/plugin.py new file mode 100644 index 0000000000..a0db77dce6 --- /dev/null +++ b/tools/ci/idf_pytest/plugin.py @@ -0,0 +1,286 @@ +# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import typing as t +from fnmatch import fnmatch +from xml.etree import ElementTree as ET + +import pytest +from _pytest.config import ExitCode +from _pytest.main import Session +from _pytest.python import Function +from _pytest.reports import TestReport +from _pytest.runner import CallInfo +from _pytest.terminal import TerminalReporter +from pytest_embedded.plugin import parse_multi_dut_args +from pytest_embedded.utils import find_by_suffix, to_list + +from .constants import DEFAULT_SDKCONFIG, PREVIEW_TARGETS, SUPPORTED_TARGETS, PytestApp, PytestCase +from .utils import format_case_id, item_marker_names, item_skip_targets, merge_junit_files + +IDF_PYTEST_EMBEDDED_KEY = pytest.StashKey['IdfPytestEmbedded']() +ITEM_FAILED_CASES_KEY = pytest.StashKey[list]() +ITEM_FAILED_KEY = pytest.StashKey[bool]() + + +class IdfPytestEmbedded: + def __init__( + self, + target: str, + sdkconfig: t.Optional[str] = None, + known_failure_cases_file: t.Optional[str] = None, + apps_list: t.Optional[t.List[str]] = None, + ): + # CLI options to filter the test cases + self.target = target.lower() + self.sdkconfig = sdkconfig + self.known_failure_patterns = self._parse_known_failure_cases_file(known_failure_cases_file) + self.apps_list = apps_list + + self._failed_cases: t.List[t.Tuple[str, bool, bool]] = [] # (test_case_name, is_known_failure_cases, is_xfail) + + @property + def failed_cases(self) -> t.List[str]: + return [case for case, is_known, is_xfail in self._failed_cases if not is_known and not is_xfail] + + @property + def known_failure_cases(self) -> t.List[str]: + return [case for case, is_known, _ in self._failed_cases if is_known] + + @property + def xfail_cases(self) -> t.List[str]: + return [case for case, _, is_xfail in self._failed_cases if is_xfail] + + @staticmethod + def _parse_known_failure_cases_file( + known_failure_cases_file: t.Optional[str] = None, + ) -> t.List[str]: + if not known_failure_cases_file or not os.path.isfile(known_failure_cases_file): + return [] + + patterns = [] + with open(known_failure_cases_file) as fr: + for line in fr.readlines(): + if not line: + continue + if not line.strip(): + continue + without_comments = line.split('#')[0].strip() + if without_comments: + patterns.append(without_comments) + + return patterns + + @pytest.hookimpl(tryfirst=True) + def pytest_sessionstart(self, session: Session) -> None: + # same behavior for vanilla pytest-embedded '--target' + session.config.option.target = self.target + + @pytest.hookimpl(tryfirst=True) + def pytest_collection_modifyitems(self, items: t.List[Function]) -> None: + # sort by file path and callspec.config + # implement like this since this is a limitation of pytest, couldn't get fixture values while collecting + # https://github.com/pytest-dev/pytest/discussions/9689 + # after sort the test apps, the test may use the app cache to reduce the flash times. + def _get_param_config(_item: Function) -> str: + if hasattr(_item, 'callspec'): + return _item.callspec.params.get('config', DEFAULT_SDKCONFIG) # type: ignore + return DEFAULT_SDKCONFIG # type: ignore + + items.sort(key=lambda x: (os.path.dirname(x.path), _get_param_config(x))) + + # set default timeout 10 minutes for each case + for item in items: + if 'timeout' not in item.keywords: + item.add_marker(pytest.mark.timeout(10 * 60)) + + # add markers for special markers + for item in items: + if 'supported_targets' in item.keywords: + for _target in SUPPORTED_TARGETS: + item.add_marker(_target) + if 'preview_targets' in item.keywords: + for _target in PREVIEW_TARGETS: + item.add_marker(_target) + if 'all_targets' in item.keywords: + for _target in [*SUPPORTED_TARGETS, *PREVIEW_TARGETS]: + item.add_marker(_target) + + # add 'xtal_40mhz' tag as a default tag for esp32c2 target + # only add this marker for esp32c2 cases + if ( + self.target == 'esp32c2' + and 'esp32c2' in item_marker_names(item) + and 'xtal_26mhz' not in item_marker_names(item) + ): + item.add_marker('xtal_40mhz') + + # filter all the test cases with "nightly_run" marker + if os.getenv('INCLUDE_NIGHTLY_RUN') == '1': + # Do not filter nightly_run cases + pass + elif os.getenv('NIGHTLY_RUN') == '1': + items[:] = [item for item in items if 'nightly_run' in item_marker_names(item)] + else: + items[:] = [item for item in items if 'nightly_run' not in item_marker_names(item)] + + # filter all the test cases with target and skip_targets + items[:] = [ + item + for item in items + if self.target in item_marker_names(item) and self.target not in item_skip_targets(item) + ] + + # filter all the test cases with cli option "config" + if self.sdkconfig: + items[:] = [item for item in items if _get_param_config(item) == self.sdkconfig] + + def pytest_runtest_makereport(self, item: Function, call: CallInfo[None]) -> t.Optional[TestReport]: + report = TestReport.from_item_and_call(item, call) + if item.stash.get(ITEM_FAILED_KEY, None) is None: + item.stash[ITEM_FAILED_KEY] = False + + if report.outcome == 'failed': + # Mark the failed test cases + # + # This hook function would be called in 3 phases, setup, call, teardown. + # the report.outcome is the outcome of the single call of current phase, which is independent + # the call phase outcome is the test result + item.stash[ITEM_FAILED_KEY] = True + + if call.when == 'teardown': + item_failed = item.stash[ITEM_FAILED_KEY] + if item_failed: + # unity real test cases + failed_sub_cases = item.stash.get(ITEM_FAILED_CASES_KEY, []) + if failed_sub_cases: + for test_case_name in failed_sub_cases: + self._failed_cases.append((test_case_name, self._is_known_failure(test_case_name), False)) + else: # the case iteself is failing + test_case_name = item.funcargs.get('test_case_name', '') + if test_case_name: + self._failed_cases.append( + ( + test_case_name, + self._is_known_failure(test_case_name), + report.keywords.get('xfail', False), + ) + ) + + return report + + def _is_known_failure(self, case_id: str) -> bool: + for pattern in self.known_failure_patterns: + if case_id == pattern: + return True + if fnmatch(case_id, pattern): + return True + return False + + @pytest.hookimpl(trylast=True) + def pytest_runtest_teardown(self, item: Function) -> None: + """ + Format the test case generated junit reports + """ + tempdir = item.funcargs.get('test_case_tempdir') + if not tempdir: + return + + junits = find_by_suffix('.xml', tempdir) + if not junits: + return + + if len(junits) > 1: + merge_junit_files(junits, os.path.join(tempdir, 'dut.xml')) + junits = [os.path.join(tempdir, 'dut.xml')] + + is_qemu = item.get_closest_marker('qemu') is not None + failed_sub_cases = [] + target = item.funcargs['target'] + config = item.funcargs['config'] + for junit in junits: + xml = ET.parse(junit) + testcases = xml.findall('.//testcase') + for case in testcases: + # modify the junit files + new_case_name = format_case_id(target, config, case.attrib['name'], is_qemu=is_qemu) + case.attrib['name'] = new_case_name + if 'file' in case.attrib: + case.attrib['file'] = case.attrib['file'].replace('/IDF/', '') # our unity test framework + + # collect real failure cases + if case.find('failure') is not None: + failed_sub_cases.append(new_case_name) + + xml.write(junit) + + item.stash[ITEM_FAILED_CASES_KEY] = failed_sub_cases + + def pytest_sessionfinish(self, session: Session, exitstatus: int) -> None: + if exitstatus != 0: + if exitstatus == ExitCode.NO_TESTS_COLLECTED: + session.exitstatus = 0 + elif self.known_failure_cases and not self.failed_cases: + session.exitstatus = 0 + + def pytest_terminal_summary(self, terminalreporter: TerminalReporter) -> None: + if self.known_failure_cases: + terminalreporter.section('Known failure cases', bold=True, yellow=True) + terminalreporter.line('\n'.join(self.known_failure_cases)) + + if self.xfail_cases: + terminalreporter.section('xfail cases', bold=True, yellow=True) + terminalreporter.line('\n'.join(self.xfail_cases)) + + if self.failed_cases: + terminalreporter.section('Failed cases', bold=True, red=True) + terminalreporter.line('\n'.join(self.failed_cases)) + + +class PytestCollectPlugin: + def __init__(self, target: str) -> None: + self.target = target + self.cases: t.List[PytestCase] = [] + + @staticmethod + def get_param(item: 'Function', key: str, default: t.Any = None) -> t.Any: + if not hasattr(item, 'callspec'): + raise ValueError(f'Function {item} does not have params') + + return item.callspec.params.get(key, default) or default + + def pytest_report_collectionfinish(self, items: t.List['Function']) -> None: + for item in items: + count = 1 + case_path = str(item.path) + case_name = item.originalname + target = self.target + # funcargs is not calculated while collection + if hasattr(item, 'callspec'): + count = item.callspec.params.get('count', 1) + app_paths = to_list( + parse_multi_dut_args( + count, + self.get_param(item, 'app_path', os.path.dirname(case_path)), + ) + ) + configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', 'default'))) + targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', target))) + else: + app_paths = [os.path.dirname(case_path)] + configs = ['default'] + targets = [target] + + case_apps = set() + for i in range(count): + case_apps.add(PytestApp(app_paths[i], targets[i], configs[i])) + + self.cases.append( + PytestCase( + case_path, + case_name, + case_apps, + 'nightly_run' in [marker.name for marker in item.iter_markers()], + ) + ) diff --git a/tools/ci/idf_pytest/script.py b/tools/ci/idf_pytest/script.py new file mode 100644 index 0000000000..9a5eb31da4 --- /dev/null +++ b/tools/ci/idf_pytest/script.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import io +import os +import typing as t +from contextlib import redirect_stdout +from pathlib import Path + +import pytest +from _pytest.config import ExitCode +from idf_py_actions.constants import PREVIEW_TARGETS as TOOLS_PREVIEW_TARGETS +from idf_py_actions.constants import SUPPORTED_TARGETS as TOOLS_SUPPORTED_TARGETS +from pytest_embedded.utils import to_list + +from .constants import PytestCase +from .plugin import PytestCollectPlugin + + +def get_pytest_files(paths: t.List[str]) -> t.List[str]: + # this is a workaround to solve pytest collector super slow issue + # benchmark with + # - time pytest -m esp32 --collect-only + # user=15.57s system=1.35s cpu=95% total=17.741 + # - time { find -name 'pytest_*.py'; } | xargs pytest -m esp32 --collect-only + # user=0.11s system=0.63s cpu=36% total=2.044 + # user=1.76s system=0.22s cpu=43% total=4.539 + # use glob.glob would also save a bunch of time + pytest_scripts: t.Set[str] = set() + for p in paths: + path = Path(p) + pytest_scripts.update(str(_p) for _p in path.glob('**/pytest_*.py') if 'managed_components' not in _p.parts) + + return list(pytest_scripts) + + +def get_pytest_cases( + paths: t.Union[str, t.List[str]], + target: str = 'all', + marker_expr: t.Optional[str] = None, + filter_expr: t.Optional[str] = None, +) -> t.List[PytestCase]: + if target == 'all': + targets = TOOLS_SUPPORTED_TARGETS + TOOLS_PREVIEW_TARGETS + else: + targets = [target] + + paths = to_list(paths) + + origin_include_nightly_run_env = os.getenv('INCLUDE_NIGHTLY_RUN') + origin_nightly_run_env = os.getenv('NIGHTLY_RUN') + + # disable the env vars to get all test cases + if 'INCLUDE_NIGHTLY_RUN' in os.environ: + os.environ.pop('INCLUDE_NIGHTLY_RUN') + + if 'NIGHTLY_RUN' in os.environ: + os.environ.pop('NIGHTLY_RUN') + + # collect all cases + os.environ['INCLUDE_NIGHTLY_RUN'] = '1' + + cases: t.List[PytestCase] = [] + pytest_scripts = get_pytest_files(paths) # type: ignore + if not pytest_scripts: + print(f'WARNING: no pytest scripts found for target {target} under paths {", ".join(paths)}') + return cases + + for target in targets: + collector = PytestCollectPlugin(target) + + with io.StringIO() as buf: + with redirect_stdout(buf): + cmd = ['--collect-only', *pytest_scripts, '--target', target, '-q'] + if marker_expr: + cmd.extend(['-m', marker_expr]) + if filter_expr: + cmd.extend(['-k', filter_expr]) + res = pytest.main(cmd, plugins=[collector]) + if res.value != ExitCode.OK: + if res.value == ExitCode.NO_TESTS_COLLECTED: + print(f'WARNING: no pytest app found for target {target} under paths {", ".join(paths)}') + else: + print(buf.getvalue()) + raise RuntimeError( + f'pytest collection failed at {", ".join(paths)} with command \"{" ".join(cmd)}\"' + ) + + cases.extend(collector.cases) + + # revert back the env vars + if origin_include_nightly_run_env is not None: + os.environ['INCLUDE_NIGHTLY_RUN'] = origin_include_nightly_run_env + + if origin_nightly_run_env is not None: + os.environ['NIGHTLY_RUN'] = origin_nightly_run_env + + return cases diff --git a/tools/ci/idf_pytest/utils.py b/tools/ci/idf_pytest/utils.py new file mode 100644 index 0000000000..522dba7e84 --- /dev/null +++ b/tools/ci/idf_pytest/utils.py @@ -0,0 +1,131 @@ +# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import typing as t +from xml.etree import ElementTree as ET + +from _pytest.nodes import Item +from pytest_embedded.utils import to_list + +from .constants import ENV_MARKERS, TARGET_MARKERS + + +def format_case_id(target: t.Optional[str], config: t.Optional[str], case: str, is_qemu: bool = False) -> str: + parts = [] + if target: + parts.append((str(target) + '_qemu') if is_qemu else str(target)) + if config: + parts.append(str(config)) + parts.append(case) + + return '.'.join(parts) + + +def item_marker_names(item: Item) -> t.List[str]: + return [marker.name for marker in item.iter_markers()] + + +def item_target_marker_names(item: Item) -> t.List[str]: + res = set() + for marker in item.iter_markers(): + if marker.name in TARGET_MARKERS: + res.add(marker.name) + + return sorted(res) + + +def item_env_marker_names(item: Item) -> t.List[str]: + res = set() + for marker in item.iter_markers(): + if marker.name in ENV_MARKERS: + res.add(marker.name) + + return sorted(res) + + +def item_skip_targets(item: Item) -> t.List[str]: + def _get_temp_markers_disabled_targets(marker_name: str) -> t.List[str]: + temp_marker = item.get_closest_marker(marker_name) + + if not temp_marker: + return [] + + # temp markers should always use keyword arguments `targets` and `reason` + if not temp_marker.kwargs.get('targets') or not temp_marker.kwargs.get('reason'): + raise ValueError( + f'`{marker_name}` should always use keyword arguments `targets` and `reason`. ' + f'For example: ' + f'`@pytest.mark.{marker_name}(targets=["esp32"], reason="IDF-xxxx, will fix it ASAP")`' + ) + + return to_list(temp_marker.kwargs['targets']) # type: ignore + + temp_skip_ci_targets = _get_temp_markers_disabled_targets('temp_skip_ci') + temp_skip_targets = _get_temp_markers_disabled_targets('temp_skip') + + # in CI we skip the union of `temp_skip` and `temp_skip_ci` + if os.getenv('CI_JOB_ID'): + skip_targets = list(set(temp_skip_ci_targets).union(set(temp_skip_targets))) + else: # we use `temp_skip` locally + skip_targets = temp_skip_targets + + return skip_targets + + +def get_target_marker_from_expr(markexpr: str) -> str: + candidates = set() + # we use `-m "esp32 and generic"` in our CI to filter the test cases + # this doesn't cover all use cases, but fit what we do in CI. + for marker in markexpr.split('and'): + marker = marker.strip() + if marker in TARGET_MARKERS: + candidates.add(marker) + + if len(candidates) > 1: + raise ValueError(f'Specified more than one target markers: {candidates}. Please specify no more than one.') + elif len(candidates) == 1: + return candidates.pop() + else: + raise ValueError('Please specify one target marker via "--target [TARGET]" or via "-m [TARGET]"') + + +def merge_junit_files(junit_files: t.List[str], target_path: str) -> None: + if len(junit_files) <= 1: + return + + merged_testsuite: ET.Element = ET.Element('testsuite') + testcases: t.Dict[str, ET.Element] = {} + for junit in junit_files: + logging.info(f'Merging {junit} to {target_path}') + tree: ET.ElementTree = ET.parse(junit) + testsuite: ET.Element = tree.getroot() + + for testcase in testsuite.findall('testcase'): + name: str = testcase.get('name') if testcase.get('name') else '' # type: ignore + + if name not in testcases: + testcases[name] = testcase + merged_testsuite.append(testcase) + continue + + existing_testcase = testcases[name] + for element_name in ['failure', 'error']: + for element in testcase.findall(element_name): + existing_element = existing_testcase.find(element_name) + if existing_element is None: + existing_testcase.append(element) + else: + existing_element.attrib.setdefault('message', '') # type: ignore + existing_element.attrib['message'] += '. ' + element.get('message', '') # type: ignore + + os.remove(junit) + + merged_testsuite.set('tests', str(len(merged_testsuite.findall('testcase')))) + merged_testsuite.set('failures', str(len(merged_testsuite.findall('.//testcase/failure')))) + merged_testsuite.set('errors', str(len(merged_testsuite.findall('.//testcase/error')))) + merged_testsuite.set('skipped', str(len(merged_testsuite.findall('.//testcase/skipped')))) + + with open(target_path, 'wb') as fw: + fw.write(ET.tostring(merged_testsuite))