ci/mqtt: Fix weekend test publish-connect on target

* Fix thread safe issue in paho-mqtt library
* Move the weekend test to ethernet runner
This commit is contained in:
David Cermak
2022-01-26 16:38:31 +01:00
parent 61b5285cff
commit 7691b44ee5
2 changed files with 13 additions and 7 deletions

View File

@@ -57,7 +57,7 @@ test_weekend_mqtt:
- .rules:labels:weekend_test - .rules:labels:weekend_test
tags: tags:
- ESP32 - ESP32
- Example_WIFI - Example_EthKitV1
script: script:
- export MQTT_PUBLISH_TEST=1 - export MQTT_PUBLISH_TEST=1
- export TEST_PATH=$CI_PROJECT_DIR/tools/test_apps/protocols/mqtt/publish_connect_test - export TEST_PATH=$CI_PROJECT_DIR/tools/test_apps/protocols/mqtt/publish_connect_test

View File

@@ -9,7 +9,8 @@ import ssl
import string import string
import subprocess import subprocess
import sys import sys
from threading import Event, Thread import time
from threading import Event, Lock, Thread
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import ttfw_idf import ttfw_idf
@@ -59,6 +60,7 @@ class MqttPublisher:
self.publish_cfg['qos'] = qos self.publish_cfg['qos'] = qos
self.publish_cfg['queue'] = queue self.publish_cfg['queue'] = queue
self.publish_cfg['transport'] = transport self.publish_cfg['transport'] = transport
self.lock = Lock()
# static variables used to pass options to and from static callbacks of paho-mqtt client # static variables used to pass options to and from static callbacks of paho-mqtt client
MqttPublisher.event_client_connected = Event() MqttPublisher.event_client_connected = Event()
MqttPublisher.event_client_got_all = Event() MqttPublisher.event_client_got_all = Event()
@@ -71,9 +73,11 @@ class MqttPublisher:
if self.log_details: if self.log_details:
print(text) print(text)
def mqtt_client_task(self, client): def mqtt_client_task(self, client, lock):
while not self.event_stop_client.is_set(): while not self.event_stop_client.is_set():
with lock:
client.loop() client.loop()
time.sleep(0.001) # yield to other threads
# The callback for when the client receives a CONNACK response from the server (needs to be static) # The callback for when the client receives a CONNACK response from the server (needs to be static)
@staticmethod @staticmethod
@@ -120,17 +124,19 @@ class MqttPublisher:
self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host)) self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host))
raise raise
# Starting a py-client in a separate thread # Starting a py-client in a separate thread
thread1 = Thread(target=self.mqtt_client_task, args=(self.client,)) thread1 = Thread(target=self.mqtt_client_task, args=(self.client, self.lock))
thread1.start() thread1.start()
self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port)) self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port))
if not MqttPublisher.event_client_connected.wait(timeout=30): if not MqttPublisher.event_client_connected.wait(timeout=30):
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host)) raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host))
with self.lock:
self.client.subscribe(self.publish_cfg['subscribe_topic'], qos) self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n') self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n')
try: try:
# waiting till subscribed to defined topic # waiting till subscribed to defined topic
self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30) self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30)
for _ in range(MqttPublisher.published): for _ in range(MqttPublisher.published):
with self.lock:
self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos) self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
self.print_details('Publishing...') self.print_details('Publishing...')
self.print_details('Checking esp-client received msg published from py-client...') self.print_details('Checking esp-client received msg published from py-client...')