diff --git a/components/bt/common/hci_log/bt_hci_log.c b/components/bt/common/hci_log/bt_hci_log.c index ac85179a70..431eafc6c8 100644 --- a/components/bt/common/hci_log/bt_hci_log.c +++ b/components/bt/common/hci_log/bt_hci_log.c @@ -10,6 +10,7 @@ #include "bt_common.h" #include "osi/mutex.h" #include "esp_attr.h" +#include "esp_timer.h" #if (BT_HCI_LOG_INCLUDED == TRUE) #define BT_HCI_LOG_PRINT_TAG (1) @@ -203,6 +204,8 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st { osi_mutex_t mutex_lock; uint8_t *g_hci_log_buffer; + int64_t ts; + uint8_t *temp_buf; if (!p_hci_log_ctl->p_hci_log_buffer) { return ESP_FAIL; @@ -214,6 +217,16 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st return ESP_FAIL; } + ts = esp_timer_get_time(); + + temp_buf = (uint8_t *)malloc(data_len + 8); + memset(temp_buf, 0x0, data_len + 8); + + memcpy(temp_buf, &ts, 8); + memcpy(temp_buf + 8, data, data_len); + + data_len += 8; + mutex_lock = p_hci_log_ctl->mutex_lock; osi_mutex_lock(&mutex_lock, OSI_MUTEX_MAX_TIMEOUT); @@ -245,7 +258,7 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st bt_hci_log_record_string(p_hci_log_ctl, str); } - bt_hci_log_record_hex(p_hci_log_ctl, data, data_len); + bt_hci_log_record_hex(p_hci_log_ctl, temp_buf, data_len); g_hci_log_buffer[p_hci_log_ctl->log_record_in] = '\n'; @@ -261,6 +274,8 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st osi_mutex_unlock(&mutex_lock); + free(temp_buf); + return ESP_OK; } diff --git a/tools/bt/bt_hci_to_btsnoop.py b/tools/bt/bt_hci_to_btsnoop.py index 4b14da558a..1fff3f41e2 100644 --- a/tools/bt/bt_hci_to_btsnoop.py +++ b/tools/bt/bt_hci_to_btsnoop.py @@ -1,13 +1,9 @@ -# SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 import argparse import os import re import struct -import time - -parsed_num = 0 -all_line_num = 0 def create_new_bt_snoop_file(filename: str) -> None: @@ -20,30 +16,19 @@ def create_new_bt_snoop_file(filename: str) -> None: f.write(header) -def append_hci_to_bt_snoop_file(filename: str, direction: int, data: str) -> None: +def append_hci_to_bt_snoop_file(filename: str, direction: int, data: str, timestamp_us: int) -> None: if os.path.exists(filename): print(f'Appending to existing file: {filename}') else: print(f'Creating new file: {filename}') create_new_bt_snoop_file(filename) - - # Ensure data is converted to bytearray from hex string - data_bytes = bytearray.fromhex(data) # Convert hex string to bytearray - - with open(filename, 'ab') as f: # 'ab' mode for appending binary data - global parsed_num - parsed_num += 1 - data_len = len(data_bytes) - orig_len = data_len - incl_len = data_len - packet_flags = direction + (data_bytes[0] != 1 or data_bytes[0] != 3) * 2 + data_bytes = bytearray.fromhex(data) + with open(filename, 'ab') as f: + orig_len = incl_len = len(data_bytes) + packet_flags = direction + (data_bytes[0] != 1 and data_bytes[0] != 3) * 2 cumulative_drops = 0 - # Calculate the timestamp with an offset from a predefined reference time(0x00DCDDB30F2F8000). - timestamp_us = int(round(time.time() * 1000000)) + 0x00DCDDB30F2F8000 - - # Writing structured data followed by the byte array data f.write(struct.pack('>IIIIQ', orig_len, incl_len, packet_flags, cumulative_drops, timestamp_us)) - f.write(data_bytes) # Write bytearray (binary data) to file + f.write(data_bytes) def log_data_clean(data: str) -> str: @@ -51,82 +36,93 @@ def log_data_clean(data: str) -> str: return cleaned -def is_adv_report(data: str) -> bool: - is_binary = all(re.fullmatch(r'[0-9a-fA-F]{2}', part) for part in data.split()) - return is_binary - - def parse_log(input_path: str, output_tag: str) -> None: - """ - Parses the specified log file and saves the results to an output file. - - Args: - input_path (str): Path to the input log file. - output_tag (str): Name tag for the output file. - - Returns: - None - """ - global parsed_num - global all_line_num if not os.path.exists(input_path): print(f"Error: The file '{input_path}' does not exist.") return - - # Define the output directory and create it if it doesn't exist output_dir = './parsed_logs' os.makedirs(output_dir, exist_ok=True) - - # Define the output file name based on the tag output_file = os.path.join(output_dir, f'parsed_log_{output_tag}.btsnoop.log') - + parsed_num = 0 + all_line_num = 0 with open(input_path, 'r', encoding='utf-8') as infile: - # Example: Parse each line and save processed results for line in infile: try: all_line_num += 1 - line = log_data_clean(line) - line = line.replace('C:', '01 ') - line = line.replace('E:', '04 ') - line = line[3:] - if line[0] == 'H': - line = line.replace('H:', '02 ') - append_hci_to_bt_snoop_file(output_file, 0, line) + line = log_data_clean(line).strip() + if not line: continue - if line[0] == 'D': - line = line.replace('D:', '02 ') - append_hci_to_bt_snoop_file(output_file, 1, line) + parts = line.split() + if len(parts) < 10: continue - if line.startswith('01'): - append_hci_to_bt_snoop_file(output_file, 0, line) - continue - if line.startswith('04'): - append_hci_to_bt_snoop_file(output_file, 1, line) - continue - if is_adv_report(line): - line = '04 3e ' + line - append_hci_to_bt_snoop_file(output_file, 1, line) + parts_wo_ln = parts[1:] + # Check for literal in the first token + literal = None + if ':' in parts_wo_ln[0]: + literal_part, sep, ts_byte = parts_wo_ln[0].partition(':') + if sep == ':' and literal_part in ['C', 'D', 'E', 'H'] and len(ts_byte) == 2: + literal = literal_part + ':' + parts_wo_ln = [literal, ts_byte] + parts_wo_ln[1:] + else: + literal = None + if literal: + # Parse timestamp + try: + timestamp_bytes = bytes(int(b, 16) for b in parts_wo_ln[1:9]) + except Exception: + continue + timestamp_us = int.from_bytes(timestamp_bytes, byteorder='little', signed=False) + hci_data = ' '.join(parts_wo_ln[9:]) + if not hci_data: + continue + # Determine indicator and direction + if literal == 'C:': + hci_data = '01 ' + hci_data + direction = 0 + elif literal == 'E:': + hci_data = '04 ' + hci_data + direction = 1 + elif literal == 'H:': + hci_data = '02 ' + hci_data + direction = 0 + elif literal == 'D:': + hci_data = '02 ' + hci_data + direction = 1 + else: + continue + append_hci_to_bt_snoop_file(output_file, direction, hci_data, timestamp_us) + parsed_num += 1 + else: + # No literal: treat as advertising report + try: + timestamp_bytes = bytes(int(b, 16) for b in parts[1:9]) + except Exception: + continue + timestamp_us = int.from_bytes(timestamp_bytes, byteorder='little', signed=False) + adv_data = ' '.join(parts[9:]) + if not adv_data: + continue + hci_data = '04 3e ' + adv_data + direction = 1 + append_hci_to_bt_snoop_file(output_file, direction, hci_data, timestamp_us) + parsed_num += 1 except Exception as e: print(f'Exception: {e}') - if parsed_num > 0: - print(f'Log parsing completed, parsed_num {parsed_num}, all_line_num {all_line_num}.\nOutput saved to: {output_file}') + print( + f'Parsing completed, parsed_num {parsed_num}, all_line_num {all_line_num}.\nOutput saved to: {output_file}' + ) else: print('No data could be parsed.') def main() -> None: - # Define command-line arguments parser = argparse.ArgumentParser(description='Log Parsing Tool') parser.add_argument('-p', '--path', required=True, help='Path to the input log file') parser.add_argument('-o', '--output', required=True, help='Name tag for the output file') - - # Parse arguments args = parser.parse_args() input_path = args.path output_tag = args.output - - # Call the log parsing function parse_log(input_path, output_tag)