Merge branch 'feat/add_timestamp_v5.1' into 'release/v5.1'

fix(nimble): Enhanced HCI logging by adding timestamp information (v5.1)

See merge request espressif/esp-idf!40637
Rahul Tank
2025-07-18 12:28:52 +05:30
2 changed files with 83 additions and 72 deletions

File 1/2: Bluetooth HCI log recording (C source)

@@ -10,6 +10,7 @@
 #include "bt_common.h"
 #include "osi/mutex.h"
 #include "esp_attr.h"
+#include "esp_timer.h"
 
 #if (BT_HCI_LOG_INCLUDED == TRUE)
 #define BT_HCI_LOG_PRINT_TAG (1)
@@ -203,6 +204,8 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st
 {
     osi_mutex_t mutex_lock;
     uint8_t *g_hci_log_buffer;
+    int64_t ts;
+    uint8_t *temp_buf;
 
     if (!p_hci_log_ctl->p_hci_log_buffer) {
         return ESP_FAIL;
@@ -214,6 +217,16 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st
         return ESP_FAIL;
     }
 
+    ts = esp_timer_get_time();
+
+    temp_buf = (uint8_t *)malloc(data_len + 8);
+
+    memset(temp_buf, 0x0, data_len + 8);
+
+    memcpy(temp_buf, &ts, 8);
+    memcpy(temp_buf + 8, data, data_len);
+    data_len += 8;
+
     mutex_lock = p_hci_log_ctl->mutex_lock;
     osi_mutex_lock(&mutex_lock, OSI_MUTEX_MAX_TIMEOUT);
@@ -245,7 +258,7 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st
         bt_hci_log_record_string(p_hci_log_ctl, str);
     }
 
-    bt_hci_log_record_hex(p_hci_log_ctl, data, data_len);
+    bt_hci_log_record_hex(p_hci_log_ctl, temp_buf, data_len);
 
     g_hci_log_buffer[p_hci_log_ctl->log_record_in] = '\n';
@@ -261,6 +274,8 @@ esp_err_t IRAM_ATTR bt_hci_log_record_data(bt_hci_log_t *p_hci_log_ctl, char *st
     osi_mutex_unlock(&mutex_lock);
 
+    free(temp_buf);
+
     return ESP_OK;
 }
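With this change, every record handed to bt_hci_log_record_data() is written out with an 8-byte esp_timer_get_time() value in front of the original HCI bytes. A minimal sketch of what a consumer of such a record has to do, assuming the timestamp is stored in the target's native little-endian byte order (the helper name and the sample bytes are illustrative, not part of the commit):

```python
import struct


def split_timestamped_record(record: bytes) -> 'tuple[int, bytes]':
    """Split one logged record into (timestamp_us, hci_payload).

    Assumes the layout produced above: an 8-byte int64 microsecond
    timestamp (little-endian on ESP32-class targets) followed by the
    unmodified HCI bytes.
    """
    if len(record) < 8:
        raise ValueError('record is shorter than the 8-byte timestamp prefix')
    (timestamp_us,) = struct.unpack('<q', record[:8])
    return timestamp_us, record[8:]


# Illustrative record: 123456 us of uptime followed by an HCI_Reset command (01 03 0c 00).
example = (123456).to_bytes(8, 'little') + bytes.fromhex('01030c00')
print(split_timestamped_record(example))  # (123456, b'\x01\x03\x0c\x00')
```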

File 2/2: HCI log to btsnoop conversion script (Python)

@@ -1,13 +1,9 @@
-# SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 import argparse
 import os
 import re
 import struct
-import time
-
-parsed_num = 0
-all_line_num = 0
 
 
 def create_new_bt_snoop_file(filename: str) -> None:
@@ -20,30 +16,19 @@ def create_new_bt_snoop_file(filename: str) -> None:
         f.write(header)
 
 
-def append_hci_to_bt_snoop_file(filename: str, direction: int, data: str) -> None:
+def append_hci_to_bt_snoop_file(filename: str, direction: int, data: str, timestamp_us: int) -> None:
     if os.path.exists(filename):
         print(f'Appending to existing file: {filename}')
     else:
         print(f'Creating new file: {filename}')
         create_new_bt_snoop_file(filename)
 
-    # Ensure data is converted to bytearray from hex string
-    data_bytes = bytearray.fromhex(data)  # Convert hex string to bytearray
+    data_bytes = bytearray.fromhex(data)
 
-    with open(filename, 'ab') as f:  # 'ab' mode for appending binary data
-        global parsed_num
-        parsed_num += 1
-        data_len = len(data_bytes)
-        orig_len = data_len
-        incl_len = data_len
-        packet_flags = direction + (data_bytes[0] != 1 or data_bytes[0] != 3) * 2
+    with open(filename, 'ab') as f:
+        orig_len = incl_len = len(data_bytes)
+        packet_flags = direction + (data_bytes[0] != 1 and data_bytes[0] != 3) * 2
         cumulative_drops = 0
-        # Calculate the timestamp with an offset from a predefined reference time(0x00DCDDB30F2F8000).
-        timestamp_us = int(round(time.time() * 1000000)) + 0x00DCDDB30F2F8000
-        # Writing structured data followed by the byte array data
         f.write(struct.pack('>IIIIQ', orig_len, incl_len, packet_flags, cumulative_drops, timestamp_us))
-        f.write(data_bytes)  # Write bytearray (binary data) to file
+        f.write(data_bytes)
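Each call to append_hci_to_bt_snoop_file() now emits one btsnoop record: a 24-byte big-endian header (original length, included length, packet flags, cumulative drops, 64-bit timestamp) followed by the raw HCI bytes, which is what the struct.pack('>IIIIQ', ...) line above writes. A self-contained sketch of that packing with made-up values:

```python
import struct

# Made-up example: one HCI event received from the controller
# (Command Complete for HCI_Reset), logged 123456789 us after boot.
data_bytes = bytes.fromhex('040e0401030c00')
direction = 1  # in this script: 1 = controller -> host, 0 = host -> controller
packet_flags = direction + (data_bytes[0] != 1 and data_bytes[0] != 3) * 2
cumulative_drops = 0
timestamp_us = 123456789

record_header = struct.pack(
    '>IIIIQ',
    len(data_bytes),   # original length
    len(data_bytes),   # included length
    packet_flags,      # direction bit plus the indicator-derived bit, as computed above
    cumulative_drops,
    timestamp_us,
)
print(record_header.hex(), data_bytes.hex())
```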
@@ -51,82 +36,93 @@ def log_data_clean(data: str) -> str:
     return cleaned
 
 
-def is_adv_report(data: str) -> bool:
-    is_binary = all(re.fullmatch(r'[0-9a-fA-F]{2}', part) for part in data.split())
-    return is_binary
-
-
 def parse_log(input_path: str, output_tag: str) -> None:
-    """
-    Parses the specified log file and saves the results to an output file.
-
-    Args:
-        input_path (str): Path to the input log file.
-        output_tag (str): Name tag for the output file.
-
-    Returns:
-        None
-    """
-    global parsed_num
-    global all_line_num
-
     if not os.path.exists(input_path):
         print(f"Error: The file '{input_path}' does not exist.")
         return
 
-    # Define the output directory and create it if it doesn't exist
     output_dir = './parsed_logs'
     os.makedirs(output_dir, exist_ok=True)
 
-    # Define the output file name based on the tag
     output_file = os.path.join(output_dir, f'parsed_log_{output_tag}.btsnoop.log')
+
+    parsed_num = 0
+    all_line_num = 0
 
     with open(input_path, 'r', encoding='utf-8') as infile:
-        # Example: Parse each line and save processed results
         for line in infile:
             try:
                 all_line_num += 1
-                line = log_data_clean(line)
-                line = line.replace('C:', '01 ')
-                line = line.replace('E:', '04 ')
-                line = line[3:]
-                if line[0] == 'H':
-                    line = line.replace('H:', '02 ')
-                    append_hci_to_bt_snoop_file(output_file, 0, line)
-                    continue
-                if line[0] == 'D':
-                    line = line.replace('D:', '02 ')
-                    append_hci_to_bt_snoop_file(output_file, 1, line)
-                    continue
-                if line.startswith('01'):
-                    append_hci_to_bt_snoop_file(output_file, 0, line)
-                    continue
-                if line.startswith('04'):
-                    append_hci_to_bt_snoop_file(output_file, 1, line)
-                    continue
-                if is_adv_report(line):
-                    line = '04 3e ' + line
-                    append_hci_to_bt_snoop_file(output_file, 1, line)
+                line = log_data_clean(line).strip()
+                if not line:
+                    continue
+                parts = line.split()
+                if len(parts) < 10:
+                    continue
+                parts_wo_ln = parts[1:]
+                # Check for literal in the first token
+                literal = None
+                if ':' in parts_wo_ln[0]:
+                    literal_part, sep, ts_byte = parts_wo_ln[0].partition(':')
+                    if sep == ':' and literal_part in ['C', 'D', 'E', 'H'] and len(ts_byte) == 2:
+                        literal = literal_part + ':'
+                        parts_wo_ln = [literal, ts_byte] + parts_wo_ln[1:]
+                    else:
+                        literal = None
+                if literal:
+                    # Parse timestamp
+                    try:
+                        timestamp_bytes = bytes(int(b, 16) for b in parts_wo_ln[1:9])
+                    except Exception:
+                        continue
+                    timestamp_us = int.from_bytes(timestamp_bytes, byteorder='little', signed=False)
+                    hci_data = ' '.join(parts_wo_ln[9:])
+                    if not hci_data:
+                        continue
+                    # Determine indicator and direction
+                    if literal == 'C:':
+                        hci_data = '01 ' + hci_data
+                        direction = 0
+                    elif literal == 'E:':
+                        hci_data = '04 ' + hci_data
+                        direction = 1
+                    elif literal == 'H:':
+                        hci_data = '02 ' + hci_data
+                        direction = 0
+                    elif literal == 'D:':
+                        hci_data = '02 ' + hci_data
+                        direction = 1
+                    else:
+                        continue
+                    append_hci_to_bt_snoop_file(output_file, direction, hci_data, timestamp_us)
+                    parsed_num += 1
+                else:
+                    # No literal: treat as advertising report
+                    try:
+                        timestamp_bytes = bytes(int(b, 16) for b in parts[1:9])
+                    except Exception:
+                        continue
+                    timestamp_us = int.from_bytes(timestamp_bytes, byteorder='little', signed=False)
+                    adv_data = ' '.join(parts[9:])
+                    if not adv_data:
+                        continue
+                    hci_data = '04 3e ' + adv_data
+                    direction = 1
+                    append_hci_to_bt_snoop_file(output_file, direction, hci_data, timestamp_us)
+                    parsed_num += 1
             except Exception as e:
                 print(f'Exception: {e}')
 
     if parsed_num > 0:
-        print(f'Log parsing completed, parsed_num {parsed_num}, all_line_num {all_line_num}.\nOutput saved to: {output_file}')
+        print(
+            f'Parsing completed, parsed_num {parsed_num}, all_line_num {all_line_num}.\nOutput saved to: {output_file}'
+        )
     else:
         print('No data could be parsed.')
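To make the new input format concrete: after log_data_clean(), each line is expected to start with a line counter, optionally followed by a C:/E:/H:/D: literal fused with the first of eight little-endian timestamp bytes, and then the HCI payload. A stand-alone sketch of that tokenization on a fabricated line (the sample values are invented for illustration):

```python
# Fabricated cleaned log line: "<line counter> C:<ts byte 0> <ts bytes 1..7> <HCI payload>"
sample = '0042 C:40 e2 01 00 00 00 00 00 03 0c 00'

parts = sample.split()
parts_wo_ln = parts[1:]                              # drop the leading line counter
literal, _, first_ts_byte = parts_wo_ln[0].partition(':')
ts_tokens = [first_ts_byte] + parts_wo_ln[1:8]       # eight timestamp bytes in total
timestamp_us = int.from_bytes(bytes(int(b, 16) for b in ts_tokens), byteorder='little')
hci_data = '01 ' + ' '.join(parts_wo_ln[8:])         # 'C:' maps to the 0x01 command indicator
direction = 0                                        # commands go host -> controller

print(literal, timestamp_us, hci_data)  # C 123456 01 03 0c 00  (an HCI_Reset sent at t=123456 us)
```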
 
 def main() -> None:
-    # Define command-line arguments
     parser = argparse.ArgumentParser(description='Log Parsing Tool')
     parser.add_argument('-p', '--path', required=True, help='Path to the input log file')
     parser.add_argument('-o', '--output', required=True, help='Name tag for the output file')
 
-    # Parse arguments
     args = parser.parse_args()
     input_path = args.path
     output_tag = args.output
 
-    # Call the log parsing function
     parse_log(input_path, output_tag)
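Finally, a hedged usage sketch. The script's file name is not visible in this view, so bt_hci_log_parser.py below is a placeholder; -p/--path and -o/--output are the two options actually defined in main():

```python
import subprocess
import sys

SCRIPT = 'bt_hci_log_parser.py'   # placeholder name; substitute the script's real path
LOG = './device_hci.log'          # placeholder: an HCI log captured from the device

# Equivalent to running:  python bt_hci_log_parser.py -p ./device_hci.log -o nimble_test
subprocess.run([sys.executable, SCRIPT, '-p', LOG, '-o', 'nimble_test'], check=True)

# Expected output: ./parsed_logs/parsed_log_nimble_test.btsnoop.log, a btsnoop file
# that btsnoop-aware viewers such as Wireshark can open.
```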