Merge branch 'mqtt_test_adjust' into 'master'

Improve mqtt publish connect tests

See merge request espressif/esp-idf!34889
This commit is contained in:
Rocha Euripedes
2024-12-05 22:06:02 +08:00
11 changed files with 161 additions and 116 deletions

View File

@ -10,6 +10,13 @@
- mbedtls
- protocol_examples_common
.mqtt_dependencies: &mqtt_dependencies
<<: *default_dependencies
depends_filepatterns:
- components/mqtt/**/*
depends_components+:
- mqtt
examples/protocols/esp_http_client:
<<: *default_dependencies
enable:
@ -134,51 +141,55 @@ examples/protocols/modbus:
- examples/common_components/protocol_examples_common/**/*
- examples/protocols/modbus/mb_example_common/**/*
examples/protocols/mqtt:
<<: *default_dependencies
depends_filepatterns:
- components/mqtt/**/*
examples/protocols/mqtt/custom_outbox:
<<: *default_dependencies
<<: *mqtt_dependencies
examples/protocols/mqtt/ssl:
<<: *default_dependencies
<<: *mqtt_dependencies
disable_test:
- if: IDF_TARGET != "esp32"
reason: only test on esp32
examples/protocols/mqtt/ssl_ds:
<<: *default_dependencies
<<: *mqtt_dependencies
disable:
- if: SOC_DIG_SIGN_SUPPORTED != 1
temporary: false
reason: DS not present
depends_filepatterns+:
- examples/protocols/mqtt/ssl_ds/**/*
examples/protocols/mqtt/tcp:
<<: *default_dependencies
<<: *mqtt_dependencies
disable_test:
- if: IDF_TARGET != "esp32"
reason: only test on esp32
depends_filepatterns+:
- examples/protocols/mqtt/tcp/**/*
examples/protocols/mqtt/ws:
<<: *default_dependencies
<<: *mqtt_dependencies
disable_test:
- if: IDF_TARGET != "esp32"
reason: only test on esp32
depends_filepatterns+:
- examples/protocols/mqtt/ws/**/*
examples/protocols/mqtt/wss:
<<: *default_dependencies
<<: *mqtt_dependencies
disable_test:
- if: IDF_TARGET != "esp32"
reason: only test on esp32
depends_filepatterns+:
- examples/protocols/mqtt/wss/**/*
examples/protocols/mqtt5:
<<: *default_dependencies
<<: *mqtt_dependencies
disable_test:
- if: IDF_TARGET != "esp32"
reason: only test on esp32
depends_filepatterns+:
- examples/protocols/mqtt5/**/*
examples/protocols/smtp_client:
<<: *default_dependencies

View File

@ -20,6 +20,9 @@ DEFAULT_TARGET_TEST_CHILD_PIPELINE_FILEPATH = os.path.join(IDF_PATH, 'target_tes
DEFAULT_BUILD_CHILD_PIPELINE_NAME = 'Build Child Pipeline'
DEFAULT_TARGET_TEST_CHILD_PIPELINE_NAME = 'Target Test Child Pipeline'
DEFAULT_TARGET_TEST_JOB_TEMPLATE_NAME = '.dynamic_target_test_template'
TIMEOUT_4H_TEMPLATE_NAME = '.timeout_4h_template'
TEST_RELATED_BUILD_JOB_NAME = 'build_test_related_apps'
NON_TEST_RELATED_BUILD_JOB_NAME = 'build_non_test_related_apps'

View File

@ -19,16 +19,19 @@ from dynamic_pipelines.constants import BUILD_ONLY_LABEL
from dynamic_pipelines.constants import DEFAULT_CASES_TEST_PER_JOB
from dynamic_pipelines.constants import DEFAULT_TARGET_TEST_CHILD_PIPELINE_FILEPATH
from dynamic_pipelines.constants import DEFAULT_TARGET_TEST_CHILD_PIPELINE_NAME
from dynamic_pipelines.constants import DEFAULT_TARGET_TEST_JOB_TEMPLATE_NAME
from dynamic_pipelines.constants import DEFAULT_TEST_PATHS
from dynamic_pipelines.constants import (
KNOWN_GENERATE_TEST_CHILD_PIPELINE_WARNINGS_FILEPATH,
)
from dynamic_pipelines.constants import TIMEOUT_4H_TEMPLATE_NAME
from dynamic_pipelines.models import EmptyJob
from dynamic_pipelines.models import Job
from dynamic_pipelines.models import TargetTestJob
from dynamic_pipelines.utils import dump_jobs_to_yaml
from idf_build_apps import App
from idf_ci.app import import_apps_from_txt
from idf_pytest.constants import TIMEOUT_4H_MARKERS
from idf_pytest.script import get_pytest_cases
@ -82,7 +85,13 @@ def get_target_test_jobs(
print('WARNING: excluding test cases with runner tags:', runner_tags)
continue
_extends = [DEFAULT_TARGET_TEST_JOB_TEMPLATE_NAME]
for timeout_4h_marker in TIMEOUT_4H_MARKERS:
if timeout_4h_marker in env_markers:
_extends.append(TIMEOUT_4H_TEMPLATE_NAME)
target_test_job = TargetTestJob(
extends=_extends,
name=f'{target_selector} - {",".join(env_markers)}',
tags=runner_tags,
parallel=len(cases) // DEFAULT_CASES_TEST_PER_JOB + 1,

View File

@ -94,3 +94,6 @@
- section_start "upload_junit_reports"
- run_cmd python tools/ci/artifacts_handler.py upload --type logs junit_reports
- section_end "upload_junit_reports"
.timeout_4h_template:
timeout: 4 hours

View File

@ -75,6 +75,7 @@ ENV_MARKERS = {
'twai_transceiver': 'runners with a TWAI PHY transceiver',
'flash_encryption_wifi_high_traffic': 'Flash Encryption runners with wifi high traffic support',
'ethernet': 'ethernet runner',
'ethernet_stress': 'ethernet runner with stress test',
'ethernet_flash_8m': 'ethernet runner with 8mb flash',
'ethernet_router': 'both the runner and dut connect to the same router through ethernet NIC',
'ethernet_vlan': 'ethernet runner GARM-32-SH-1-R16S5N3',
@ -127,6 +128,11 @@ ENV_MARKERS = {
'ram_app': 'ram_app runners',
}
# by default the timeout is 1h, for some special cases we need to extend it
TIMEOUT_4H_MARKERS = [
'ethernet_stress',
]
DEFAULT_CONFIG_RULES_STR = ['sdkconfig.ci=default', 'sdkconfig.ci.*=', '=default']
DEFAULT_IGNORE_WARNING_FILEPATH = os.path.join(IDF_PATH, 'tools', 'ci', 'ignore_build_warnings.txt')
DEFAULT_BUILD_TEST_RULES_FILEPATH = os.path.join(IDF_PATH, '.gitlab', 'ci', 'default-build-test-rules.yml')

View File

@ -16,7 +16,7 @@ tools/test_apps/protocols/mqtt/publish_connect_test:
temporary: true
reason: lack of runners
depends_components:
- esp_eth
- mqtt
depends_filepatterns:
- tools/ci/python_packages/common_test_methods.py
- examples/common_components/**/*

View File

@ -24,18 +24,6 @@ menu "Example Configuration"
help
URL of an mqtt broker for wss transport
config EXAMPLE_PUBLISH_TOPIC
string "publish topic"
default "/topic/publish/esp2py"
help
topic to which esp32 client publishes
config EXAMPLE_SUBSCRIBE_TOPIC
string "subscribe topic"
default "/topic/subscribe/py2esp"
help
topic to which esp32 client subscribes (and expects data)
config EXAMPLE_BROKER_CERTIFICATE_OVERRIDE
string "Broker certificate override"
default ""

View File

@ -152,6 +152,8 @@ static int do_publish_setup(int argc, char **argv) {
command_context.data = calloc(1, sizeof(publish_context_t));
((publish_context_t*)command_context.data)->pattern = strdup(*publish_setup_args.pattern->sval);
((publish_context_t*)command_context.data)->pattern_repetitions = *publish_setup_args.pattern_repetitions->ival;
((publish_context_t*)command_context.data)->subscribe_to = strdup(*publish_setup_args.subscribe_to->sval);
((publish_context_t*)command_context.data)->publish_to = strdup(*publish_setup_args.publish_to->sval);
publish_setup(&command_context, *publish_setup_args.transport->sval);
return 0;
}
@ -210,6 +212,8 @@ void register_common_commands(void) {
}
void register_publish_commands(void) {
publish_setup_args.transport = arg_str1(NULL,NULL,"<transport>", "Selected transport to test");
publish_setup_args.publish_to = arg_str1(NULL,NULL,"<transport>", "Selected publish_to to publish");
publish_setup_args.subscribe_to = arg_str1(NULL,NULL,"<transport>", "Selected subscribe_to to publish");
publish_setup_args.pattern = arg_str1(NULL,NULL,"<pattern>", "Message pattern repeated to build big messages");
publish_setup_args.pattern_repetitions = arg_int1(NULL,NULL,"<pattern repetitions>", "How many times the pattern is repeated");
publish_setup_args.end = arg_end(1);
@ -220,7 +224,7 @@ void register_publish_commands(void) {
publish_args.end = arg_end(1);
const esp_console_cmd_t publish_setup = {
.command = "publish_setup",
.help = "Run publish test\n",
.help = "Set publish test parameters\n",
.hint = NULL,
.func = &do_publish_setup,
.argtable = &publish_setup_args

View File

@ -17,6 +17,8 @@ typedef struct {
typedef struct {
transport_t selected_transport;
char *pattern;
char *subscribe_to;
char *publish_to;
int pattern_repetitions;
int qos;
char *expected;
@ -41,6 +43,8 @@ typedef struct {
typedef struct {
struct arg_str *transport;
struct arg_str *subscribe_to;
struct arg_str *publish_to;
struct arg_str *pattern;
struct arg_int *pattern_repetitions;
struct arg_end *end;

View File

@ -14,6 +14,7 @@
#include "freertos/FreeRTOS.h"
#include <freertos/event_groups.h>
#include "esp_system.h"
#include "esp_random.h"
#include "esp_log.h"
#include "mqtt_client.h"
@ -24,7 +25,7 @@ static const char *TAG = "publish_test";
static EventGroupHandle_t mqtt_event_group;
const static int CONNECTED_BIT = BIT0;
#define CLIENT_ID_SUFFIX_SIZE 12
#if CONFIG_EXAMPLE_BROKER_CERTIFICATE_OVERRIDDEN == 1
static const uint8_t mqtt_eclipseprojects_io_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_EXAMPLE_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----";
#else
@ -45,8 +46,8 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
xEventGroupSetBits(mqtt_event_group, CONNECTED_BIT);
msg_id = esp_mqtt_client_subscribe(client, CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, test_data->qos);
ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, msg_id);
msg_id = esp_mqtt_client_subscribe(client, test_data->subscribe_to, test_data->qos);
ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", test_data->subscribe_to, msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
@ -101,8 +102,6 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
}
}
void test_init(void)
{
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
@ -169,6 +168,10 @@ static void configure_client(command_context_t * ctx, const char *transport)
ESP_LOGI(TAG, "Set certificate");
config.broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start;
}
// Generate a random client id for each iteration
char client_id[CLIENT_ID_SUFFIX_SIZE] = {0};
snprintf(client_id, sizeof(client_id), "esp32-%08X", esp_random());
config.credentials.client_id = client_id;
esp_mqtt_set_config(ctx->mqtt_client, &config);
}
}
@ -200,9 +203,9 @@ void publish_test(command_context_t * ctx, int expect_to_publish, int qos, bool
for (int i = 0; i < data->nr_of_msg_expected; i++) {
int msg_id;
if (enqueue) {
msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0, true);
msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0, true);
} else {
msg_id = esp_mqtt_client_publish(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0);
msg_id = esp_mqtt_client_publish(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0);
if(msg_id < 0) {
ESP_LOGE(TAG, "Failed to publish");
break;

View File

@ -29,24 +29,30 @@ DEFAULT_MSG_SIZE = 16
# Publisher class creating a python client to send/receive published data from esp-mqtt client
class MqttPublisher(mqtt.Client):
def __init__(self, repeat, published, publish_cfg, log_details=False): # type: (MqttPublisher, int, int, dict, bool) -> None
self.sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE))
def __init__(self, config, log_details=False): # type: (MqttPublisher, dict, bool) -> None
self.log_details = log_details
self.repeat = repeat
self.publish_cfg = publish_cfg
self.expected_data = f'{self.sample_string * self.repeat}'
self.published = published
self.config = config
self.expected_data = f'{config["pattern"] * config["scenario"]["msg_len"]}'
self.received = 0
self.subscribe_mid = 0
self.lock = Lock()
self.event_client_connected = Event()
self.event_client_subscribed = Event()
self.event_client_got_all = Event()
transport = 'websockets' if self.publish_cfg['transport'] in ['ws', 'wss'] else 'tcp'
super().__init__('MqttTestRunner', userdata=0, transport=transport)
transport = 'websockets' if self.config['transport'] in ['ws', 'wss'] else 'tcp'
client_id = 'MqttTestRunner' + ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(5))
super().__init__(client_id, userdata=0, transport=transport)
def print_details(self, text): # type: (str) -> None
if self.log_details:
logging.info(text)
def on_subscribe(self, client: Any, userdata: Any, mid: Any, granted_qos: Any) -> None:
"""Verify successful subscription."""
if mid == self.subscribe_mid:
logging.info(f'Subscribed to {self.config["subscribe_topic"]} successfully with QoS: {granted_qos}')
self.event_client_subscribed.set()
def on_connect(self, mqttc: Any, obj: Any, flags: Any, rc:int) -> None:
self.event_client_connected.set()
@ -56,31 +62,29 @@ class MqttPublisher(mqtt.Client):
def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
payload = msg.payload.decode('utf-8')
if payload == self.expected_data:
userdata += 1
self.user_data_set(userdata)
self.received = userdata
if userdata == self.published:
self.received += 1
if self.received == self.config['scenario']['nr_of_msgs']:
self.event_client_got_all.set()
else:
differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
logging.error(f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
f'{len(self.expected_data)}')
logging.info(f'Repetitions: {payload.count(self.sample_string)}')
logging.info(f'Pattern: {self.sample_string}')
logging.info(f'First : {payload[:DEFAULT_MSG_SIZE]}')
logging.info(f'Last : {payload[-DEFAULT_MSG_SIZE:]}')
logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
logging.info(f'Pattern: {self.config["pattern"]}')
logging.info(f'First: {payload[:DEFAULT_MSG_SIZE]}')
logging.info(f'Last: {payload[-DEFAULT_MSG_SIZE:]}')
matcher = difflib.SequenceMatcher(a=payload, b=self.expected_data)
for match in matcher.get_matching_blocks():
logging.info(f'Match: {match}')
def __enter__(self) -> Any:
qos = self.publish_cfg['qos']
broker_host = self.publish_cfg['broker_host_' + self.publish_cfg['transport']]
broker_port = self.publish_cfg['broker_port_' + self.publish_cfg['transport']]
qos = self.config['qos']
broker_host = self.config['broker_host_' + self.config['transport']]
broker_port = self.config['broker_port_' + self.config['transport']]
try:
self.print_details('Connecting...')
if self.publish_cfg['transport'] in ['ssl', 'wss']:
if self.config['transport'] in ['ssl', 'wss']:
self.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
self.tls_insecure_set(True)
self.event_client_connected.clear()
@ -94,7 +98,8 @@ class MqttPublisher(mqtt.Client):
if not self.event_client_connected.wait(timeout=30):
raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
self.event_client_got_all.clear()
self.subscribe(self.publish_cfg['subscribe_topic'], qos)
result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
assert result == 0
return self
def __exit__(self, exc_type, exc_value, traceback): # type: (MqttPublisher, str, str, dict) -> None
@ -102,38 +107,47 @@ class MqttPublisher(mqtt.Client):
self.loop_stop()
def get_configurations(dut: Dut) -> Dict[str,Any]:
def get_configurations(dut: Dut, test_case: Any) -> Dict[str,Any]:
publish_cfg = {}
try:
@no_type_check
def get_broker_from_dut(dut, config_option):
def get_config_from_dut(dut, config_option):
# logging.info('Option:', config_option, dut.app.sdkconfig.get(config_option))
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get(config_option))
if value is None:
return None, None
return value.group(1), int(value.group(2))
# Get publish test configuration
publish_cfg['publish_topic'] = dut.app.sdkconfig.get('EXAMPLE_SUBSCRIBE_TOPIC').replace('"','')
publish_cfg['subscribe_topic'] = dut.app.sdkconfig.get('EXAMPLE_PUBLISH_TOPIC').replace('"','')
publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI')
publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI')
publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WS_URI')
publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI')
publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI')
publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI')
publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WS_URI')
publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI')
except Exception:
logging.info('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig')
raise
transport, qos, enqueue, scenario = test_case
if publish_cfg['broker_host_' + transport] is None:
pytest.skip(f'Skipping transport: {transport}...')
publish_cfg['scenario'] = scenario
publish_cfg['qos'] = qos
publish_cfg['enqueue'] = enqueue
publish_cfg['transport'] = transport
publish_cfg['pattern'] = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE))
publish_cfg['test_timeout'] = get_timeout(test_case)
unique_topic = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE))
publish_cfg['subscribe_topic'] = 'test/subscribe_to/' + unique_topic
publish_cfg['publish_topic'] = 'test/subscribe_to/' + unique_topic
logging.info(f'configuration: {publish_cfg}')
return publish_cfg
@contextlib.contextmanager
def connected_and_subscribed(dut:Dut, transport:str, pattern:str, pattern_repetitions:int) -> Any:
dut.write(f'publish_setup {transport} {pattern} {pattern_repetitions}')
dut.write(f'start')
def connected_and_subscribed(dut:Dut) -> Any:
dut.write('start')
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
yield
dut.write(f'stop')
dut.write('stop')
def get_scenarios() -> List[Dict[str, int]]:
@ -147,112 +161,112 @@ def get_scenarios() -> List[Dict[str, int]]:
continue
break
if not scenarios: # No message sizes present in the env - set defaults
scenarios = [{'len':0, 'repeat':5}, # zero-sized messages
{'len':2, 'repeat':5}, # short messages
{'len':200, 'repeat':3}, # long messages
scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages
{'msg_len':2, 'nr_of_msgs':5}, # short messages
{'msg_len':200, 'nr_of_msgs':3}, # long messages
]
return scenarios
def get_timeout(test_case: Any) -> int:
transport, qos, enqueue, scenario = test_case
if transport in ['ws', 'wss'] or qos == 2:
return 120
return 60
timeout = int(scenario['nr_of_msgs'] * 20)
if qos == 1:
timeout += 30
if qos == 2:
timeout += 45
if transport in ['ws', 'wss']:
timeout += 30
return timeout
def run_publish_test_case(dut: Dut, test_case: Any, publish_cfg: Any) -> None:
transport, qos, enqueue, scenario = test_case
if publish_cfg['broker_host_' + transport] is None:
pytest.skip(f'Skipping transport: {transport}...')
repeat = scenario['len']
published = scenario['repeat']
publish_cfg['qos'] = qos
publish_cfg['queue'] = enqueue
publish_cfg['transport'] = transport
test_timeout = get_timeout(test_case)
logging.info(f'Starting Publish test: transport:{transport}, qos:{qos}, nr_of_msgs:{published},'
f' msg_size:{repeat*DEFAULT_MSG_SIZE}, enqueue:{enqueue}')
with MqttPublisher(repeat, published, publish_cfg) as publisher, connected_and_subscribed(dut, transport, publisher.sample_string, scenario['len']):
def run_publish_test_case(dut: Dut, config: Any) -> None:
logging.info(f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}')
dut.write(f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}')
with MqttPublisher(config) as publisher, connected_and_subscribed(dut):
assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe'
msgs_published: List[mqtt.MQTTMessageInfo] = []
dut.write(f'publish {publisher.published} {qos} {enqueue}')
assert publisher.event_client_got_all.wait(timeout=test_timeout), (f'Not all data received from ESP32: {transport} '
f'qos={qos} received: {publisher.received} '
f'expected: {publisher.published}')
dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (f'Not all data received from ESP32: {config["transport"]} '
f'qos={config["qos"]} received: {publisher.received} '
f'expected: {config["scenario"]["nr_of_msgs"]}')
logging.info(' - all data received from ESP32')
payload = publisher.sample_string * publisher.repeat
for _ in range(publisher.published):
payload = config['pattern'] * config['scenario']['msg_len']
for _ in range(config['scenario']['nr_of_msgs']):
with publisher.lock:
msg = publisher.publish(topic=publisher.publish_cfg['publish_topic'], payload=payload, qos=qos)
if qos > 0:
msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
if config['qos'] > 0:
msgs_published.append(msg)
logging.info(f'Published: {len(msgs_published)}')
while msgs_published:
msgs_published = [msg for msg in msgs_published if msg.is_published()]
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
logging.info('All messages from runner published')
try:
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=test_timeout)
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
except pexpect.exceptions.ExceptionPexpect:
dut.write(f'publish_report')
dut.write('publish_report')
dut.expect(re.compile(rb'Test Report'), timeout=30)
raise
logging.info('ESP32 received all data from runner')
stress_scenarios = [{'len':20, 'repeat':50}] # many medium sized
stress_scenarios = [{'msg_len':20, 'nr_of_msgs':30}] # many medium sized
transport_cases = ['tcp', 'ws', 'wss', 'ssl']
qos_cases = [0, 1, 2]
enqueue_cases = [0, 1]
local_broker_supported_transports = ['tcp']
local_broker_scenarios = [{'len':0, 'repeat':5}, # zero-sized messages
{'len':5, 'repeat':20}, # short messages
{'len':500, 'repeat':10}, # long messages
{'len':20, 'repeat':20}] # many medium sized
local_broker_scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages
{'msg_len':5, 'nr_of_msgs':20}, # short messages
{'msg_len':500, 'nr_of_msgs':10}, # long messages
{'msg_len':20, 'nr_of_msgs':20}] # many medium sized
def make_cases(scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]:
return [test_case for test_case in product(transport_cases, qos_cases, enqueue_cases, scenarios)]
def make_cases(transport: Any, scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]:
return [test_case for test_case in product(transport, qos_cases, enqueue_cases, scenarios)]
test_cases = make_cases(get_scenarios())
stress_test_cases = make_cases(stress_scenarios)
test_cases = make_cases(transport_cases, get_scenarios())
stress_test_cases = make_cases(transport_cases, stress_scenarios)
@pytest.mark.esp32
@pytest.mark.ethernet
@pytest.mark.nightly_run
@pytest.mark.parametrize('test_case', test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut)
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.confirm_write('init', expect_pattern='init', timeout=30)
run_publish_test_case(dut, test_case, publish_cfg)
run_publish_test_case(dut, publish_cfg)
@pytest.mark.esp32
@pytest.mark.ethernet
@pytest.mark.ethernet_stress
@pytest.mark.nightly_run
@pytest.mark.parametrize('test_case', stress_test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut)
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.write('init')
run_publish_test_case(dut, test_case, publish_cfg)
run_publish_test_case(dut, publish_cfg)
@pytest.mark.esp32
@pytest.mark.ethernet
@pytest.mark.parametrize('test_case', make_cases(local_broker_scenarios))
@pytest.mark.parametrize('test_case', make_cases(local_broker_supported_transports, local_broker_scenarios))
@pytest.mark.parametrize('config', ['local_broker'], indirect=True)
def test_mqtt_publish_lcoal(dut: Dut, test_case: Any) -> None:
def test_mqtt_publish_local(dut: Dut, test_case: Any) -> None:
if test_case[0] not in local_broker_supported_transports:
pytest.skip(f'Skipping transport: {test_case[0]}...')
dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
publish_cfg = get_configurations(dut)
publish_cfg = get_configurations(dut, test_case)
publish_cfg['broker_host_tcp'] = dut_ip
publish_cfg['broker_port_tcp'] = 1234
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.confirm_write('init', expect_pattern='init', timeout=30)
run_publish_test_case(dut, test_case, publish_cfg)
run_publish_test_case(dut, publish_cfg)