diff --git a/conftest.py b/conftest.py index 6266c97ef0..988db74085 100644 --- a/conftest.py +++ b/conftest.py @@ -19,21 +19,16 @@ if os.path.join(os.path.dirname(__file__), 'tools', 'ci', 'python_packages') not import glob import io -import json import logging import os import re -import signal -import time import typing as t import zipfile from copy import deepcopy -from telnetlib import Telnet from urllib.parse import quote import common_test_methods # noqa: F401 import gitlab_api -import pexpect import pytest import requests import yaml @@ -58,8 +53,6 @@ from idf_pytest.plugin import IdfPytestEmbedded from idf_pytest.utils import format_case_id from pytest_embedded.plugin import multi_dut_argument from pytest_embedded.plugin import multi_dut_fixture -from pytest_embedded.utils import to_bytes -from pytest_embedded.utils import to_str from pytest_embedded_idf.dut import IdfDut from pytest_embedded_idf.unity_tester import CaseTester @@ -155,113 +148,6 @@ class BuildReportDownloader(AppDownloader): super().download_app(app_build_path, artifact_type) -class OpenOCD: - def __init__(self, dut: 'IdfDut'): - self.MAX_RETRIES = 3 - self.RETRY_DELAY = 1 - self.TELNET_PORT = 4444 - self.dut = dut - self.telnet: t.Optional[Telnet] = None - self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') - self.proc: t.Optional[pexpect.spawn] = None - - def __enter__(self) -> t.Optional['OpenOCD']: - return self.run() - - def __exit__(self, exception_type: t.Any, exception_value: t.Any, exception_traceback: t.Any) -> None: - self.kill() - - def run(self) -> t.Optional['OpenOCD']: - desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') - - try: - with open(desc_path, 'r') as f: - project_desc = json.load(f) - except FileNotFoundError: - logging.error('Project description file not found at %s', desc_path) - raise - - openocd_scripts = os.getenv('OPENOCD_SCRIPTS') - if not openocd_scripts: - raise RuntimeError('OPENOCD_SCRIPTS environment variable is not set.') - - debug_args = project_desc.get('debug_arguments_openocd') - if not debug_args: - raise KeyError("'debug_arguments_openocd' key is missing in project_description.json") - - # For debug purposes, make the value '4' - ocd_env = os.environ.copy() - ocd_env['LIBUSB_DEBUG'] = '1' - - for _ in range(1, self.MAX_RETRIES + 1): - try: - self.proc = pexpect.spawn( - command='openocd', - args=['-s', openocd_scripts] + debug_args.split(), - timeout=5, - encoding='utf-8', - codec_errors='ignore', - env=ocd_env, - ) - if self.proc and self.proc.isalive(): - self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) - self.connect_telnet() - self.write('log_output {}'.format(self.log_file)) - return self - except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT, ConnectionRefusedError) as e: - logging.error('Error running OpenOCD: %s', str(e)) - self.kill() - time.sleep(self.RETRY_DELAY) - - raise RuntimeError('Failed to run OpenOCD after %d attempts.', self.MAX_RETRIES) - - def connect_telnet(self) -> None: - for attempt in range(1, self.MAX_RETRIES + 1): - try: - self.telnet = Telnet('127.0.0.1', self.TELNET_PORT, 5) - break - except ConnectionRefusedError as e: - logging.error('Error telnet connection: %s in attempt:%d', e, attempt) - time.sleep(1) - else: - raise ConnectionRefusedError - - def write(self, s: str) -> t.Any: - if self.telnet is None: - logging.error('Telnet connection is not established.') - return '' - resp = self.telnet.read_very_eager() - self.telnet.write(to_bytes(s, '\n')) - resp += self.telnet.read_until(b'>') - return to_str(resp) - - def apptrace_wait_stop(self, timeout: int = 30) -> None: - stopped = False - end_before = time.time() + timeout - while not stopped: - cmd_out = self.write('esp apptrace status') - for line in cmd_out.splitlines(): - if line.startswith('Tracing is STOPPED.'): - stopped = True - break - if not stopped and time.time() > end_before: - raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') - time.sleep(1) - - def kill(self) -> None: - # Check if the process is still running - if self.proc and self.proc.isalive(): - self.proc.terminate() - self.proc.kill(signal.SIGKILL) - - -@pytest.fixture -def openocd(dut: IdfDut) -> OpenOCD: - if isinstance(dut, tuple): - raise ValueError('Multi-DUT support is not implemented yet') - return OpenOCD(dut) - - @pytest.fixture(scope='session') def app_downloader(pipeline_id: t.Optional[str]) -> t.Optional[AppDownloader]: if not pipeline_id: diff --git a/examples/system/app_trace_basic/pytest_app_trace_basic.py b/examples/system/app_trace_basic/pytest_app_trace_basic.py index bb79aa7dd2..004e07632b 100644 --- a/examples/system/app_trace_basic/pytest_app_trace_basic.py +++ b/examples/system/app_trace_basic/pytest_app_trace_basic.py @@ -1,19 +1,126 @@ # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 +import json +import logging +import os +import signal import time -import typing +from telnetlib import Telnet +from typing import Any +from typing import Optional +import pexpect import pytest +from pytest_embedded.utils import to_bytes +from pytest_embedded.utils import to_str from pytest_embedded_idf import IdfDut from pytest_embedded_idf.utils import idf_parametrize -if typing.TYPE_CHECKING: - from conftest import OpenOCD +MAX_RETRIES = 3 +RETRY_DELAY = 1 +TELNET_PORT = 4444 -def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: +class OpenOCD: + def __init__(self, dut: 'IdfDut'): + self.dut = dut + self.telnet: Optional[Telnet] = None + self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') + self.proc: Optional[pexpect.spawn] = None + + def run(self) -> Optional['OpenOCD']: + desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') + + try: + with open(desc_path, 'r') as f: + project_desc = json.load(f) + except FileNotFoundError: + logging.error('Project description file not found at %s', desc_path) + return None + + openocd_scripts = os.getenv('OPENOCD_SCRIPTS') + if not openocd_scripts: + logging.error('OPENOCD_SCRIPTS environment variable is not set.') + return None + + debug_args = project_desc.get('debug_arguments_openocd') + if not debug_args: + logging.error("'debug_arguments_openocd' key is missing in project_description.json") + return None + + # For debug purposes, make the value '4' + ocd_env = os.environ.copy() + ocd_env['LIBUSB_DEBUG'] = '1' + + for _ in range(1, MAX_RETRIES + 1): + try: + self.proc = pexpect.spawn( + command='openocd', + args=['-s', openocd_scripts] + debug_args.split(), + timeout=5, + encoding='utf-8', + codec_errors='ignore', + env=ocd_env, + ) + if self.proc and self.proc.isalive(): + self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) + return self + except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: + logging.error('Error running OpenOCD: %s', str(e)) + if self.proc and self.proc.isalive(): + self.proc.terminate() + time.sleep(RETRY_DELAY) + + logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) + return None + + def connect_telnet(self) -> None: + for attempt in range(1, MAX_RETRIES + 1): + try: + self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) + break + except ConnectionRefusedError as e: + logging.error('Error telnet connection: %s in attempt:%d', e, attempt) + time.sleep(1) + else: + raise ConnectionRefusedError + + def write(self, s: str) -> Any: + if self.telnet is None: + logging.error('Telnet connection is not established.') + return '' + resp = self.telnet.read_very_eager() + self.telnet.write(to_bytes(s, '\n')) + resp += self.telnet.read_until(b'>') + return to_str(resp) + + def apptrace_wait_stop(self, timeout: int = 30) -> None: + stopped = False + end_before = time.time() + timeout + while not stopped: + cmd_out = self.write('esp apptrace status') + for line in cmd_out.splitlines(): + if line.startswith('Tracing is STOPPED.'): + stopped = True + break + if not stopped and time.time() > end_before: + raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') + time.sleep(1) + + def kill(self) -> None: + # Check if the process is still running + if self.proc and self.proc.isalive(): + self.proc.terminate() + self.proc.kill(signal.SIGKILL) + + +def _test_examples_app_trace_basic(dut: IdfDut) -> None: dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5) - with openocd.run() as openocd: + openocd = OpenOCD(dut).run() + assert openocd + try: + openocd.connect_telnet() + openocd.write('log_output {}'.format(openocd.log_file)) openocd.write('reset run') dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5) time.sleep(1) # wait for APPTRACE_INIT semihosting call @@ -44,15 +151,17 @@ def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: break if found is not True: raise RuntimeError('"{}" could not be found in {}'.format(log_str, 'apptrace.log')) + finally: + openocd.kill() @pytest.mark.jtag @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) -def test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: - _test_examples_app_trace_basic(openocd, dut) +def test_examples_app_trace_basic(dut: IdfDut) -> None: + _test_examples_app_trace_basic(dut) @pytest.mark.usb_serial_jtag @idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) -def test_examples_app_trace_basic_usj(openocd: 'OpenOCD', dut: IdfDut) -> None: - _test_examples_app_trace_basic(openocd, dut) +def test_examples_app_trace_basic_usj(dut: IdfDut) -> None: + _test_examples_app_trace_basic(dut) diff --git a/examples/system/gcov/pytest_gcov.py b/examples/system/gcov/pytest_gcov.py index 759c2fe5f3..59c4763579 100644 --- a/examples/system/gcov/pytest_gcov.py +++ b/examples/system/gcov/pytest_gcov.py @@ -1,23 +1,129 @@ # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 +import json +import logging import os.path +import signal import time -import typing +from telnetlib import Telnet +from typing import Any +from typing import Optional +import pexpect import pytest +from pytest_embedded.utils import to_bytes +from pytest_embedded.utils import to_str from pytest_embedded_idf import IdfDut from pytest_embedded_idf.utils import idf_parametrize -if typing.TYPE_CHECKING: - from conftest import OpenOCD +MAX_RETRIES = 3 +RETRY_DELAY = 1 +TELNET_PORT = 4444 -def _test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None: +class OpenOCD: + def __init__(self, dut: 'IdfDut'): + self.dut = dut + self.telnet: Optional[Telnet] = None + self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') + self.proc: Optional[pexpect.spawn] = None + + def run(self) -> Optional['OpenOCD']: + desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') + + try: + with open(desc_path, 'r') as f: + project_desc = json.load(f) + except FileNotFoundError: + logging.error('Project description file not found at %s', desc_path) + return None + + openocd_scripts = os.getenv('OPENOCD_SCRIPTS') + if not openocd_scripts: + logging.error('OPENOCD_SCRIPTS environment variable is not set.') + return None + + debug_args = project_desc.get('debug_arguments_openocd') + if not debug_args: + logging.error("'debug_arguments_openocd' key is missing in project_description.json") + return None + + # For debug purposes, make the value '4' + ocd_env = os.environ.copy() + ocd_env['LIBUSB_DEBUG'] = '1' + + for _ in range(1, MAX_RETRIES + 1): + try: + self.proc = pexpect.spawn( + command='openocd', + args=['-s', openocd_scripts] + debug_args.split(), + timeout=5, + encoding='utf-8', + codec_errors='ignore', + env=ocd_env, + ) + if self.proc and self.proc.isalive(): + self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) + return self + except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: + logging.error('Error running OpenOCD: %s', str(e)) + if self.proc and self.proc.isalive(): + self.proc.terminate() + time.sleep(RETRY_DELAY) + + logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) + return None + + def connect_telnet(self) -> None: + for attempt in range(1, MAX_RETRIES + 1): + try: + self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) + break + except ConnectionRefusedError as e: + logging.error('Error telnet connection: %s in attempt:%d', e, attempt) + time.sleep(1) + else: + raise ConnectionRefusedError + + def write(self, s: str) -> Any: + if self.telnet is None: + logging.error('Telnet connection is not established.') + return '' + resp = self.telnet.read_very_eager() + self.telnet.write(to_bytes(s, '\n')) + resp += self.telnet.read_until(b'>') + return to_str(resp) + + def apptrace_wait_stop(self, timeout: int = 30) -> None: + stopped = False + end_before = time.time() + timeout + while not stopped: + cmd_out = self.write('esp apptrace status') + for line in cmd_out.splitlines(): + if line.startswith('Tracing is STOPPED.'): + stopped = True + break + if not stopped and time.time() > end_before: + raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') + time.sleep(1) + + def kill(self) -> None: + # Check if the process is still running + if self.proc and self.proc.isalive(): + self.proc.terminate() + self.proc.kill(signal.SIGKILL) + + +def _test_gcov(dut: IdfDut) -> None: # create the generated .gcda folder, otherwise would have error: failed to open file. # normally this folder would be created via `idf.py build`. but in CI the non-related files would not be preserved os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir'), exist_ok=True) os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir'), exist_ok=True) + dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) + openocd = OpenOCD(dut).run() + assert openocd + def expect_counter_output(loop: int, timeout: int = 10) -> None: dut.expect_exact( [f'blink_dummy_func: Counter = {loop}', f'some_dummy_func: Counter = {loop * 2}'], @@ -47,8 +153,9 @@ def _test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None: assert len(expect_lines) == 0 - dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) - with openocd.run() as openocd: + try: + openocd.connect_telnet() + openocd.write('log_output {}'.format(openocd.log_file)) openocd.write('reset run') dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) @@ -70,15 +177,17 @@ def _test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None: time.sleep(1) # Test instant run-time dump dump_coverage('esp gcov') + finally: + openocd.kill() @pytest.mark.jtag @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) -def test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None: - _test_gcov(openocd, dut) +def test_gcov(dut: IdfDut) -> None: + _test_gcov(dut) @pytest.mark.usb_serial_jtag @idf_parametrize('target', ['esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) -def test_gcov_usj(openocd: 'OpenOCD', dut: IdfDut) -> None: - _test_gcov(openocd, dut) +def test_gcov_usj(dut: IdfDut) -> None: + _test_gcov(dut) diff --git a/examples/system/sysview_tracing/pytest_sysview_tracing.py b/examples/system/sysview_tracing/pytest_sysview_tracing.py index 3a3ff92e5f..015e030d55 100644 --- a/examples/system/sysview_tracing/pytest_sysview_tracing.py +++ b/examples/system/sysview_tracing/pytest_sysview_tracing.py @@ -1,20 +1,121 @@ # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 +import json +import logging import os.path import re +import signal import time -import typing +from telnetlib import Telnet +from typing import Any +from typing import Optional import pexpect import pytest +from pytest_embedded.utils import to_bytes +from pytest_embedded.utils import to_str from pytest_embedded_idf import IdfDut from pytest_embedded_idf.utils import idf_parametrize -if typing.TYPE_CHECKING: - from conftest import OpenOCD +MAX_RETRIES = 3 +RETRY_DELAY = 1 +TELNET_PORT = 4444 -def _test_examples_sysview_tracing(openocd: 'OpenOCD', dut: IdfDut) -> None: +class OpenOCD: + def __init__(self, dut: 'IdfDut'): + self.dut = dut + self.telnet: Optional[Telnet] = None + self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') + self.proc: Optional[pexpect.spawn] = None + + def run(self) -> Optional['OpenOCD']: + desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') + + try: + with open(desc_path, 'r') as f: + project_desc = json.load(f) + except FileNotFoundError: + logging.error('Project description file not found at %s', desc_path) + return None + + openocd_scripts = os.getenv('OPENOCD_SCRIPTS') + if not openocd_scripts: + logging.error('OPENOCD_SCRIPTS environment variable is not set.') + return None + + debug_args = project_desc.get('debug_arguments_openocd') + if not debug_args: + logging.error("'debug_arguments_openocd' key is missing in project_description.json") + return None + + # For debug purposes, make the value '4' + ocd_env = os.environ.copy() + ocd_env['LIBUSB_DEBUG'] = '1' + + for _ in range(1, MAX_RETRIES + 1): + try: + self.proc = pexpect.spawn( + command='openocd', + args=['-s', openocd_scripts] + debug_args.split(), + timeout=5, + encoding='utf-8', + codec_errors='ignore', + env=ocd_env, + ) + if self.proc and self.proc.isalive(): + self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) + return self + except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: + logging.error('Error running OpenOCD: %s', str(e)) + if self.proc and self.proc.isalive(): + self.proc.terminate() + time.sleep(RETRY_DELAY) + + logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) + return None + + def connect_telnet(self) -> None: + for attempt in range(1, MAX_RETRIES + 1): + try: + self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) + break + except ConnectionRefusedError as e: + logging.error('Error telnet connection: %s in attempt:%d', e, attempt) + time.sleep(1) + else: + raise ConnectionRefusedError + + def write(self, s: str) -> Any: + if self.telnet is None: + logging.error('Telnet connection is not established.') + return '' + resp = self.telnet.read_very_eager() + self.telnet.write(to_bytes(s, '\n')) + resp += self.telnet.read_until(b'>') + return to_str(resp) + + def apptrace_wait_stop(self, timeout: int = 30) -> None: + stopped = False + end_before = time.time() + timeout + while not stopped: + cmd_out = self.write('esp apptrace status') + for line in cmd_out.splitlines(): + if line.startswith('Tracing is STOPPED.'): + stopped = True + break + if not stopped and time.time() > end_before: + raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') + time.sleep(1) + + def kill(self) -> None: + # Check if the process is still running + if self.proc and self.proc.isalive(): + self.proc.terminate() + self.proc.kill(signal.SIGKILL) + + +def _test_examples_sysview_tracing(dut: IdfDut) -> None: # Construct trace log paths trace_log = [ os.path.join(dut.logdir, 'sys_log0.svdat') # pylint: disable=protected-access @@ -38,29 +139,37 @@ def _test_examples_sysview_tracing(openocd: 'OpenOCD', dut: IdfDut) -> None: dut.expect(re.compile(rb'example: Task\[0x[0-9A-Fa-f]+\]: received event \d+'), timeout=30) dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) - with openocd.run() as openocd, open(gdb_logfile, 'w') as gdb_log, pexpect.spawn( - f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}', - timeout=60, - logfile=gdb_log, - encoding='utf-8', - codec_errors='ignore', - ) as p: - p.expect_exact('hit Breakpoint 1, app_main ()') - dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect() - dut_expect_task_event() + openocd = OpenOCD(dut).run() + assert openocd + try: + openocd.connect_telnet() + openocd.write('log_output {}'.format(openocd.log_file)) - # Do a sleep while sysview samples are captured. - time.sleep(3) - openocd.write('esp sysview stop') + with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn( + f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}', + timeout=60, + logfile=gdb_log, + encoding='utf-8', + codec_errors='ignore', + ) as p: + p.expect_exact('hit Breakpoint 1, app_main ()') + dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect() + dut_expect_task_event() + + # Do a sleep while sysview samples are captured. + time.sleep(3) + openocd.write('esp sysview stop') + finally: + openocd.kill() @pytest.mark.jtag @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) -def test_examples_sysview_tracing(openocd: 'OpenOCD', dut: IdfDut) -> None: - _test_examples_sysview_tracing(openocd, dut) +def test_examples_sysview_tracing(dut: IdfDut) -> None: + _test_examples_sysview_tracing(dut) @pytest.mark.usb_serial_jtag @idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) -def test_examples_sysview_tracing_usj(openocd: 'OpenOCD', dut: IdfDut) -> None: - _test_examples_sysview_tracing(openocd, dut) +def test_examples_sysview_tracing_usj(dut: IdfDut) -> None: + _test_examples_sysview_tracing(dut) diff --git a/examples/system/sysview_tracing_heap_log/pytest_sysview_tracing_heap_log.py b/examples/system/sysview_tracing_heap_log/pytest_sysview_tracing_heap_log.py index 8063fe3b6a..748733e202 100644 --- a/examples/system/sysview_tracing_heap_log/pytest_sysview_tracing_heap_log.py +++ b/examples/system/sysview_tracing_heap_log/pytest_sysview_tracing_heap_log.py @@ -1,18 +1,120 @@ # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 +import json +import logging import os.path -import typing +import signal +import time +from telnetlib import Telnet +from typing import Any +from typing import Optional import pexpect.fdpexpect import pytest +from pytest_embedded.utils import to_bytes +from pytest_embedded.utils import to_str from pytest_embedded_idf import IdfDut from pytest_embedded_idf.utils import idf_parametrize -if typing.TYPE_CHECKING: - from conftest import OpenOCD +MAX_RETRIES = 3 +RETRY_DELAY = 1 +TELNET_PORT = 4444 -def _test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, dut: IdfDut) -> None: +class OpenOCD: + def __init__(self, dut: 'IdfDut'): + self.dut = dut + self.telnet: Optional[Telnet] = None + self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') + self.proc: Optional[pexpect.spawn] = None + + def run(self) -> Optional['OpenOCD']: + desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') + + try: + with open(desc_path, 'r') as f: + project_desc = json.load(f) + except FileNotFoundError: + logging.error('Project description file not found at %s', desc_path) + return None + + openocd_scripts = os.getenv('OPENOCD_SCRIPTS') + if not openocd_scripts: + logging.error('OPENOCD_SCRIPTS environment variable is not set.') + return None + + debug_args = project_desc.get('debug_arguments_openocd') + if not debug_args: + logging.error("'debug_arguments_openocd' key is missing in project_description.json") + return None + + # For debug purposes, make the value '4' + ocd_env = os.environ.copy() + ocd_env['LIBUSB_DEBUG'] = '1' + + for _ in range(1, MAX_RETRIES + 1): + try: + self.proc = pexpect.spawn( + command='openocd', + args=['-s', openocd_scripts] + debug_args.split(), + timeout=5, + encoding='utf-8', + codec_errors='ignore', + env=ocd_env, + ) + if self.proc and self.proc.isalive(): + self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) + return self + except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: + logging.error('Error running OpenOCD: %s', str(e)) + if self.proc and self.proc.isalive(): + self.proc.terminate() + time.sleep(RETRY_DELAY) + + logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) + return None + + def connect_telnet(self) -> None: + for attempt in range(1, MAX_RETRIES + 1): + try: + self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) + break + except ConnectionRefusedError as e: + logging.error('Error telnet connection: %s in attempt:%d', e, attempt) + time.sleep(1) + else: + raise ConnectionRefusedError + + def write(self, s: str) -> Any: + if self.telnet is None: + logging.error('Telnet connection is not established.') + return '' + resp = self.telnet.read_very_eager() + self.telnet.write(to_bytes(s, '\n')) + resp += self.telnet.read_until(b'>') + return to_str(resp) + + def apptrace_wait_stop(self, timeout: int = 30) -> None: + stopped = False + end_before = time.time() + timeout + while not stopped: + cmd_out = self.write('esp apptrace status') + for line in cmd_out.splitlines(): + if line.startswith('Tracing is STOPPED.'): + stopped = True + break + if not stopped and time.time() > end_before: + raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') + time.sleep(1) + + def kill(self) -> None: + # Check if the process is still running + if self.proc and self.proc.isalive(): + self.proc.terminate() + self.proc.kill(signal.SIGKILL) + + +def _test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None: # Construct trace log paths trace_log = [ os.path.join(dut.logdir, 'heap_log0.svdat') # pylint: disable=protected-access @@ -33,15 +135,23 @@ def _test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, d f_w.write(line) dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) - with openocd.run() as openocd, open(gdb_logfile, 'w') as gdb_log, pexpect.spawn( - f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}', - timeout=60, - logfile=gdb_log, - encoding='utf-8', - codec_errors='ignore', - ) as p: - # Wait for sysview files to be generated - p.expect_exact('Tracing is STOPPED') + openocd = OpenOCD(dut).run() + assert openocd + try: + openocd.connect_telnet() + openocd.write('log_output {}'.format(openocd.log_file)) + + with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn( + f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}', + timeout=60, + logfile=gdb_log, + encoding='utf-8', + codec_errors='ignore', + ) as p: + # Wait for sysview files to be generated + p.expect_exact('Tracing is STOPPED') + finally: + openocd.kill() # Process sysview trace logs command = [os.path.join(idf_path, 'tools', 'esp_app_trace', 'sysviewtrace_proc.py'), '-p'] + trace_log @@ -60,12 +170,12 @@ def _test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, d @pytest.mark.parametrize('config', ['app_trace_jtag'], indirect=True) @pytest.mark.jtag @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) -def test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, dut: IdfDut) -> None: - _test_examples_sysview_tracing_heap_log(openocd, idf_path, dut) +def test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None: + _test_examples_sysview_tracing_heap_log(idf_path, dut) @pytest.mark.parametrize('config', ['app_trace_jtag'], indirect=True) @pytest.mark.usb_serial_jtag @idf_parametrize('target', ['esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) -def test_examples_sysview_tracing_heap_log_usj(openocd: 'OpenOCD', idf_path: str, dut: IdfDut) -> None: - _test_examples_sysview_tracing_heap_log(openocd, idf_path, dut) +def test_examples_sysview_tracing_heap_log_usj(idf_path: str, dut: IdfDut) -> None: + _test_examples_sysview_tracing_heap_log(idf_path, dut)