mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-10-31 15:11:40 +01:00 
			
		
		
		
	
		
			
	
	
		
			280 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			280 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD | ||
|  | # SPDX-License-Identifier: Apache-2.0 | ||
|  | 
 | ||
|  | # pylint: disable=W0621  # redefined-outer-name | ||
|  | 
 | ||
|  | import hashlib | ||
|  | import logging | ||
|  | import os | ||
|  | import subprocess | ||
|  | import sys | ||
|  | from typing import Any, Dict, List, TextIO | ||
|  | 
 | ||
|  | import pexpect | ||
|  | import pytest | ||
|  | from _pytest.fixtures import FixtureRequest | ||
|  | from _pytest.monkeypatch import MonkeyPatch | ||
|  | from pygdbmi.gdbcontroller import GdbController, GdbTimeoutError, NoGdbProcessError | ||
|  | from pytest_embedded_idf.app import IdfApp | ||
|  | from pytest_embedded_idf.dut import IdfDut | ||
|  | from pytest_embedded_idf.serial import IdfSerial | ||
|  | 
 | ||
|  | 
 | ||
|  | def sha256(file: str) -> str: | ||
|  |     res = hashlib.sha256() | ||
|  |     with open(file, 'rb') as fr: | ||
|  |         res.update(fr.read()) | ||
|  |     return res.hexdigest() | ||
|  | 
 | ||
|  | 
 | ||
|  | class PanicTestDut(IdfDut): | ||
|  |     BOOT_CMD_ADDR = 0x9000 | ||
|  |     BOOT_CMD_SIZE = 0x1000 | ||
|  |     DEFAULT_EXPECT_TIMEOUT = 10 | ||
|  |     COREDUMP_UART_START = '================= CORE DUMP START =================' | ||
|  |     COREDUMP_UART_END = '================= CORE DUMP END =================' | ||
|  | 
 | ||
|  |     app: IdfApp | ||
|  |     serial: IdfSerial | ||
|  | 
 | ||
|  |     def __init__(self, *args, **kwargs) -> None:  # type: ignore | ||
|  |         super().__init__(*args, **kwargs) | ||
|  | 
 | ||
|  |         self.gdb: GdbController = None  # type: ignore | ||
|  |         # record this since pygdbmi is using logging.debug to generate some single character mess | ||
|  |         self.log_level = logging.getLogger().level | ||
|  |         # pygdbmi is using logging.debug to generate some single character mess | ||
|  |         if self.log_level <= logging.DEBUG: | ||
|  |             logging.getLogger().setLevel(logging.INFO) | ||
|  | 
 | ||
|  |         self.coredump_output: TextIO = None  # type: ignore | ||
|  | 
 | ||
|  |     def close(self) -> None: | ||
|  |         if self.gdb: | ||
|  |             self.gdb.exit() | ||
|  | 
 | ||
|  |         super().close() | ||
|  | 
 | ||
|  |     def revert_log_level(self) -> None: | ||
|  |         logging.getLogger().setLevel(self.log_level) | ||
|  | 
 | ||
|  |     def expect_test_func_name(self, test_func_name: str) -> None: | ||
|  |         self.expect_exact('Enter test name:') | ||
|  |         self.write(test_func_name) | ||
|  |         self.expect_exact('Got test name: ' + test_func_name) | ||
|  | 
 | ||
|  |     def expect_none(self, pattern, **kwargs) -> None:  # type: ignore | ||
|  |         """like dut.expect_all, but with an inverse logic""" | ||
|  |         if 'timeout' not in kwargs: | ||
|  |             kwargs['timeout'] = 1 | ||
|  | 
 | ||
|  |         try: | ||
|  |             res = self.expect(pattern, **kwargs) | ||
|  |             raise AssertionError(f'Unexpected: {res.group().decode("utf8")}') | ||
|  |         except pexpect.TIMEOUT: | ||
|  |             pass | ||
|  | 
 | ||
|  |     def expect_backtrace(self) -> None: | ||
|  |         self.expect_exact('Backtrace:') | ||
|  |         self.expect_none('CORRUPTED') | ||
|  | 
 | ||
|  |     def expect_gme(self, reason: str) -> None: | ||
|  |         """Expect method for Guru Meditation Errors""" | ||
|  |         self.expect_exact(f"Guru Meditation Error: Core  0 panic'ed ({reason})") | ||
|  | 
 | ||
|  |     def expect_reg_dump(self, core: int = 0) -> None: | ||
|  |         """Expect method for the register dump""" | ||
|  |         self.expect(r'Core\s+%d register dump:' % core) | ||
|  | 
 | ||
|  |     def expect_elf_sha256(self) -> None: | ||
|  |         """Expect method for ELF SHA256 line""" | ||
|  |         elf_sha256 = sha256(self.app.elf_file) | ||
|  |         elf_sha256_len = int( | ||
|  |             self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '16') | ||
|  |         ) | ||
|  |         self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len]) | ||
|  | 
 | ||
|  |     def _call_espcoredump( | ||
|  |         self, extra_args: List[str], coredump_file_name: str, output_file_name: str | ||
|  |     ) -> None: | ||
|  |         # no "with" here, since we need the file to be open for later inspection by the test case | ||
|  |         if not self.coredump_output: | ||
|  |             self.coredump_output = open(output_file_name, 'w') | ||
|  | 
 | ||
|  |         espcoredump_script = os.path.join( | ||
|  |             os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py' | ||
|  |         ) | ||
|  |         espcoredump_args = [ | ||
|  |             sys.executable, | ||
|  |             espcoredump_script, | ||
|  |             'info_corefile', | ||
|  |             '--core', | ||
|  |             coredump_file_name, | ||
|  |         ] | ||
|  |         espcoredump_args += extra_args | ||
|  |         espcoredump_args.append(self.app.elf_file) | ||
|  |         logging.info('Running %s', ' '.join(espcoredump_args)) | ||
|  |         logging.info('espcoredump output is written to %s', self.coredump_output.name) | ||
|  | 
 | ||
|  |         subprocess.check_call(espcoredump_args, stdout=self.coredump_output) | ||
|  |         self.coredump_output.flush() | ||
|  |         self.coredump_output.seek(0) | ||
|  | 
 | ||
|  |     def process_coredump_uart(self) -> None: | ||
|  |         """Extract the core dump from UART output of the test, run espcoredump on it""" | ||
|  |         self.expect(self.COREDUMP_UART_START) | ||
|  |         res = self.expect('(.+)' + self.COREDUMP_UART_END) | ||
|  |         coredump_base64 = res.group(1).decode('utf8') | ||
|  |         with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file: | ||
|  |             logging.info('Writing UART base64 core dump to %s', coredump_file.name) | ||
|  |             coredump_file.write(coredump_base64) | ||
|  | 
 | ||
|  |         output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt') | ||
|  |         self._call_espcoredump( | ||
|  |             ['--core-format', 'b64'], coredump_file.name, output_file_name | ||
|  |         ) | ||
|  | 
 | ||
|  |     def process_coredump_flash(self) -> None: | ||
|  |         """Extract the core dump from flash, run espcoredump on it""" | ||
|  |         coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin') | ||
|  |         logging.info('Writing flash binary core dump to %s', coredump_file_name) | ||
|  |         self.serial.dump_flash(coredump_file_name, partition='coredump') | ||
|  | 
 | ||
|  |         output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt') | ||
|  |         self._call_espcoredump( | ||
|  |             ['--core-format', 'raw'], coredump_file_name, output_file_name | ||
|  |         ) | ||
|  | 
 | ||
|  |     def gdb_write(self, command: str) -> Any: | ||
|  |         """
 | ||
|  |         Wrapper to write to gdb with a longer timeout, as test runner | ||
|  |         host can be slow sometimes | ||
|  |         """
 | ||
|  |         return self.gdb.write(command, timeout_sec=10) | ||
|  | 
 | ||
|  |     def start_gdb(self) -> None: | ||
|  |         """
 | ||
|  |         Runs GDB and connects it to the "serial" port of the DUT. | ||
|  |         After this, the DUT expect methods can no longer be used to capture output. | ||
|  |         """
 | ||
|  |         self.gdb = GdbController(gdb_path=self.toolchain_prefix + 'gdb') | ||
|  | 
 | ||
|  |         # pygdbmi logs to console by default, make it log to a file instead | ||
|  |         pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt') | ||
|  |         pygdbmi_logger = self.gdb.logger | ||
|  |         pygdbmi_logger.setLevel(logging.DEBUG) | ||
|  |         while pygdbmi_logger.hasHandlers(): | ||
|  |             pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0]) | ||
|  |         log_handler = logging.FileHandler(pygdbmi_log_file_name) | ||
|  |         log_handler.setFormatter( | ||
|  |             logging.Formatter('%(asctime)s %(levelname)s: %(message)s') | ||
|  |         ) | ||
|  |         pygdbmi_logger.addHandler(log_handler) | ||
|  | 
 | ||
|  |         logging.info('Running command: %s', self.gdb.get_subprocess_cmd()) | ||
|  |         for _ in range(10): | ||
|  |             try: | ||
|  |                 # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that | ||
|  |                 # an RPI under high load will get non-responsive during creating a lot of processes. | ||
|  |                 resp = self.gdb.get_gdb_response( | ||
|  |                     timeout_sec=10 | ||
|  |                 )  # calls verify_valid_gdb_subprocess() internally | ||
|  |                 # it will be interesting to look up this response if the next GDB command fails (times out) | ||
|  |                 logging.info('GDB response: %s', resp) | ||
|  |                 break  # success | ||
|  |             except GdbTimeoutError: | ||
|  |                 logging.warning( | ||
|  |                     'GDB internal error: cannot get response from the subprocess' | ||
|  |                 ) | ||
|  |             except NoGdbProcessError: | ||
|  |                 logging.error('GDB internal error: process is not running') | ||
|  |                 break  # failure - TODO: create another GdbController | ||
|  |             except ValueError: | ||
|  |                 logging.error( | ||
|  |                     'GDB internal error: select() returned an unexpected file number' | ||
|  |                 ) | ||
|  | 
 | ||
|  |         # Set up logging for GDB remote protocol | ||
|  |         gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt') | ||
|  |         self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name) | ||
|  | 
 | ||
|  |         # Load the ELF file | ||
|  |         self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file)) | ||
|  | 
 | ||
|  |         # Connect GDB to UART | ||
|  |         self.serial.proc.close() | ||
|  |         logging.info('Connecting to GDB Stub...') | ||
|  |         self.gdb_write('-gdb-set serial baud 115200') | ||
|  |         responses = self.gdb_write('-target-select remote ' + self.serial.port) | ||
|  | 
 | ||
|  |         # Make sure we get the 'stopped' notification | ||
|  |         stop_response = self.find_gdb_response('stopped', 'notify', responses) | ||
|  |         if not stop_response: | ||
|  |             responses = self.gdb_write('-exec-interrupt') | ||
|  |             stop_response = self.find_gdb_response('stopped', 'notify', responses) | ||
|  |             assert stop_response | ||
|  |         frame = stop_response['payload']['frame'] | ||
|  |         if 'file' not in frame: | ||
|  |             frame['file'] = '?' | ||
|  |         if 'line' not in frame: | ||
|  |             frame['line'] = '?' | ||
|  |         logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame)) | ||
|  | 
 | ||
|  |         # Drain remaining responses | ||
|  |         self.gdb.get_gdb_response(raise_error_on_timeout=False) | ||
|  | 
 | ||
|  |     def gdb_backtrace(self) -> Any: | ||
|  |         """
 | ||
|  |         Returns the list of stack frames for the current thread. | ||
|  |         Each frame is a dictionary, refer to pygdbmi docs for the format. | ||
|  |         """
 | ||
|  |         assert self.gdb | ||
|  | 
 | ||
|  |         responses = self.gdb_write('-stack-list-frames') | ||
|  |         return self.find_gdb_response('done', 'result', responses)['payload']['stack'] | ||
|  | 
 | ||
|  |     @staticmethod | ||
|  |     def match_backtrace( | ||
|  |         gdb_backtrace: List[Any], expected_functions_list: List[Any] | ||
|  |     ) -> bool: | ||
|  |         """
 | ||
|  |         Returns True if the function names listed in expected_functions_list match the backtrace | ||
|  |         given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace() | ||
|  |         function. | ||
|  |         """
 | ||
|  |         return all( | ||
|  |             [ | ||
|  |                 frame['func'] == expected_functions_list[i] | ||
|  |                 for i, frame in enumerate(gdb_backtrace) | ||
|  |             ] | ||
|  |         ) | ||
|  | 
 | ||
|  |     @staticmethod | ||
|  |     def find_gdb_response( | ||
|  |         message: str, response_type: str, responses: List[Any] | ||
|  |     ) -> Any: | ||
|  |         """
 | ||
|  |         Helper function which extracts one response from an array of GDB responses, filtering | ||
|  |         by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format. | ||
|  |         """
 | ||
|  | 
 | ||
|  |         def match_response(response: Dict[str, Any]) -> bool: | ||
|  |             return response['message'] == message and response['type'] == response_type  # type: ignore | ||
|  | 
 | ||
|  |         filtered_responses = [r for r in responses if match_response(r)] | ||
|  |         if not filtered_responses: | ||
|  |             return None | ||
|  |         return filtered_responses[0] | ||
|  | 
 | ||
|  | 
 | ||
|  | @pytest.fixture(scope='module') | ||
|  | def monkeypatch_module(request: FixtureRequest) -> MonkeyPatch: | ||
|  |     mp = MonkeyPatch() | ||
|  |     request.addfinalizer(mp.undo) | ||
|  |     return mp | ||
|  | 
 | ||
|  | 
 | ||
|  | @pytest.fixture(scope='module', autouse=True) | ||
|  | def replace_dut_class(monkeypatch_module: MonkeyPatch) -> None: | ||
|  |     monkeypatch_module.setattr('pytest_embedded_idf.dut.IdfDut', PanicTestDut) |