From 9de81ed2e2e335758160b3927f3a61b0f4c244fe Mon Sep 17 00:00:00 2001 From: David Cermak Date: Wed, 26 Jan 2022 16:38:31 +0100 Subject: [PATCH 1/2] ci/mqtt: Fix weekend test publish-connect on target * Fix thread safe issue in paho-mqtt library * Move the weekend test to ethernet runner --- .gitlab/ci/target-test.yml | 2 +- .../mqtt/publish_connect_test/app_test.py | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/.gitlab/ci/target-test.yml b/.gitlab/ci/target-test.yml index 17932ef21e..1fd72b5118 100644 --- a/.gitlab/ci/target-test.yml +++ b/.gitlab/ci/target-test.yml @@ -223,7 +223,7 @@ test_weekend_mqtt: - .rules:labels:weekend_test tags: - ESP32 - - Example_WIFI + - Example_EthKitV1 script: - export MQTT_PUBLISH_TEST=1 - export TEST_PATH=$CI_PROJECT_DIR/tools/test_apps/protocols/mqtt/publish_connect_test diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py b/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py index bb4e9dc5d0..cf4c612719 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py @@ -9,7 +9,8 @@ import ssl import string import subprocess import sys -from threading import Event, Thread +import time +from threading import Event, Lock, Thread import paho.mqtt.client as mqtt import ttfw_idf @@ -59,6 +60,7 @@ class MqttPublisher: self.publish_cfg['qos'] = qos self.publish_cfg['queue'] = queue self.publish_cfg['transport'] = transport + self.lock = Lock() # static variables used to pass options to and from static callbacks of paho-mqtt client MqttPublisher.event_client_connected = Event() MqttPublisher.event_client_got_all = Event() @@ -71,9 +73,11 @@ class MqttPublisher: if self.log_details: print(text) - def mqtt_client_task(self, client): + def mqtt_client_task(self, client, lock): while not self.event_stop_client.is_set(): - client.loop() + with lock: + client.loop() + time.sleep(0.001) # yield to other threads # The callback for when the client receives a CONNACK response from the server (needs to be static) @staticmethod @@ -120,18 +124,20 @@ class MqttPublisher: self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host)) raise # Starting a py-client in a separate thread - thread1 = Thread(target=self.mqtt_client_task, args=(self.client,)) + thread1 = Thread(target=self.mqtt_client_task, args=(self.client, self.lock)) thread1.start() self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port)) if not MqttPublisher.event_client_connected.wait(timeout=30): raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host)) - self.client.subscribe(self.publish_cfg['subscribe_topic'], qos) + with self.lock: + self.client.subscribe(self.publish_cfg['subscribe_topic'], qos) self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n') try: # waiting till subscribed to defined topic self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30) for _ in range(MqttPublisher.published): - self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos) + with self.lock: + self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos) self.print_details('Publishing...') self.print_details('Checking esp-client received msg published from py-client...') self.dut.expect(re.compile(r'Correct pattern received exactly x times'), timeout=60) From c2b037572cec2c9c56e773d95f49d9f2911a319d Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 27 Jan 2022 07:43:05 +0100 Subject: [PATCH 2/2] ci/mqtt: Make publish test-app message properties configurable Also increase the default sizes and repeat-counts to send more data and exercise the library more intensly --- .../mqtt/publish_connect_test/app_test.py | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py b/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py index cf4c612719..da22edb83f 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py @@ -10,6 +10,7 @@ import string import subprocess import sys import time +from itertools import count from threading import Event, Lock, Thread import paho.mqtt.client as mqtt @@ -383,16 +384,31 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data): with MqttPublisher(dut1, transport, qos, repeat, published, queue, publish_cfg): pass + # Initialize message sizes and repeat counts (if defined in the environment) + messages = [] + for i in count(0): + # Check env variable: MQTT_PUBLISH_MSG_{len|repeat}_{x} + env_dict = {var:'MQTT_PUBLISH_MSG_' + var + '_' + str(i) for var in ['len', 'repeat']} + if os.getenv(env_dict['len']) and os.getenv(env_dict['repeat']): + messages.append({var: int(os.getenv(env_dict[var])) for var in ['len', 'repeat']}) + continue + break + if not messages: # No message sizes present in the env - set defaults + messages = [{'len':0, 'repeat':5}, # zero-sized messages + {'len':2, 'repeat':10}, # short messages + {'len':200, 'repeat':3}, # long messages + {'len':20, 'repeat':50} # many medium sized + ] + + # Iterate over all publish message properties for qos in [0, 1, 2]: for transport in ['tcp', 'ssl', 'ws', 'wss']: for q in [0, 1]: if publish_cfg['broker_host_' + transport] is None: print('Skipping transport: {}...'.format(transport)) continue - start_publish_case(transport, qos, 0, 5, q) - start_publish_case(transport, qos, 2, 5, q) - start_publish_case(transport, qos, 50, 1, q) - start_publish_case(transport, qos, 10, 20, q) + for msg in messages: + start_publish_case(transport, qos, msg['len'], msg['repeat'], q) if __name__ == '__main__':