Revert "ci: OpenOCD class as fixture"

This reverts commit 0eb74ffcc3.
This commit is contained in:
Marius Vikhammer
2025-05-28 08:38:08 +02:00
parent 4c3b989ce5
commit 24d2720793
5 changed files with 494 additions and 171 deletions

View File

@ -19,21 +19,16 @@ if os.path.join(os.path.dirname(__file__), 'tools', 'ci', 'python_packages') not
import glob
import io
import json
import logging
import os
import re
import signal
import time
import typing as t
import zipfile
from copy import deepcopy
from telnetlib import Telnet
from urllib.parse import quote
import common_test_methods # noqa: F401
import gitlab_api
import pexpect
import pytest
import requests
import yaml
@ -58,8 +53,6 @@ from idf_pytest.plugin import IdfPytestEmbedded
from idf_pytest.utils import format_case_id
from pytest_embedded.plugin import multi_dut_argument
from pytest_embedded.plugin import multi_dut_fixture
from pytest_embedded.utils import to_bytes
from pytest_embedded.utils import to_str
from pytest_embedded_idf.dut import IdfDut
from pytest_embedded_idf.unity_tester import CaseTester
@ -155,113 +148,6 @@ class BuildReportDownloader(AppDownloader):
super().download_app(app_build_path, artifact_type)
class OpenOCD:
def __init__(self, dut: 'IdfDut'):
self.MAX_RETRIES = 3
self.RETRY_DELAY = 1
self.TELNET_PORT = 4444
self.dut = dut
self.telnet: t.Optional[Telnet] = None
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
self.proc: t.Optional[pexpect.spawn] = None
def __enter__(self) -> t.Optional['OpenOCD']:
return self.run()
def __exit__(self, exception_type: t.Any, exception_value: t.Any, exception_traceback: t.Any) -> None:
self.kill()
def run(self) -> t.Optional['OpenOCD']:
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
try:
with open(desc_path, 'r') as f:
project_desc = json.load(f)
except FileNotFoundError:
logging.error('Project description file not found at %s', desc_path)
raise
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
if not openocd_scripts:
raise RuntimeError('OPENOCD_SCRIPTS environment variable is not set.')
debug_args = project_desc.get('debug_arguments_openocd')
if not debug_args:
raise KeyError("'debug_arguments_openocd' key is missing in project_description.json")
# For debug purposes, make the value '4'
ocd_env = os.environ.copy()
ocd_env['LIBUSB_DEBUG'] = '1'
for _ in range(1, self.MAX_RETRIES + 1):
try:
self.proc = pexpect.spawn(
command='openocd',
args=['-s', openocd_scripts] + debug_args.split(),
timeout=5,
encoding='utf-8',
codec_errors='ignore',
env=ocd_env,
)
if self.proc and self.proc.isalive():
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
self.connect_telnet()
self.write('log_output {}'.format(self.log_file))
return self
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT, ConnectionRefusedError) as e:
logging.error('Error running OpenOCD: %s', str(e))
self.kill()
time.sleep(self.RETRY_DELAY)
raise RuntimeError('Failed to run OpenOCD after %d attempts.', self.MAX_RETRIES)
def connect_telnet(self) -> None:
for attempt in range(1, self.MAX_RETRIES + 1):
try:
self.telnet = Telnet('127.0.0.1', self.TELNET_PORT, 5)
break
except ConnectionRefusedError as e:
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
time.sleep(1)
else:
raise ConnectionRefusedError
def write(self, s: str) -> t.Any:
if self.telnet is None:
logging.error('Telnet connection is not established.')
return ''
resp = self.telnet.read_very_eager()
self.telnet.write(to_bytes(s, '\n'))
resp += self.telnet.read_until(b'>')
return to_str(resp)
def apptrace_wait_stop(self, timeout: int = 30) -> None:
stopped = False
end_before = time.time() + timeout
while not stopped:
cmd_out = self.write('esp apptrace status')
for line in cmd_out.splitlines():
if line.startswith('Tracing is STOPPED.'):
stopped = True
break
if not stopped and time.time() > end_before:
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
time.sleep(1)
def kill(self) -> None:
# Check if the process is still running
if self.proc and self.proc.isalive():
self.proc.terminate()
self.proc.kill(signal.SIGKILL)
@pytest.fixture
def openocd(dut: IdfDut) -> OpenOCD:
if isinstance(dut, tuple):
raise ValueError('Multi-DUT support is not implemented yet')
return OpenOCD(dut)
@pytest.fixture(scope='session')
def app_downloader(pipeline_id: t.Optional[str]) -> t.Optional[AppDownloader]:
if not pipeline_id:

View File

@ -1,19 +1,126 @@
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import json
import logging
import os
import signal
import time
import typing
from telnetlib import Telnet
from typing import Any
from typing import Optional
import pexpect
import pytest
from pytest_embedded.utils import to_bytes
from pytest_embedded.utils import to_str
from pytest_embedded_idf import IdfDut
from pytest_embedded_idf.utils import idf_parametrize
if typing.TYPE_CHECKING:
from conftest import OpenOCD
MAX_RETRIES = 3
RETRY_DELAY = 1
TELNET_PORT = 4444
def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None:
class OpenOCD:
def __init__(self, dut: 'IdfDut'):
self.dut = dut
self.telnet: Optional[Telnet] = None
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
self.proc: Optional[pexpect.spawn] = None
def run(self) -> Optional['OpenOCD']:
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
try:
with open(desc_path, 'r') as f:
project_desc = json.load(f)
except FileNotFoundError:
logging.error('Project description file not found at %s', desc_path)
return None
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
if not openocd_scripts:
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
return None
debug_args = project_desc.get('debug_arguments_openocd')
if not debug_args:
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
return None
# For debug purposes, make the value '4'
ocd_env = os.environ.copy()
ocd_env['LIBUSB_DEBUG'] = '1'
for _ in range(1, MAX_RETRIES + 1):
try:
self.proc = pexpect.spawn(
command='openocd',
args=['-s', openocd_scripts] + debug_args.split(),
timeout=5,
encoding='utf-8',
codec_errors='ignore',
env=ocd_env,
)
if self.proc and self.proc.isalive():
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
return self
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
logging.error('Error running OpenOCD: %s', str(e))
if self.proc and self.proc.isalive():
self.proc.terminate()
time.sleep(RETRY_DELAY)
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
return None
def connect_telnet(self) -> None:
for attempt in range(1, MAX_RETRIES + 1):
try:
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
break
except ConnectionRefusedError as e:
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
time.sleep(1)
else:
raise ConnectionRefusedError
def write(self, s: str) -> Any:
if self.telnet is None:
logging.error('Telnet connection is not established.')
return ''
resp = self.telnet.read_very_eager()
self.telnet.write(to_bytes(s, '\n'))
resp += self.telnet.read_until(b'>')
return to_str(resp)
def apptrace_wait_stop(self, timeout: int = 30) -> None:
stopped = False
end_before = time.time() + timeout
while not stopped:
cmd_out = self.write('esp apptrace status')
for line in cmd_out.splitlines():
if line.startswith('Tracing is STOPPED.'):
stopped = True
break
if not stopped and time.time() > end_before:
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
time.sleep(1)
def kill(self) -> None:
# Check if the process is still running
if self.proc and self.proc.isalive():
self.proc.terminate()
self.proc.kill(signal.SIGKILL)
def _test_examples_app_trace_basic(dut: IdfDut) -> None:
dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5)
with openocd.run() as openocd:
openocd = OpenOCD(dut).run()
assert openocd
try:
openocd.connect_telnet()
openocd.write('log_output {}'.format(openocd.log_file))
openocd.write('reset run')
dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5)
time.sleep(1) # wait for APPTRACE_INIT semihosting call
@ -44,15 +151,17 @@ def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None:
break
if found is not True:
raise RuntimeError('"{}" could not be found in {}'.format(log_str, 'apptrace.log'))
finally:
openocd.kill()
@pytest.mark.jtag
@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target'])
def test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None:
_test_examples_app_trace_basic(openocd, dut)
def test_examples_app_trace_basic(dut: IdfDut) -> None:
_test_examples_app_trace_basic(dut)
@pytest.mark.usb_serial_jtag
@idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target'])
def test_examples_app_trace_basic_usj(openocd: 'OpenOCD', dut: IdfDut) -> None:
_test_examples_app_trace_basic(openocd, dut)
def test_examples_app_trace_basic_usj(dut: IdfDut) -> None:
_test_examples_app_trace_basic(dut)

View File

@ -1,23 +1,129 @@
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import json
import logging
import os.path
import signal
import time
import typing
from telnetlib import Telnet
from typing import Any
from typing import Optional
import pexpect
import pytest
from pytest_embedded.utils import to_bytes
from pytest_embedded.utils import to_str
from pytest_embedded_idf import IdfDut
from pytest_embedded_idf.utils import idf_parametrize
if typing.TYPE_CHECKING:
from conftest import OpenOCD
MAX_RETRIES = 3
RETRY_DELAY = 1
TELNET_PORT = 4444
def _test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None:
class OpenOCD:
def __init__(self, dut: 'IdfDut'):
self.dut = dut
self.telnet: Optional[Telnet] = None
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
self.proc: Optional[pexpect.spawn] = None
def run(self) -> Optional['OpenOCD']:
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
try:
with open(desc_path, 'r') as f:
project_desc = json.load(f)
except FileNotFoundError:
logging.error('Project description file not found at %s', desc_path)
return None
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
if not openocd_scripts:
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
return None
debug_args = project_desc.get('debug_arguments_openocd')
if not debug_args:
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
return None
# For debug purposes, make the value '4'
ocd_env = os.environ.copy()
ocd_env['LIBUSB_DEBUG'] = '1'
for _ in range(1, MAX_RETRIES + 1):
try:
self.proc = pexpect.spawn(
command='openocd',
args=['-s', openocd_scripts] + debug_args.split(),
timeout=5,
encoding='utf-8',
codec_errors='ignore',
env=ocd_env,
)
if self.proc and self.proc.isalive():
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
return self
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
logging.error('Error running OpenOCD: %s', str(e))
if self.proc and self.proc.isalive():
self.proc.terminate()
time.sleep(RETRY_DELAY)
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
return None
def connect_telnet(self) -> None:
for attempt in range(1, MAX_RETRIES + 1):
try:
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
break
except ConnectionRefusedError as e:
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
time.sleep(1)
else:
raise ConnectionRefusedError
def write(self, s: str) -> Any:
if self.telnet is None:
logging.error('Telnet connection is not established.')
return ''
resp = self.telnet.read_very_eager()
self.telnet.write(to_bytes(s, '\n'))
resp += self.telnet.read_until(b'>')
return to_str(resp)
def apptrace_wait_stop(self, timeout: int = 30) -> None:
stopped = False
end_before = time.time() + timeout
while not stopped:
cmd_out = self.write('esp apptrace status')
for line in cmd_out.splitlines():
if line.startswith('Tracing is STOPPED.'):
stopped = True
break
if not stopped and time.time() > end_before:
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
time.sleep(1)
def kill(self) -> None:
# Check if the process is still running
if self.proc and self.proc.isalive():
self.proc.terminate()
self.proc.kill(signal.SIGKILL)
def _test_gcov(dut: IdfDut) -> None:
# create the generated .gcda folder, otherwise would have error: failed to open file.
# normally this folder would be created via `idf.py build`. but in CI the non-related files would not be preserved
os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir'), exist_ok=True)
os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir'), exist_ok=True)
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
openocd = OpenOCD(dut).run()
assert openocd
def expect_counter_output(loop: int, timeout: int = 10) -> None:
dut.expect_exact(
[f'blink_dummy_func: Counter = {loop}', f'some_dummy_func: Counter = {loop * 2}'],
@ -47,8 +153,9 @@ def _test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None:
assert len(expect_lines) == 0
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
with openocd.run() as openocd:
try:
openocd.connect_telnet()
openocd.write('log_output {}'.format(openocd.log_file))
openocd.write('reset run')
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
@ -70,15 +177,17 @@ def _test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None:
time.sleep(1)
# Test instant run-time dump
dump_coverage('esp gcov')
finally:
openocd.kill()
@pytest.mark.jtag
@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target'])
def test_gcov(openocd: 'OpenOCD', dut: IdfDut) -> None:
_test_gcov(openocd, dut)
def test_gcov(dut: IdfDut) -> None:
_test_gcov(dut)
@pytest.mark.usb_serial_jtag
@idf_parametrize('target', ['esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target'])
def test_gcov_usj(openocd: 'OpenOCD', dut: IdfDut) -> None:
_test_gcov(openocd, dut)
def test_gcov_usj(dut: IdfDut) -> None:
_test_gcov(dut)

View File

@ -1,20 +1,121 @@
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import json
import logging
import os.path
import re
import signal
import time
import typing
from telnetlib import Telnet
from typing import Any
from typing import Optional
import pexpect
import pytest
from pytest_embedded.utils import to_bytes
from pytest_embedded.utils import to_str
from pytest_embedded_idf import IdfDut
from pytest_embedded_idf.utils import idf_parametrize
if typing.TYPE_CHECKING:
from conftest import OpenOCD
MAX_RETRIES = 3
RETRY_DELAY = 1
TELNET_PORT = 4444
def _test_examples_sysview_tracing(openocd: 'OpenOCD', dut: IdfDut) -> None:
class OpenOCD:
def __init__(self, dut: 'IdfDut'):
self.dut = dut
self.telnet: Optional[Telnet] = None
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
self.proc: Optional[pexpect.spawn] = None
def run(self) -> Optional['OpenOCD']:
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
try:
with open(desc_path, 'r') as f:
project_desc = json.load(f)
except FileNotFoundError:
logging.error('Project description file not found at %s', desc_path)
return None
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
if not openocd_scripts:
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
return None
debug_args = project_desc.get('debug_arguments_openocd')
if not debug_args:
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
return None
# For debug purposes, make the value '4'
ocd_env = os.environ.copy()
ocd_env['LIBUSB_DEBUG'] = '1'
for _ in range(1, MAX_RETRIES + 1):
try:
self.proc = pexpect.spawn(
command='openocd',
args=['-s', openocd_scripts] + debug_args.split(),
timeout=5,
encoding='utf-8',
codec_errors='ignore',
env=ocd_env,
)
if self.proc and self.proc.isalive():
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
return self
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
logging.error('Error running OpenOCD: %s', str(e))
if self.proc and self.proc.isalive():
self.proc.terminate()
time.sleep(RETRY_DELAY)
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
return None
def connect_telnet(self) -> None:
for attempt in range(1, MAX_RETRIES + 1):
try:
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
break
except ConnectionRefusedError as e:
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
time.sleep(1)
else:
raise ConnectionRefusedError
def write(self, s: str) -> Any:
if self.telnet is None:
logging.error('Telnet connection is not established.')
return ''
resp = self.telnet.read_very_eager()
self.telnet.write(to_bytes(s, '\n'))
resp += self.telnet.read_until(b'>')
return to_str(resp)
def apptrace_wait_stop(self, timeout: int = 30) -> None:
stopped = False
end_before = time.time() + timeout
while not stopped:
cmd_out = self.write('esp apptrace status')
for line in cmd_out.splitlines():
if line.startswith('Tracing is STOPPED.'):
stopped = True
break
if not stopped and time.time() > end_before:
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
time.sleep(1)
def kill(self) -> None:
# Check if the process is still running
if self.proc and self.proc.isalive():
self.proc.terminate()
self.proc.kill(signal.SIGKILL)
def _test_examples_sysview_tracing(dut: IdfDut) -> None:
# Construct trace log paths
trace_log = [
os.path.join(dut.logdir, 'sys_log0.svdat') # pylint: disable=protected-access
@ -38,29 +139,37 @@ def _test_examples_sysview_tracing(openocd: 'OpenOCD', dut: IdfDut) -> None:
dut.expect(re.compile(rb'example: Task\[0x[0-9A-Fa-f]+\]: received event \d+'), timeout=30)
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
with openocd.run() as openocd, open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
timeout=60,
logfile=gdb_log,
encoding='utf-8',
codec_errors='ignore',
) as p:
p.expect_exact('hit Breakpoint 1, app_main ()')
dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect()
dut_expect_task_event()
openocd = OpenOCD(dut).run()
assert openocd
try:
openocd.connect_telnet()
openocd.write('log_output {}'.format(openocd.log_file))
# Do a sleep while sysview samples are captured.
time.sleep(3)
openocd.write('esp sysview stop')
with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
timeout=60,
logfile=gdb_log,
encoding='utf-8',
codec_errors='ignore',
) as p:
p.expect_exact('hit Breakpoint 1, app_main ()')
dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect()
dut_expect_task_event()
# Do a sleep while sysview samples are captured.
time.sleep(3)
openocd.write('esp sysview stop')
finally:
openocd.kill()
@pytest.mark.jtag
@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target'])
def test_examples_sysview_tracing(openocd: 'OpenOCD', dut: IdfDut) -> None:
_test_examples_sysview_tracing(openocd, dut)
def test_examples_sysview_tracing(dut: IdfDut) -> None:
_test_examples_sysview_tracing(dut)
@pytest.mark.usb_serial_jtag
@idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target'])
def test_examples_sysview_tracing_usj(openocd: 'OpenOCD', dut: IdfDut) -> None:
_test_examples_sysview_tracing(openocd, dut)
def test_examples_sysview_tracing_usj(dut: IdfDut) -> None:
_test_examples_sysview_tracing(dut)

View File

@ -1,18 +1,120 @@
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import json
import logging
import os.path
import typing
import signal
import time
from telnetlib import Telnet
from typing import Any
from typing import Optional
import pexpect.fdpexpect
import pytest
from pytest_embedded.utils import to_bytes
from pytest_embedded.utils import to_str
from pytest_embedded_idf import IdfDut
from pytest_embedded_idf.utils import idf_parametrize
if typing.TYPE_CHECKING:
from conftest import OpenOCD
MAX_RETRIES = 3
RETRY_DELAY = 1
TELNET_PORT = 4444
def _test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, dut: IdfDut) -> None:
class OpenOCD:
def __init__(self, dut: 'IdfDut'):
self.dut = dut
self.telnet: Optional[Telnet] = None
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
self.proc: Optional[pexpect.spawn] = None
def run(self) -> Optional['OpenOCD']:
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
try:
with open(desc_path, 'r') as f:
project_desc = json.load(f)
except FileNotFoundError:
logging.error('Project description file not found at %s', desc_path)
return None
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
if not openocd_scripts:
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
return None
debug_args = project_desc.get('debug_arguments_openocd')
if not debug_args:
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
return None
# For debug purposes, make the value '4'
ocd_env = os.environ.copy()
ocd_env['LIBUSB_DEBUG'] = '1'
for _ in range(1, MAX_RETRIES + 1):
try:
self.proc = pexpect.spawn(
command='openocd',
args=['-s', openocd_scripts] + debug_args.split(),
timeout=5,
encoding='utf-8',
codec_errors='ignore',
env=ocd_env,
)
if self.proc and self.proc.isalive():
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
return self
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
logging.error('Error running OpenOCD: %s', str(e))
if self.proc and self.proc.isalive():
self.proc.terminate()
time.sleep(RETRY_DELAY)
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
return None
def connect_telnet(self) -> None:
for attempt in range(1, MAX_RETRIES + 1):
try:
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
break
except ConnectionRefusedError as e:
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
time.sleep(1)
else:
raise ConnectionRefusedError
def write(self, s: str) -> Any:
if self.telnet is None:
logging.error('Telnet connection is not established.')
return ''
resp = self.telnet.read_very_eager()
self.telnet.write(to_bytes(s, '\n'))
resp += self.telnet.read_until(b'>')
return to_str(resp)
def apptrace_wait_stop(self, timeout: int = 30) -> None:
stopped = False
end_before = time.time() + timeout
while not stopped:
cmd_out = self.write('esp apptrace status')
for line in cmd_out.splitlines():
if line.startswith('Tracing is STOPPED.'):
stopped = True
break
if not stopped and time.time() > end_before:
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
time.sleep(1)
def kill(self) -> None:
# Check if the process is still running
if self.proc and self.proc.isalive():
self.proc.terminate()
self.proc.kill(signal.SIGKILL)
def _test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
# Construct trace log paths
trace_log = [
os.path.join(dut.logdir, 'heap_log0.svdat') # pylint: disable=protected-access
@ -33,15 +135,23 @@ def _test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, d
f_w.write(line)
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
with openocd.run() as openocd, open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
timeout=60,
logfile=gdb_log,
encoding='utf-8',
codec_errors='ignore',
) as p:
# Wait for sysview files to be generated
p.expect_exact('Tracing is STOPPED')
openocd = OpenOCD(dut).run()
assert openocd
try:
openocd.connect_telnet()
openocd.write('log_output {}'.format(openocd.log_file))
with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
timeout=60,
logfile=gdb_log,
encoding='utf-8',
codec_errors='ignore',
) as p:
# Wait for sysview files to be generated
p.expect_exact('Tracing is STOPPED')
finally:
openocd.kill()
# Process sysview trace logs
command = [os.path.join(idf_path, 'tools', 'esp_app_trace', 'sysviewtrace_proc.py'), '-p'] + trace_log
@ -60,12 +170,12 @@ def _test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, d
@pytest.mark.parametrize('config', ['app_trace_jtag'], indirect=True)
@pytest.mark.jtag
@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target'])
def test_examples_sysview_tracing_heap_log(openocd: 'OpenOCD', idf_path: str, dut: IdfDut) -> None:
_test_examples_sysview_tracing_heap_log(openocd, idf_path, dut)
def test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
_test_examples_sysview_tracing_heap_log(idf_path, dut)
@pytest.mark.parametrize('config', ['app_trace_jtag'], indirect=True)
@pytest.mark.usb_serial_jtag
@idf_parametrize('target', ['esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target'])
def test_examples_sysview_tracing_heap_log_usj(openocd: 'OpenOCD', idf_path: str, dut: IdfDut) -> None:
_test_examples_sysview_tracing_heap_log(openocd, idf_path, dut)
def test_examples_sysview_tracing_heap_log_usj(idf_path: str, dut: IdfDut) -> None:
_test_examples_sysview_tracing_heap_log(idf_path, dut)