change: Makes topic in mqtt publish test unique for case

- Makes each case to subscribe/publish to unique topics
- Makes test to wait for the successful subscription on the runner
This commit is contained in:
Euripedes Rocha
2024-11-28 17:35:33 +01:00
parent eea2fdf9a9
commit 589c800634
5 changed files with 98 additions and 95 deletions

View File

@@ -24,18 +24,6 @@ menu "Example Configuration"
help
URL of an mqtt broker for wss transport
config EXAMPLE_PUBLISH_TOPIC
string "publish topic"
default "/topic/publish/esp2py"
help
topic to which esp32 client publishes
config EXAMPLE_SUBSCRIBE_TOPIC
string "subscribe topic"
default "/topic/subscribe/py2esp"
help
topic to which esp32 client subscribes (and expects data)
config EXAMPLE_BROKER_CERTIFICATE_OVERRIDE
string "Broker certificate override"
default ""

View File

@@ -152,6 +152,8 @@ static int do_publish_setup(int argc, char **argv) {
command_context.data = calloc(1, sizeof(publish_context_t));
((publish_context_t*)command_context.data)->pattern = strdup(*publish_setup_args.pattern->sval);
((publish_context_t*)command_context.data)->pattern_repetitions = *publish_setup_args.pattern_repetitions->ival;
((publish_context_t*)command_context.data)->subscribe_to = strdup(*publish_setup_args.subscribe_to->sval);
((publish_context_t*)command_context.data)->publish_to = strdup(*publish_setup_args.publish_to->sval);
publish_setup(&command_context, *publish_setup_args.transport->sval);
return 0;
}
@@ -210,6 +212,8 @@ void register_common_commands(void) {
}
void register_publish_commands(void) {
publish_setup_args.transport = arg_str1(NULL,NULL,"<transport>", "Selected transport to test");
publish_setup_args.publish_to = arg_str1(NULL,NULL,"<transport>", "Selected publish_to to publish");
publish_setup_args.subscribe_to = arg_str1(NULL,NULL,"<transport>", "Selected subscribe_to to publish");
publish_setup_args.pattern = arg_str1(NULL,NULL,"<pattern>", "Message pattern repeated to build big messages");
publish_setup_args.pattern_repetitions = arg_int1(NULL,NULL,"<pattern repetitions>", "How many times the pattern is repeated");
publish_setup_args.end = arg_end(1);
@@ -220,7 +224,7 @@ void register_publish_commands(void) {
publish_args.end = arg_end(1);
const esp_console_cmd_t publish_setup = {
.command = "publish_setup",
.help = "Run publish test\n",
.help = "Set publish test parameters\n",
.hint = NULL,
.func = &do_publish_setup,
.argtable = &publish_setup_args

View File

@@ -17,6 +17,8 @@ typedef struct {
typedef struct {
transport_t selected_transport;
char *pattern;
char *subscribe_to;
char *publish_to;
int pattern_repetitions;
int qos;
char *expected;
@@ -41,6 +43,8 @@ typedef struct {
typedef struct {
struct arg_str *transport;
struct arg_str *subscribe_to;
struct arg_str *publish_to;
struct arg_str *pattern;
struct arg_int *pattern_repetitions;
struct arg_end *end;

View File

@@ -46,8 +46,8 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
xEventGroupSetBits(mqtt_event_group, CONNECTED_BIT);
msg_id = esp_mqtt_client_subscribe(client, CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, test_data->qos);
ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, msg_id);
msg_id = esp_mqtt_client_subscribe(client, test_data->subscribe_to, test_data->qos);
ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", test_data->subscribe_to, msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
@@ -203,9 +203,9 @@ void publish_test(command_context_t * ctx, int expect_to_publish, int qos, bool
for (int i = 0; i < data->nr_of_msg_expected; i++) {
int msg_id;
if (enqueue) {
msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0, true);
msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0, true);
} else {
msg_id = esp_mqtt_client_publish(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0);
msg_id = esp_mqtt_client_publish(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0);
if(msg_id < 0) {
ESP_LOGE(TAG, "Failed to publish");
break;

View File

@@ -1,6 +1,5 @@
# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import contextlib
import difflib
import logging
@@ -30,18 +29,17 @@ DEFAULT_MSG_SIZE = 16
# Publisher class creating a python client to send/receive published data from esp-mqtt client
class MqttPublisher(mqtt.Client):
def __init__(self, repeat, published, publish_cfg, log_details=False): # type: (MqttPublisher, int, int, dict, bool) -> None
self.sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE))
def __init__(self, config, log_details=False): # type: (MqttPublisher, dict, bool) -> None
self.log_details = log_details
self.repeat = repeat
self.publish_cfg = publish_cfg
self.expected_data = f'{self.sample_string * self.repeat}'
self.published = published
self.config = config
self.expected_data = f'{config["pattern"] * config["scenario"]["msg_len"]}'
self.received = 0
self.subscribe_mid = 0
self.lock = Lock()
self.event_client_connected = Event()
self.event_client_subscribed = Event()
self.event_client_got_all = Event()
transport = 'websockets' if self.publish_cfg['transport'] in ['ws', 'wss'] else 'tcp'
transport = 'websockets' if self.config['transport'] in ['ws', 'wss'] else 'tcp'
client_id = 'MqttTestRunner' + ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(5))
super().__init__(client_id, userdata=0, transport=transport)
@@ -49,6 +47,12 @@ class MqttPublisher(mqtt.Client):
if self.log_details:
logging.info(text)
def on_subscribe(self, client: Any, userdata: Any, mid: Any, granted_qos: Any) -> None:
"""Verify successful subscription."""
if mid == self.subscribe_mid:
logging.info(f'Subscribed to {self.config["subscribe_topic"]} successfully with QoS: {granted_qos}')
self.event_client_subscribed.set()
def on_connect(self, mqttc: Any, obj: Any, flags: Any, rc:int) -> None:
self.event_client_connected.set()
@@ -58,17 +62,15 @@ class MqttPublisher(mqtt.Client):
def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
payload = msg.payload.decode('utf-8')
if payload == self.expected_data:
userdata += 1
self.user_data_set(userdata)
self.received = userdata
if userdata == self.published:
self.received += 1
if self.received == self.config['scenario']['nr_of_msgs']:
self.event_client_got_all.set()
else:
differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
logging.error(f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
f'{len(self.expected_data)}')
logging.info(f'Repetitions: {payload.count(self.sample_string)}')
logging.info(f'Pattern: {self.sample_string}')
logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
logging.info(f'Pattern: {self.config["pattern"]}')
logging.info(f'First: {payload[:DEFAULT_MSG_SIZE]}')
logging.info(f'Last: {payload[-DEFAULT_MSG_SIZE:]}')
matcher = difflib.SequenceMatcher(a=payload, b=self.expected_data)
@@ -76,13 +78,13 @@ class MqttPublisher(mqtt.Client):
logging.info(f'Match: {match}')
def __enter__(self) -> Any:
qos = self.publish_cfg['qos']
broker_host = self.publish_cfg['broker_host_' + self.publish_cfg['transport']]
broker_port = self.publish_cfg['broker_port_' + self.publish_cfg['transport']]
qos = self.config['qos']
broker_host = self.config['broker_host_' + self.config['transport']]
broker_port = self.config['broker_port_' + self.config['transport']]
try:
self.print_details('Connecting...')
if self.publish_cfg['transport'] in ['ssl', 'wss']:
if self.config['transport'] in ['ssl', 'wss']:
self.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
self.tls_insecure_set(True)
self.event_client_connected.clear()
@@ -96,7 +98,8 @@ class MqttPublisher(mqtt.Client):
if not self.event_client_connected.wait(timeout=30):
raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
self.event_client_got_all.clear()
self.subscribe(self.publish_cfg['subscribe_topic'], qos)
result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
assert result == 0
return self
def __exit__(self, exc_type, exc_value, traceback): # type: (MqttPublisher, str, str, dict) -> None
@@ -104,38 +107,47 @@ class MqttPublisher(mqtt.Client):
self.loop_stop()
def get_configurations(dut: Dut) -> Dict[str,Any]:
def get_configurations(dut: Dut, test_case: Any) -> Dict[str,Any]:
publish_cfg = {}
try:
@no_type_check
def get_broker_from_dut(dut, config_option):
def get_config_from_dut(dut, config_option):
# logging.info('Option:', config_option, dut.app.sdkconfig.get(config_option))
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get(config_option))
if value is None:
return None, None
return value.group(1), int(value.group(2))
# Get publish test configuration
publish_cfg['publish_topic'] = dut.app.sdkconfig.get('EXAMPLE_SUBSCRIBE_TOPIC').replace('"','')
publish_cfg['subscribe_topic'] = dut.app.sdkconfig.get('EXAMPLE_PUBLISH_TOPIC').replace('"','')
publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI')
publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI')
publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WS_URI')
publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI')
publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI')
publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI')
publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WS_URI')
publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI')
except Exception:
logging.info('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig')
raise
transport, qos, enqueue, scenario = test_case
if publish_cfg['broker_host_' + transport] is None:
pytest.skip(f'Skipping transport: {transport}...')
publish_cfg['scenario'] = scenario
publish_cfg['qos'] = qos
publish_cfg['enqueue'] = enqueue
publish_cfg['transport'] = transport
publish_cfg['pattern'] = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE))
publish_cfg['test_timeout'] = get_timeout(test_case)
unique_topic = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE))
publish_cfg['subscribe_topic'] = 'test/subscribe_to/' + unique_topic
publish_cfg['publish_topic'] = 'test/subscribe_to/' + unique_topic
logging.info(f'configuration: {publish_cfg}')
return publish_cfg
@contextlib.contextmanager
def connected_and_subscribed(dut:Dut, transport:str, pattern:str, pattern_repetitions:int) -> Any:
dut.write(f'publish_setup {transport} {pattern} {pattern_repetitions}')
dut.write(f'start')
def connected_and_subscribed(dut:Dut) -> Any:
dut.write('start')
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
yield
dut.write(f'stop')
dut.write('stop')
def get_scenarios() -> List[Dict[str, int]]:
@@ -149,16 +161,16 @@ def get_scenarios() -> List[Dict[str, int]]:
continue
break
if not scenarios: # No message sizes present in the env - set defaults
scenarios = [{'len':0, 'repeat':5}, # zero-sized messages
{'len':2, 'repeat':5}, # short messages
{'len':200, 'repeat':3}, # long messages
scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages
{'msg_len':2, 'nr_of_msgs':5}, # short messages
{'msg_len':200, 'nr_of_msgs':3}, # long messages
]
return scenarios
def get_timeout(test_case: Any) -> int:
transport, qos, enqueue, scenario = test_case
timeout = int(scenario['repeat'] * 10)
timeout = int(scenario['nr_of_msgs'] * 20)
if qos == 1:
timeout += 30
if qos == 2:
@@ -168,37 +180,33 @@ def get_timeout(test_case: Any) -> int:
return timeout
def run_publish_test_case(dut: Dut, test_case: Any, publish_cfg: Any) -> None:
transport, qos, enqueue, scenario = test_case
if publish_cfg['broker_host_' + transport] is None:
pytest.skip(f'Skipping transport: {transport}...')
repeat = scenario['len']
published = scenario['repeat']
publish_cfg['qos'] = qos
publish_cfg['queue'] = enqueue
publish_cfg['transport'] = transport
test_timeout = get_timeout(test_case)
logging.info(f'Starting Publish test: transport:{transport}, qos:{qos}, nr_of_msgs:{published},'
f' msg_size:{repeat * DEFAULT_MSG_SIZE}, enqueue:{enqueue}')
with MqttPublisher(repeat, published, publish_cfg) as publisher, connected_and_subscribed(dut, transport, publisher.sample_string, scenario['len']):
def run_publish_test_case(dut: Dut, config: Any) -> None:
logging.info(f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}')
dut.write(f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}')
with MqttPublisher(config) as publisher, connected_and_subscribed(dut):
assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe'
msgs_published: List[mqtt.MQTTMessageInfo] = []
dut.write(f'publish {publisher.published} {qos} {enqueue}')
assert publisher.event_client_got_all.wait(timeout=test_timeout), (f'Not all data received from ESP32: {transport} '
f'qos={qos} received: {publisher.received} '
f'expected: {publisher.published}')
dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (f'Not all data received from ESP32: {config["transport"]} '
f'qos={config["qos"]} received: {publisher.received} '
f'expected: {config["scenario"]["nr_of_msgs"]}')
logging.info(' - all data received from ESP32')
payload = publisher.sample_string * publisher.repeat
for _ in range(publisher.published):
payload = config['pattern'] * config['scenario']['msg_len']
for _ in range(config['scenario']['nr_of_msgs']):
with publisher.lock:
msg = publisher.publish(topic=publisher.publish_cfg['publish_topic'], payload=payload, qos=qos)
if qos > 0:
msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
if config['qos'] > 0:
msgs_published.append(msg)
logging.info(f'Published: {len(msgs_published)}')
while msgs_published:
msgs_published = [msg for msg in msgs_published if msg.is_published()]
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
logging.info('All messages from runner published')
try:
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=test_timeout)
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
except pexpect.exceptions.ExceptionPexpect:
dut.write('publish_report')
dut.expect(re.compile(rb'Test Report'), timeout=30)
@@ -206,35 +214,34 @@ def run_publish_test_case(dut: Dut, test_case: Any, publish_cfg: Any) -> None:
logging.info('ESP32 received all data from runner')
stress_scenarios = [{'len':20, 'repeat':30}] # many medium sized
stress_scenarios = [{'msg_len':20, 'nr_of_msgs':30}] # many medium sized
transport_cases = ['tcp', 'ws', 'wss', 'ssl']
qos_cases = [0, 1, 2]
enqueue_cases = [0, 1]
local_broker_supported_transports = ['tcp']
local_broker_scenarios = [{'len':0, 'repeat':5}, # zero-sized messages
{'len':5, 'repeat':20}, # short messages
{'len':500, 'repeat':10}, # long messages
{'len':20, 'repeat':20}] # many medium sized
local_broker_scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages
{'msg_len':5, 'nr_of_msgs':20}, # short messages
{'msg_len':500, 'nr_of_msgs':10}, # long messages
{'msg_len':20, 'nr_of_msgs':20}] # many medium sized
def make_cases(scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]:
return [test_case for test_case in product(transport_cases, qos_cases, enqueue_cases, scenarios)]
def make_cases(transport: Any, scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]:
return [test_case for test_case in product(transport, qos_cases, enqueue_cases, scenarios)]
test_cases = make_cases(get_scenarios())
stress_test_cases = make_cases(stress_scenarios)
test_cases = make_cases(transport_cases, get_scenarios())
stress_test_cases = make_cases(transport_cases, stress_scenarios)
@pytest.mark.esp32
@pytest.mark.ethernet
@pytest.mark.nightly_run
@pytest.mark.parametrize('test_case', test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut)
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.confirm_write('init', expect_pattern='init', timeout=30)
run_publish_test_case(dut, test_case, publish_cfg)
run_publish_test_case(dut, publish_cfg)
@pytest.mark.esp32
@@ -243,23 +250,23 @@ def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
@pytest.mark.parametrize('test_case', stress_test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut)
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.write('init')
run_publish_test_case(dut, test_case, publish_cfg)
run_publish_test_case(dut, publish_cfg)
@pytest.mark.esp32
@pytest.mark.ethernet
@pytest.mark.parametrize('test_case', make_cases(local_broker_scenarios))
@pytest.mark.parametrize('test_case', make_cases(local_broker_supported_transports, local_broker_scenarios))
@pytest.mark.parametrize('config', ['local_broker'], indirect=True)
def test_mqtt_publish_lcoal(dut: Dut, test_case: Any) -> None:
def test_mqtt_publish_local(dut: Dut, test_case: Any) -> None:
if test_case[0] not in local_broker_supported_transports:
pytest.skip(f'Skipping transport: {test_case[0]}...')
dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
publish_cfg = get_configurations(dut)
publish_cfg = get_configurations(dut, test_case)
publish_cfg['broker_host_tcp'] = dut_ip
publish_cfg['broker_port_tcp'] = 1234
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.confirm_write('init', expect_pattern='init', timeout=30)
run_publish_test_case(dut, test_case, publish_cfg)
run_publish_test_case(dut, publish_cfg)