mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-09 23:54:33 +02:00
ci: OpenOCD class as fixture
This commit is contained in:
125
conftest.py
125
conftest.py
@@ -1,8 +1,6 @@
|
||||
# SPDX-FileCopyrightText: 2021-2023 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
# pylint: disable=W0621 # redefined-outer-name
|
||||
|
||||
# This file is a pytest root configuration file and provide the following functionalities:
|
||||
# 1. Defines a few fixtures that could be used under the whole project.
|
||||
# 2. Defines a few hook functions.
|
||||
@@ -12,20 +10,28 @@
|
||||
#
|
||||
# This is an experimental feature, and if you found any bug or have any question, please report to
|
||||
# https://github.com/espressif/pytest-embedded/issues
|
||||
|
||||
import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from typing import Callable, Optional
|
||||
from telnetlib import Telnet
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
from pytest_embedded.plugin import multi_dut_argument, multi_dut_fixture
|
||||
from pytest_embedded.plugin import multi_dut_argument
|
||||
from pytest_embedded.plugin import multi_dut_fixture
|
||||
from pytest_embedded.utils import to_bytes
|
||||
from pytest_embedded.utils import to_str
|
||||
from pytest_embedded_idf.dut import IdfDut
|
||||
from pytest_embedded_idf.unity_tester import CaseTester
|
||||
|
||||
@@ -94,6 +100,113 @@ def test_case_name(request: FixtureRequest, target: str, config: str) -> str:
|
||||
return format_case_id(target, config, request.node.originalname, is_qemu=is_qemu, params=filtered_params) # type: ignore
|
||||
|
||||
|
||||
class OpenOCD:
|
||||
def __init__(self, dut: 'IdfDut'):
|
||||
self.MAX_RETRIES = 3
|
||||
self.RETRY_DELAY = 1
|
||||
self.TELNET_PORT = 4444
|
||||
self.dut = dut
|
||||
self.telnet: Optional[Telnet] = None
|
||||
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
|
||||
self.proc: Optional[pexpect.spawn] = None
|
||||
|
||||
def __enter__(self) -> 'OpenOCD':
|
||||
return self
|
||||
|
||||
def __exit__(self, exception_type: Any, exception_value: Any, exception_traceback: Any) -> None:
|
||||
self.kill()
|
||||
|
||||
def run(self) -> Optional['OpenOCD']:
|
||||
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
|
||||
|
||||
try:
|
||||
with open(desc_path, 'r') as f:
|
||||
project_desc = json.load(f)
|
||||
except FileNotFoundError:
|
||||
logging.error('Project description file not found at %s', desc_path)
|
||||
raise
|
||||
|
||||
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
|
||||
if not openocd_scripts:
|
||||
raise RuntimeError('OPENOCD_SCRIPTS environment variable is not set.')
|
||||
|
||||
debug_args = project_desc.get('debug_arguments_openocd')
|
||||
if not debug_args:
|
||||
raise KeyError("'debug_arguments_openocd' key is missing in project_description.json")
|
||||
|
||||
# For debug purposes, make the value '4'
|
||||
ocd_env = os.environ.copy()
|
||||
ocd_env['LIBUSB_DEBUG'] = '1'
|
||||
|
||||
for _ in range(1, self.MAX_RETRIES + 1):
|
||||
try:
|
||||
self.proc = pexpect.spawn(
|
||||
command='openocd',
|
||||
args=['-s', openocd_scripts] + debug_args.split(),
|
||||
timeout=5,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
env=ocd_env,
|
||||
)
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
|
||||
self.connect_telnet()
|
||||
self.write('log_output {}'.format(self.log_file))
|
||||
return self
|
||||
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT, ConnectionRefusedError) as e:
|
||||
logging.error('Error running OpenOCD: %s', str(e))
|
||||
self.kill()
|
||||
time.sleep(self.RETRY_DELAY)
|
||||
|
||||
raise RuntimeError('Failed to run OpenOCD after %d attempts.', self.MAX_RETRIES)
|
||||
|
||||
def connect_telnet(self) -> None:
|
||||
for attempt in range(1, self.MAX_RETRIES + 1):
|
||||
try:
|
||||
self.telnet = Telnet('127.0.0.1', self.TELNET_PORT, 5)
|
||||
break
|
||||
except ConnectionRefusedError as e:
|
||||
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ConnectionRefusedError
|
||||
|
||||
def write(self, s: str) -> Any:
|
||||
if self.telnet is None:
|
||||
logging.error('Telnet connection is not established.')
|
||||
return ''
|
||||
resp = self.telnet.read_very_eager()
|
||||
self.telnet.write(to_bytes(s, '\n'))
|
||||
resp += self.telnet.read_until(b'>')
|
||||
return to_str(resp)
|
||||
|
||||
def apptrace_wait_stop(self, timeout: int = 30) -> None:
|
||||
stopped = False
|
||||
end_before = time.time() + timeout
|
||||
while not stopped:
|
||||
cmd_out = self.write('esp apptrace status')
|
||||
for line in cmd_out.splitlines():
|
||||
if line.startswith('Tracing is STOPPED.'):
|
||||
stopped = True
|
||||
break
|
||||
if not stopped and time.time() > end_before:
|
||||
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
|
||||
time.sleep(1)
|
||||
|
||||
def kill(self) -> None:
|
||||
# Check if the process is still running
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
self.proc.kill(signal.SIGKILL)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def openocd_dut(dut: IdfDut) -> OpenOCD:
|
||||
if isinstance(dut, tuple):
|
||||
raise ValueError('Multi-DUT support is not implemented yet')
|
||||
return OpenOCD(dut)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@multi_dut_fixture
|
||||
def build_dir(app_path: str, target: Optional[str], config: Optional[str]) -> str:
|
||||
@@ -314,7 +427,7 @@ def pytest_configure(config: Config) -> None:
|
||||
print('Detected app: ', apps_list[-1])
|
||||
else:
|
||||
print(
|
||||
f'WARNING: app_info base dir {app_info_basedir} not recognizable in {app_info["app_dir"]}, skipping...'
|
||||
f'WARNING: app_info base dir {app_info_basedir} not recognizable in {app_info["app_dir"]}, skipping...' # noqa: E713
|
||||
)
|
||||
continue
|
||||
|
||||
|
@@ -1,125 +1,18 @@
|
||||
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
from telnetlib import Telnet
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
import typing
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
from pytest_embedded.utils import to_bytes
|
||||
from pytest_embedded.utils import to_str
|
||||
from pytest_embedded_idf import IdfDut
|
||||
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 1
|
||||
TELNET_PORT = 4444
|
||||
if typing.TYPE_CHECKING:
|
||||
from conftest import OpenOCD
|
||||
|
||||
|
||||
class OpenOCD:
|
||||
def __init__(self, dut: 'IdfDut'):
|
||||
self.dut = dut
|
||||
self.telnet: Optional[Telnet] = None
|
||||
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
|
||||
self.proc: Optional[pexpect.spawn] = None
|
||||
|
||||
def run(self) -> Optional['OpenOCD']:
|
||||
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
|
||||
|
||||
try:
|
||||
with open(desc_path, 'r') as f:
|
||||
project_desc = json.load(f)
|
||||
except FileNotFoundError:
|
||||
logging.error('Project description file not found at %s', desc_path)
|
||||
return None
|
||||
|
||||
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
|
||||
if not openocd_scripts:
|
||||
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
|
||||
return None
|
||||
|
||||
debug_args = project_desc.get('debug_arguments_openocd')
|
||||
if not debug_args:
|
||||
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
|
||||
return None
|
||||
|
||||
# For debug purposes, make the value '4'
|
||||
ocd_env = os.environ.copy()
|
||||
ocd_env['LIBUSB_DEBUG'] = '1'
|
||||
|
||||
for _ in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.proc = pexpect.spawn(
|
||||
command='openocd',
|
||||
args=['-s', openocd_scripts] + debug_args.split(),
|
||||
timeout=5,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
env=ocd_env,
|
||||
)
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
|
||||
return self
|
||||
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
|
||||
logging.error('Error running OpenOCD: %s', str(e))
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
time.sleep(RETRY_DELAY)
|
||||
|
||||
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
|
||||
return None
|
||||
|
||||
def connect_telnet(self) -> None:
|
||||
for attempt in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
|
||||
break
|
||||
except ConnectionRefusedError as e:
|
||||
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ConnectionRefusedError
|
||||
|
||||
def write(self, s: str) -> Any:
|
||||
if self.telnet is None:
|
||||
logging.error('Telnet connection is not established.')
|
||||
return ''
|
||||
resp = self.telnet.read_very_eager()
|
||||
self.telnet.write(to_bytes(s, '\n'))
|
||||
resp += self.telnet.read_until(b'>')
|
||||
return to_str(resp)
|
||||
|
||||
def apptrace_wait_stop(self, timeout: int = 30) -> None:
|
||||
stopped = False
|
||||
end_before = time.time() + timeout
|
||||
while not stopped:
|
||||
cmd_out = self.write('esp apptrace status')
|
||||
for line in cmd_out.splitlines():
|
||||
if line.startswith('Tracing is STOPPED.'):
|
||||
stopped = True
|
||||
break
|
||||
if not stopped and time.time() > end_before:
|
||||
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
|
||||
time.sleep(1)
|
||||
|
||||
def kill(self) -> None:
|
||||
# Check if the process is still running
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
self.proc.kill(signal.SIGKILL)
|
||||
|
||||
|
||||
def _test_examples_app_trace_basic(dut: IdfDut) -> None:
|
||||
def _test_examples_app_trace_basic(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5)
|
||||
openocd = OpenOCD(dut).run()
|
||||
assert openocd
|
||||
try:
|
||||
openocd.connect_telnet()
|
||||
openocd.write('log_output {}'.format(openocd.log_file))
|
||||
with openocd_dut.run() as openocd:
|
||||
openocd.write('reset run')
|
||||
dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5)
|
||||
time.sleep(1) # wait for APPTRACE_INIT semihosting call
|
||||
@@ -150,16 +43,14 @@ def _test_examples_app_trace_basic(dut: IdfDut) -> None:
|
||||
break
|
||||
if found is not True:
|
||||
raise RuntimeError('"{}" could not be found in {}'.format(log_str, 'apptrace.log'))
|
||||
finally:
|
||||
openocd.kill()
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c2
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32c2
|
||||
@pytest.mark.jtag
|
||||
def test_examples_app_trace_basic(dut: IdfDut) -> None:
|
||||
_test_examples_app_trace_basic(dut)
|
||||
def test_examples_app_trace_basic(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_examples_app_trace_basic(openocd_dut, dut)
|
||||
|
||||
|
||||
@pytest.mark.esp32s3
|
||||
@@ -167,5 +58,5 @@ def test_examples_app_trace_basic(dut: IdfDut) -> None:
|
||||
@pytest.mark.esp32c6
|
||||
@pytest.mark.esp32h2
|
||||
@pytest.mark.usb_serial_jtag
|
||||
def test_examples_app_trace_basic_usj(dut: IdfDut) -> None:
|
||||
_test_examples_app_trace_basic(dut)
|
||||
def test_examples_app_trace_basic_usj(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_examples_app_trace_basic(openocd_dut, dut)
|
||||
|
@@ -1,128 +1,22 @@
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import json
|
||||
import logging
|
||||
import os.path
|
||||
import signal
|
||||
import time
|
||||
from telnetlib import Telnet
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
import typing
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
from pytest_embedded.utils import to_bytes
|
||||
from pytest_embedded.utils import to_str
|
||||
from pytest_embedded_idf import IdfDut
|
||||
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 1
|
||||
TELNET_PORT = 4444
|
||||
if typing.TYPE_CHECKING:
|
||||
from conftest import OpenOCD
|
||||
|
||||
|
||||
class OpenOCD:
|
||||
def __init__(self, dut: 'IdfDut'):
|
||||
self.dut = dut
|
||||
self.telnet: Optional[Telnet] = None
|
||||
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
|
||||
self.proc: Optional[pexpect.spawn] = None
|
||||
|
||||
def run(self) -> Optional['OpenOCD']:
|
||||
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
|
||||
|
||||
try:
|
||||
with open(desc_path, 'r') as f:
|
||||
project_desc = json.load(f)
|
||||
except FileNotFoundError:
|
||||
logging.error('Project description file not found at %s', desc_path)
|
||||
return None
|
||||
|
||||
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
|
||||
if not openocd_scripts:
|
||||
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
|
||||
return None
|
||||
|
||||
debug_args = project_desc.get('debug_arguments_openocd')
|
||||
if not debug_args:
|
||||
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
|
||||
return None
|
||||
|
||||
# For debug purposes, make the value '4'
|
||||
ocd_env = os.environ.copy()
|
||||
ocd_env['LIBUSB_DEBUG'] = '1'
|
||||
|
||||
for _ in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.proc = pexpect.spawn(
|
||||
command='openocd',
|
||||
args=['-s', openocd_scripts] + debug_args.split(),
|
||||
timeout=5,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
env=ocd_env,
|
||||
)
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
|
||||
return self
|
||||
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
|
||||
logging.error('Error running OpenOCD: %s', str(e))
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
time.sleep(RETRY_DELAY)
|
||||
|
||||
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
|
||||
return None
|
||||
|
||||
def connect_telnet(self) -> None:
|
||||
for attempt in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
|
||||
break
|
||||
except ConnectionRefusedError as e:
|
||||
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ConnectionRefusedError
|
||||
|
||||
def write(self, s: str) -> Any:
|
||||
if self.telnet is None:
|
||||
logging.error('Telnet connection is not established.')
|
||||
return ''
|
||||
resp = self.telnet.read_very_eager()
|
||||
self.telnet.write(to_bytes(s, '\n'))
|
||||
resp += self.telnet.read_until(b'>')
|
||||
return to_str(resp)
|
||||
|
||||
def apptrace_wait_stop(self, timeout: int = 30) -> None:
|
||||
stopped = False
|
||||
end_before = time.time() + timeout
|
||||
while not stopped:
|
||||
cmd_out = self.write('esp apptrace status')
|
||||
for line in cmd_out.splitlines():
|
||||
if line.startswith('Tracing is STOPPED.'):
|
||||
stopped = True
|
||||
break
|
||||
if not stopped and time.time() > end_before:
|
||||
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
|
||||
time.sleep(1)
|
||||
|
||||
def kill(self) -> None:
|
||||
# Check if the process is still running
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
self.proc.kill(signal.SIGKILL)
|
||||
|
||||
|
||||
def _test_gcov(dut: IdfDut) -> None:
|
||||
def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
# create the generated .gcda folder, otherwise would have error: failed to open file.
|
||||
# normally this folder would be created via `idf.py build`. but in CI the non-related files would not be preserved
|
||||
os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir'), exist_ok=True)
|
||||
os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir'), exist_ok=True)
|
||||
|
||||
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
|
||||
openocd = OpenOCD(dut).run()
|
||||
assert openocd
|
||||
|
||||
def expect_counter_output(loop: int, timeout: int = 10) -> None:
|
||||
dut.expect_exact(
|
||||
[f'blink_dummy_func: Counter = {loop}', f'some_dummy_func: Counter = {loop * 2}'],
|
||||
@@ -152,9 +46,8 @@ def _test_gcov(dut: IdfDut) -> None:
|
||||
|
||||
assert len(expect_lines) == 0
|
||||
|
||||
try:
|
||||
openocd.connect_telnet()
|
||||
openocd.write('log_output {}'.format(openocd.log_file))
|
||||
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
|
||||
with openocd_dut.run() as openocd:
|
||||
openocd.write('reset run')
|
||||
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
|
||||
|
||||
@@ -176,16 +69,14 @@ def _test_gcov(dut: IdfDut) -> None:
|
||||
time.sleep(1)
|
||||
# Test instant run-time dump
|
||||
dump_coverage('esp gcov')
|
||||
finally:
|
||||
openocd.kill()
|
||||
|
||||
|
||||
@pytest.mark.jtag
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c2
|
||||
@pytest.mark.esp32s2
|
||||
def test_gcov(dut: IdfDut) -> None:
|
||||
_test_gcov(dut)
|
||||
def test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_gcov(openocd_dut, dut)
|
||||
|
||||
|
||||
@pytest.mark.esp32s3
|
||||
@@ -193,5 +84,5 @@ def test_gcov(dut: IdfDut) -> None:
|
||||
@pytest.mark.esp32c6
|
||||
@pytest.mark.esp32h2
|
||||
@pytest.mark.usb_serial_jtag
|
||||
def test_gcov_usj(dut: IdfDut) -> None:
|
||||
_test_gcov(dut)
|
||||
def test_gcov_usj(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_gcov(openocd_dut, dut)
|
||||
|
@@ -1,120 +1,19 @@
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import json
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
import signal
|
||||
import time
|
||||
from telnetlib import Telnet
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
import typing
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
from pytest_embedded.utils import to_bytes
|
||||
from pytest_embedded.utils import to_str
|
||||
from pytest_embedded_idf import IdfDut
|
||||
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 1
|
||||
TELNET_PORT = 4444
|
||||
if typing.TYPE_CHECKING:
|
||||
from conftest import OpenOCD
|
||||
|
||||
|
||||
class OpenOCD:
|
||||
def __init__(self, dut: 'IdfDut'):
|
||||
self.dut = dut
|
||||
self.telnet: Optional[Telnet] = None
|
||||
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
|
||||
self.proc: Optional[pexpect.spawn] = None
|
||||
|
||||
def run(self) -> Optional['OpenOCD']:
|
||||
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
|
||||
|
||||
try:
|
||||
with open(desc_path, 'r') as f:
|
||||
project_desc = json.load(f)
|
||||
except FileNotFoundError:
|
||||
logging.error('Project description file not found at %s', desc_path)
|
||||
return None
|
||||
|
||||
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
|
||||
if not openocd_scripts:
|
||||
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
|
||||
return None
|
||||
|
||||
debug_args = project_desc.get('debug_arguments_openocd')
|
||||
if not debug_args:
|
||||
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
|
||||
return None
|
||||
|
||||
# For debug purposes, make the value '4'
|
||||
ocd_env = os.environ.copy()
|
||||
ocd_env['LIBUSB_DEBUG'] = '1'
|
||||
|
||||
for _ in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.proc = pexpect.spawn(
|
||||
command='openocd',
|
||||
args=['-s', openocd_scripts] + debug_args.split(),
|
||||
timeout=5,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
env=ocd_env,
|
||||
)
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
|
||||
return self
|
||||
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
|
||||
logging.error('Error running OpenOCD: %s', str(e))
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
time.sleep(RETRY_DELAY)
|
||||
|
||||
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
|
||||
return None
|
||||
|
||||
def connect_telnet(self) -> None:
|
||||
for attempt in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
|
||||
break
|
||||
except ConnectionRefusedError as e:
|
||||
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ConnectionRefusedError
|
||||
|
||||
def write(self, s: str) -> Any:
|
||||
if self.telnet is None:
|
||||
logging.error('Telnet connection is not established.')
|
||||
return ''
|
||||
resp = self.telnet.read_very_eager()
|
||||
self.telnet.write(to_bytes(s, '\n'))
|
||||
resp += self.telnet.read_until(b'>')
|
||||
return to_str(resp)
|
||||
|
||||
def apptrace_wait_stop(self, timeout: int = 30) -> None:
|
||||
stopped = False
|
||||
end_before = time.time() + timeout
|
||||
while not stopped:
|
||||
cmd_out = self.write('esp apptrace status')
|
||||
for line in cmd_out.splitlines():
|
||||
if line.startswith('Tracing is STOPPED.'):
|
||||
stopped = True
|
||||
break
|
||||
if not stopped and time.time() > end_before:
|
||||
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
|
||||
time.sleep(1)
|
||||
|
||||
def kill(self) -> None:
|
||||
# Check if the process is still running
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
self.proc.kill(signal.SIGKILL)
|
||||
|
||||
|
||||
def _test_examples_sysview_tracing(dut: IdfDut) -> None:
|
||||
def _test_examples_sysview_tracing(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
# Construct trace log paths
|
||||
trace_log = [
|
||||
os.path.join(dut.logdir, 'sys_log0.svdat') # pylint: disable=protected-access
|
||||
@@ -138,36 +37,28 @@ def _test_examples_sysview_tracing(dut: IdfDut) -> None:
|
||||
dut.expect(re.compile(rb'example: Task\[0x[0-9A-Fa-f]+\]: received event \d+'), timeout=30)
|
||||
|
||||
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
|
||||
openocd = OpenOCD(dut).run()
|
||||
assert openocd
|
||||
try:
|
||||
openocd.connect_telnet()
|
||||
openocd.write('log_output {}'.format(openocd.log_file))
|
||||
with openocd_dut.run() as openocd, open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
|
||||
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
|
||||
timeout=60,
|
||||
logfile=gdb_log,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
) as p:
|
||||
p.expect_exact('hit Breakpoint 1, app_main ()')
|
||||
dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect()
|
||||
dut_expect_task_event()
|
||||
|
||||
with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
|
||||
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
|
||||
timeout=60,
|
||||
logfile=gdb_log,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
) as p:
|
||||
p.expect_exact('hit Breakpoint 1, app_main ()')
|
||||
dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect()
|
||||
dut_expect_task_event()
|
||||
|
||||
# Do a sleep while sysview samples are captured.
|
||||
time.sleep(3)
|
||||
openocd.write('esp sysview stop')
|
||||
finally:
|
||||
openocd.kill()
|
||||
# Do a sleep while sysview samples are captured.
|
||||
time.sleep(3)
|
||||
openocd.write('esp sysview stop')
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32c2
|
||||
@pytest.mark.jtag
|
||||
def test_examples_sysview_tracing(dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing(dut)
|
||||
def test_examples_sysview_tracing(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing(openocd_dut, dut)
|
||||
|
||||
|
||||
@pytest.mark.esp32s3
|
||||
@@ -175,5 +66,5 @@ def test_examples_sysview_tracing(dut: IdfDut) -> None:
|
||||
@pytest.mark.esp32c6
|
||||
@pytest.mark.esp32h2
|
||||
@pytest.mark.usb_serial_jtag
|
||||
def test_examples_sysview_tracing_usj(dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing(dut)
|
||||
def test_examples_sysview_tracing_usj(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing(openocd_dut, dut)
|
||||
|
@@ -1,119 +1,17 @@
|
||||
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import json
|
||||
import logging
|
||||
import os.path
|
||||
import signal
|
||||
import time
|
||||
from telnetlib import Telnet
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
import typing
|
||||
|
||||
import pexpect.fdpexpect
|
||||
import pytest
|
||||
from pytest_embedded.utils import to_bytes
|
||||
from pytest_embedded.utils import to_str
|
||||
from pytest_embedded_idf import IdfDut
|
||||
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 1
|
||||
TELNET_PORT = 4444
|
||||
if typing.TYPE_CHECKING:
|
||||
from conftest import OpenOCD
|
||||
|
||||
|
||||
class OpenOCD:
|
||||
def __init__(self, dut: 'IdfDut'):
|
||||
self.dut = dut
|
||||
self.telnet: Optional[Telnet] = None
|
||||
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
|
||||
self.proc: Optional[pexpect.spawn] = None
|
||||
|
||||
def run(self) -> Optional['OpenOCD']:
|
||||
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
|
||||
|
||||
try:
|
||||
with open(desc_path, 'r') as f:
|
||||
project_desc = json.load(f)
|
||||
except FileNotFoundError:
|
||||
logging.error('Project description file not found at %s', desc_path)
|
||||
return None
|
||||
|
||||
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
|
||||
if not openocd_scripts:
|
||||
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
|
||||
return None
|
||||
|
||||
debug_args = project_desc.get('debug_arguments_openocd')
|
||||
if not debug_args:
|
||||
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
|
||||
return None
|
||||
|
||||
# For debug purposes, make the value '4'
|
||||
ocd_env = os.environ.copy()
|
||||
ocd_env['LIBUSB_DEBUG'] = '1'
|
||||
|
||||
for _ in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.proc = pexpect.spawn(
|
||||
command='openocd',
|
||||
args=['-s', openocd_scripts] + debug_args.split(),
|
||||
timeout=5,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
env=ocd_env,
|
||||
)
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
|
||||
return self
|
||||
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
|
||||
logging.error('Error running OpenOCD: %s', str(e))
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
time.sleep(RETRY_DELAY)
|
||||
|
||||
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
|
||||
return None
|
||||
|
||||
def connect_telnet(self) -> None:
|
||||
for attempt in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
|
||||
break
|
||||
except ConnectionRefusedError as e:
|
||||
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ConnectionRefusedError
|
||||
|
||||
def write(self, s: str) -> Any:
|
||||
if self.telnet is None:
|
||||
logging.error('Telnet connection is not established.')
|
||||
return ''
|
||||
resp = self.telnet.read_very_eager()
|
||||
self.telnet.write(to_bytes(s, '\n'))
|
||||
resp += self.telnet.read_until(b'>')
|
||||
return to_str(resp)
|
||||
|
||||
def apptrace_wait_stop(self, timeout: int = 30) -> None:
|
||||
stopped = False
|
||||
end_before = time.time() + timeout
|
||||
while not stopped:
|
||||
cmd_out = self.write('esp apptrace status')
|
||||
for line in cmd_out.splitlines():
|
||||
if line.startswith('Tracing is STOPPED.'):
|
||||
stopped = True
|
||||
break
|
||||
if not stopped and time.time() > end_before:
|
||||
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
|
||||
time.sleep(1)
|
||||
|
||||
def kill(self) -> None:
|
||||
# Check if the process is still running
|
||||
if self.proc and self.proc.isalive():
|
||||
self.proc.terminate()
|
||||
self.proc.kill(signal.SIGKILL)
|
||||
|
||||
|
||||
def _test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
|
||||
def _test_examples_sysview_tracing_heap_log(openocd_dut: 'OpenOCD', idf_path: str, dut: IdfDut) -> None:
|
||||
# Construct trace log paths
|
||||
trace_log = [
|
||||
os.path.join(dut.logdir, 'heap_log0.svdat') # pylint: disable=protected-access
|
||||
@@ -134,23 +32,15 @@ def _test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
|
||||
f_w.write(line)
|
||||
|
||||
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
|
||||
openocd = OpenOCD(dut).run()
|
||||
assert openocd
|
||||
try:
|
||||
openocd.connect_telnet()
|
||||
openocd.write('log_output {}'.format(openocd.log_file))
|
||||
|
||||
with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
|
||||
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
|
||||
timeout=60,
|
||||
logfile=gdb_log,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
) as p:
|
||||
# Wait for sysview files to be generated
|
||||
p.expect_exact('Tracing is STOPPED')
|
||||
finally:
|
||||
openocd.kill()
|
||||
with openocd_dut.run(), open(gdb_logfile, 'w') as gdb_log, pexpect.spawn(
|
||||
f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}',
|
||||
timeout=60,
|
||||
logfile=gdb_log,
|
||||
encoding='utf-8',
|
||||
codec_errors='ignore',
|
||||
) as p:
|
||||
# Wait for sysview files to be generated
|
||||
p.expect_exact('Tracing is STOPPED')
|
||||
|
||||
# Process sysview trace logs
|
||||
command = [os.path.join(idf_path, 'tools', 'esp_app_trace', 'sysviewtrace_proc.py'), '-p'] + trace_log
|
||||
@@ -170,8 +60,8 @@ def _test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32c2
|
||||
def test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing_heap_log(idf_path, dut)
|
||||
def test_examples_sysview_tracing_heap_log(openocd_dut: 'OpenOCD', idf_path: str, dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing_heap_log(openocd_dut, idf_path, dut)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('config', ['app_trace_jtag'], indirect=True)
|
||||
@@ -180,5 +70,5 @@ def test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32c6
|
||||
@pytest.mark.esp32h2
|
||||
def test_examples_sysview_tracing_heap_log_usj(idf_path: str, dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing_heap_log(idf_path, dut)
|
||||
def test_examples_sysview_tracing_heap_log_usj(openocd_dut: 'OpenOCD', idf_path: str, dut: IdfDut) -> None:
|
||||
_test_examples_sysview_tracing_heap_log(openocd_dut, idf_path, dut)
|
||||
|
Reference in New Issue
Block a user