Merge branch 'bugfix/mqtt_read_neg_and_ping_v4.4' into 'release/v4.4'

mqtt: Fix incorrect reads on error (v4.4)

See merge request espressif/esp-idf!18177
This commit is contained in:
David Čermák
2022-06-10 18:25:06 +08:00
3 changed files with 14 additions and 8 deletions

View File

@@ -57,7 +57,7 @@ test_weekend_mqtt:
- .rules:labels:weekend_test - .rules:labels:weekend_test
tags: tags:
- ESP32 - ESP32
- Example_WIFI - Example_EthKitV1
script: script:
- export MQTT_PUBLISH_TEST=1 - export MQTT_PUBLISH_TEST=1
- export TEST_PATH=$CI_PROJECT_DIR/tools/test_apps/protocols/mqtt/publish_connect_test - export TEST_PATH=$CI_PROJECT_DIR/tools/test_apps/protocols/mqtt/publish_connect_test

View File

@@ -9,7 +9,8 @@ import ssl
import string import string
import subprocess import subprocess
import sys import sys
from threading import Event, Thread import time
from threading import Event, Lock, Thread
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import ttfw_idf import ttfw_idf
@@ -59,6 +60,7 @@ class MqttPublisher:
self.publish_cfg['qos'] = qos self.publish_cfg['qos'] = qos
self.publish_cfg['queue'] = queue self.publish_cfg['queue'] = queue
self.publish_cfg['transport'] = transport self.publish_cfg['transport'] = transport
self.lock = Lock()
# static variables used to pass options to and from static callbacks of paho-mqtt client # static variables used to pass options to and from static callbacks of paho-mqtt client
MqttPublisher.event_client_connected = Event() MqttPublisher.event_client_connected = Event()
MqttPublisher.event_client_got_all = Event() MqttPublisher.event_client_got_all = Event()
@@ -71,9 +73,11 @@ class MqttPublisher:
if self.log_details: if self.log_details:
print(text) print(text)
def mqtt_client_task(self, client): def mqtt_client_task(self, client, lock):
while not self.event_stop_client.is_set(): while not self.event_stop_client.is_set():
with lock:
client.loop() client.loop()
time.sleep(0.001) # yield to other threads
# The callback for when the client receives a CONNACK response from the server (needs to be static) # The callback for when the client receives a CONNACK response from the server (needs to be static)
@staticmethod @staticmethod
@@ -120,17 +124,19 @@ class MqttPublisher:
self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host)) self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host))
raise raise
# Starting a py-client in a separate thread # Starting a py-client in a separate thread
thread1 = Thread(target=self.mqtt_client_task, args=(self.client,)) thread1 = Thread(target=self.mqtt_client_task, args=(self.client, self.lock))
thread1.start() thread1.start()
self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port)) self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port))
if not MqttPublisher.event_client_connected.wait(timeout=30): if not MqttPublisher.event_client_connected.wait(timeout=30):
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host)) raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host))
with self.lock:
self.client.subscribe(self.publish_cfg['subscribe_topic'], qos) self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n') self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n')
try: try:
# waiting till subscribed to defined topic # waiting till subscribed to defined topic
self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30) self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30)
for _ in range(MqttPublisher.published): for _ in range(MqttPublisher.published):
with self.lock:
self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos) self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
self.print_details('Publishing...') self.print_details('Publishing...')
self.print_details('Checking esp-client received msg published from py-client...') self.print_details('Checking esp-client received msg published from py-client...')