mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
refactor: Group access to output buffer in mqtt_connection_t
- Moves mqtt_connect_info to mqtt_connection_t. - Removes outbound_message in favor of accessing it trough connection.
This commit is contained in:
336
mqtt_client.c
336
mqtt_client.c
@ -4,7 +4,7 @@
|
||||
#include "esp_log.h"
|
||||
#include <stdint.h>
|
||||
#include "esp_heap_caps.h"
|
||||
|
||||
#include "mqtt_msg.h"
|
||||
|
||||
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
|
||||
#ifdef ESP_EVENT_ANY_ID
|
||||
@ -396,70 +396,70 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
||||
err = ESP_ERR_NO_MEM;
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.hostname, &client->config->host), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.path, &client->config->path), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->connect_info.username), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->connect_info.password), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->mqtt_state.connection.information.username), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->mqtt_state.connection.information.password), goto _mqtt_set_config_failed);
|
||||
|
||||
if (!config->credentials.set_null_client_id) {
|
||||
if (config->credentials.client_id) {
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->connect_info.client_id), goto _mqtt_set_config_failed);
|
||||
} else if (client->connect_info.client_id == NULL) {
|
||||
client->connect_info.client_id = platform_create_id_string();
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->mqtt_state.connection.information.client_id), goto _mqtt_set_config_failed);
|
||||
} else if (client->mqtt_state.connection.information.client_id == NULL) {
|
||||
client->mqtt_state.connection.information.client_id = platform_create_id_string();
|
||||
}
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
|
||||
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.client_id, goto _mqtt_set_config_failed);
|
||||
ESP_LOGD(TAG, "MQTT client_id=%s", client->mqtt_state.connection.information.client_id);
|
||||
}
|
||||
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.uri, &client->config->uri), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->connect_info.will_topic), goto _mqtt_set_config_failed);
|
||||
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->mqtt_state.connection.information.will_topic), goto _mqtt_set_config_failed);
|
||||
|
||||
if (config->session.last_will.msg_len && config->session.last_will.msg) {
|
||||
free(client->connect_info.will_message);
|
||||
client->connect_info.will_message = malloc(config->session.last_will.msg_len);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
|
||||
memcpy(client->connect_info.will_message, config->session.last_will.msg, config->session.last_will.msg_len);
|
||||
client->connect_info.will_length = config->session.last_will.msg_len;
|
||||
free(client->mqtt_state.connection.information.will_message);
|
||||
client->mqtt_state.connection.information.will_message = malloc(config->session.last_will.msg_len);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
|
||||
memcpy(client->mqtt_state.connection.information.will_message, config->session.last_will.msg, config->session.last_will.msg_len);
|
||||
client->mqtt_state.connection.information.will_length = config->session.last_will.msg_len;
|
||||
} else if (config->session.last_will.msg) {
|
||||
free(client->connect_info.will_message);
|
||||
client->connect_info.will_message = strdup(config->session.last_will.msg);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
|
||||
client->connect_info.will_length = strlen(config->session.last_will.msg);
|
||||
free(client->mqtt_state.connection.information.will_message);
|
||||
client->mqtt_state.connection.information.will_message = strdup(config->session.last_will.msg);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
|
||||
client->mqtt_state.connection.information.will_length = strlen(config->session.last_will.msg);
|
||||
}
|
||||
if (config->session.last_will.qos) {
|
||||
client->connect_info.will_qos = config->session.last_will.qos;
|
||||
client->mqtt_state.connection.information.will_qos = config->session.last_will.qos;
|
||||
}
|
||||
if (config->session.last_will.retain) {
|
||||
client->connect_info.will_retain = config->session.last_will.retain;
|
||||
client->mqtt_state.connection.information.will_retain = config->session.last_will.retain;
|
||||
}
|
||||
|
||||
if (config->session.disable_clean_session == client->connect_info.clean_session) {
|
||||
client->connect_info.clean_session = !config->session.disable_clean_session;
|
||||
if (!client->connect_info.clean_session && config->credentials.set_null_client_id) {
|
||||
if (config->session.disable_clean_session == client->mqtt_state.connection.information.clean_session) {
|
||||
client->mqtt_state.connection.information.clean_session = !config->session.disable_clean_session;
|
||||
if (!client->mqtt_state.connection.information.clean_session && config->credentials.set_null_client_id) {
|
||||
ESP_LOGE(TAG, "Clean Session flag must be true if client has a null id");
|
||||
}
|
||||
}
|
||||
if (config->session.keepalive) {
|
||||
client->connect_info.keepalive = config->session.keepalive;
|
||||
client->mqtt_state.connection.information.keepalive = config->session.keepalive;
|
||||
}
|
||||
if (client->connect_info.keepalive == 0) {
|
||||
client->connect_info.keepalive = MQTT_KEEPALIVE_TICK;
|
||||
if (client->mqtt_state.connection.information.keepalive == 0) {
|
||||
client->mqtt_state.connection.information.keepalive = MQTT_KEEPALIVE_TICK;
|
||||
}
|
||||
if (config->session.disable_keepalive) {
|
||||
// internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec:
|
||||
// * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity)
|
||||
// * period in seconds to send a Control packet if inactive
|
||||
client->connect_info.keepalive = 0;
|
||||
client->mqtt_state.connection.information.keepalive = 0;
|
||||
}
|
||||
|
||||
if (config->session.protocol_ver) {
|
||||
client->connect_info.protocol_ver = config->session.protocol_ver;
|
||||
client->mqtt_state.connection.information.protocol_ver = config->session.protocol_ver;
|
||||
}
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
|
||||
#ifdef MQTT_PROTOCOL_311
|
||||
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
|
||||
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
|
||||
#else
|
||||
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1;
|
||||
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1;
|
||||
#endif
|
||||
} else if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
} else if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifndef MQTT_PROTOCOL_5
|
||||
ESP_LOGE(TAG, "Please first enable MQTT_PROTOCOL_5 feature in menuconfig");
|
||||
goto _mqtt_set_config_failed;
|
||||
@ -485,9 +485,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
||||
} else {
|
||||
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
|
||||
}
|
||||
if (config->network.transport) {
|
||||
client->config->transport = config->network.transport;
|
||||
}
|
||||
|
||||
if (config->broker.verification.alpn_protos) {
|
||||
for (int i = 0; i < client->config->num_alpn_protos; i++) {
|
||||
@ -592,15 +589,15 @@ void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
free(client->config->alpn_protos);
|
||||
free(client->config->clientkey_password);
|
||||
free(client->connect_info.will_topic);
|
||||
free(client->connect_info.will_message);
|
||||
free(client->connect_info.client_id);
|
||||
free(client->connect_info.username);
|
||||
free(client->connect_info.password);
|
||||
free(client->mqtt_state.connection.information.will_topic);
|
||||
free(client->mqtt_state.connection.information.will_message);
|
||||
free(client->mqtt_state.connection.information.client_id);
|
||||
free(client->mqtt_state.connection.information.username);
|
||||
free(client->mqtt_state.connection.information.password);
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_client_destory(client);
|
||||
#endif
|
||||
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
|
||||
memset(&client->mqtt_state.connection.information, 0, sizeof(mqtt_connect_info_t));
|
||||
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
||||
if (client->config->event_loop_handle) {
|
||||
esp_event_loop_delete(client->config->event_loop_handle);
|
||||
@ -620,8 +617,8 @@ static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout)
|
||||
|
||||
static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.keepalive > 0) {
|
||||
const uint64_t keepalive_ms = client->connect_info.keepalive * 1000;
|
||||
if (client->mqtt_state.connection.information.keepalive > 0) {
|
||||
const uint64_t keepalive_ms = client->mqtt_state.connection.information.keepalive * 1000;
|
||||
|
||||
if (client->wait_for_ping_resp == true ) {
|
||||
if (has_timed_out(client->keepalive_tick, keepalive_ms)) {
|
||||
@ -648,10 +645,10 @@ static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
|
||||
|
||||
static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length;
|
||||
int wlen = 0, widx = 0, len = client->mqtt_state.connection.outbound_message.length;
|
||||
while (len > 0) {
|
||||
wlen = esp_transport_write(client->transport,
|
||||
(char *)client->mqtt_state.outbound_message->data + widx,
|
||||
(char *)client->mqtt_state.connection.outbound_message.data + widx,
|
||||
len,
|
||||
client->config->network_timeout_ms);
|
||||
if (wlen < 0) {
|
||||
@ -674,29 +671,29 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
{
|
||||
int read_len, connect_rsp_code = 0;
|
||||
client->wait_for_ping_resp = false;
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection,
|
||||
&client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
|
||||
mqtt5_msg_connect(&client->mqtt_state.connection,
|
||||
&client->mqtt_state.connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
|
||||
&client->connect_info);
|
||||
mqtt_msg_connect(&client->mqtt_state.connection,
|
||||
&client->mqtt_state.connection.information);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Connect message cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length);
|
||||
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.connection.outbound_message.data,
|
||||
client->mqtt_state.connection.outbound_message.length);
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length);
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.connection.outbound_message.data,
|
||||
client->mqtt_state.connection.outbound_message.length);
|
||||
}
|
||||
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
|
||||
client->mqtt_state.pending_msg_type,
|
||||
@ -724,7 +721,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
|
||||
client->send_publish_packet_count = 0;
|
||||
@ -807,6 +804,24 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
||||
if (!create_client_data(client)) {
|
||||
goto _mqtt_init_failed;
|
||||
}
|
||||
int buffer_size = config->buffer.size;
|
||||
if (buffer_size <= 0) {
|
||||
buffer_size = MQTT_BUFFER_SIZE_BYTE;
|
||||
}
|
||||
|
||||
// use separate value for output buffer size if configured
|
||||
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
|
||||
if (mqtt_connection_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) {
|
||||
goto _mqtt_init_failed;
|
||||
}
|
||||
|
||||
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed);
|
||||
client->mqtt_state.in_buffer_length = buffer_size;
|
||||
client->outbox = outbox_init();
|
||||
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
||||
client->status_bits = xEventGroupCreate();
|
||||
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
||||
|
||||
if (esp_mqtt_set_config(client, config) != ESP_OK) {
|
||||
goto _mqtt_init_failed;
|
||||
@ -826,27 +841,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
||||
client->reconnect_tick = platform_tick_get_ms();
|
||||
client->refresh_connection_tick = platform_tick_get_ms();
|
||||
client->wait_for_ping_resp = false;
|
||||
int buffer_size = config->buffer.size;
|
||||
if (buffer_size <= 0) {
|
||||
buffer_size = MQTT_BUFFER_SIZE_BYTE;
|
||||
}
|
||||
// use separate value for output buffer size if configured
|
||||
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
|
||||
|
||||
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed);
|
||||
client->mqtt_state.in_buffer_length = buffer_size;
|
||||
client->mqtt_state.out_buffer = (uint8_t *)malloc(out_buffer_size);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.out_buffer, goto _mqtt_init_failed);
|
||||
|
||||
client->mqtt_state.out_buffer_length = out_buffer_size;
|
||||
client->outbox = outbox_init();
|
||||
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
||||
client->status_bits = xEventGroupCreate();
|
||||
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
||||
|
||||
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer,
|
||||
client->mqtt_state.out_buffer_length);
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (esp_mqtt5_create_default_config(client) != ESP_OK) {
|
||||
goto _mqtt_init_failed;
|
||||
@ -877,7 +871,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
|
||||
vEventGroupDelete(client->status_bits);
|
||||
}
|
||||
free(client->mqtt_state.in_buffer);
|
||||
free(client->mqtt_state.out_buffer);
|
||||
mqtt_connection_destroy(&client->mqtt_state.connection);
|
||||
if (client->api_lock) {
|
||||
vSemaphoreDelete(client->api_lock);
|
||||
}
|
||||
@ -944,9 +938,9 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
||||
if (pass) {
|
||||
pass[0] = 0; //terminal username
|
||||
pass ++;
|
||||
client->connect_info.password = strdup(pass);
|
||||
client->mqtt_state.connection.information.password = strdup(pass);
|
||||
}
|
||||
client->connect_info.username = strdup(user_info);
|
||||
client->mqtt_state.connection.information.username = strdup(user_info);
|
||||
|
||||
free(user_info);
|
||||
}
|
||||
@ -958,7 +952,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
||||
|
||||
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->event.msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
|
||||
#endif
|
||||
@ -982,16 +976,16 @@ esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mq
|
||||
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
client->event.client = client;
|
||||
client->event.protocol_ver = client->connect_info.protocol_ver;
|
||||
client->event.protocol_ver = client->mqtt_state.connection.information.protocol_ver;
|
||||
esp_err_t ret = ESP_FAIL;
|
||||
|
||||
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
||||
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
|
||||
ret = esp_event_loop_run(client->config->event_loop_handle, 0);
|
||||
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
||||
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
|
||||
ret = esp_event_loop_run(client->config->event_loop_handle, 0);
|
||||
#else
|
||||
return ESP_FAIL;
|
||||
#endif
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_client_delete_user_property(client->event.property->user_property);
|
||||
client->event.property->user_property = NULL;
|
||||
@ -1009,7 +1003,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
size_t msg_data_offset = 0;
|
||||
char *msg_topic = NULL, *msg_data = NULL;
|
||||
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (esp_mqtt5_get_publish_data(client, msg_buf, msg_read_len, &msg_topic, &msg_topic_len, &msg_data, &msg_data_len) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "%s: esp_mqtt5_get_publish_data() failed", __func__);
|
||||
@ -1034,7 +1028,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
// post data event
|
||||
client->event.retain = mqtt_get_retain(msg_buf);
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->event.msg_id = mqtt5_get_id(msg_buf, msg_read_len);
|
||||
#endif
|
||||
@ -1082,7 +1076,7 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
|
||||
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
|
||||
char *msg_data = NULL;
|
||||
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property);
|
||||
#else
|
||||
@ -1135,16 +1129,16 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_
|
||||
{
|
||||
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
outbox_message_t msg = { 0 };
|
||||
msg.data = client->mqtt_state.outbound_message->data;
|
||||
msg.len = client->mqtt_state.outbound_message->length;
|
||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
||||
msg.remaining_data = remaining_data;
|
||||
msg.remaining_len = remaining_len;
|
||||
//Copy to queue buffer
|
||||
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||
outbox_message_t msg = { 0 };
|
||||
msg.data = client->mqtt_state.connection.outbound_message.data;
|
||||
msg.len = client->mqtt_state.connection.outbound_message.length;
|
||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
||||
msg.remaining_data = remaining_data;
|
||||
msg.remaining_len = remaining_len;
|
||||
//Copy to queue buffer
|
||||
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||
}
|
||||
|
||||
|
||||
@ -1323,23 +1317,23 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (msg_qos == 1) {
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt5_msg_puback(&client->mqtt_state.connection, msg_id);
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_msg_puback(&client->mqtt_state.connection, msg_id);
|
||||
}
|
||||
} else if (msg_qos == 2) {
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt5_msg_pubrec(&client->mqtt_state.connection, msg_id);
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_msg_pubrec(&client->mqtt_state.connection, msg_id);
|
||||
}
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -1355,7 +1349,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_decrement_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
@ -1370,15 +1364,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt5_msg_pubrel(&client->mqtt_state.connection, msg_id);
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_msg_pubrel(&client->mqtt_state.connection, msg_id);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -1388,15 +1382,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREL return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt5_msg_pubcomp(&client->mqtt_state.connection, msg_id);
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_msg_pubcomp(&client->mqtt_state.connection, msg_id);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -1406,7 +1400,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_decrement_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
@ -1438,11 +1432,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
|
||||
{
|
||||
// decode queued data
|
||||
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
|
||||
client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id,
|
||||
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
|
||||
// set duplicate flag for QoS-1 and QoS-2 messages
|
||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) {
|
||||
mqtt_set_dup(client->mqtt_state.outbound_message->data);
|
||||
mqtt_set_dup(client->mqtt_state.connection.outbound_message.data);
|
||||
ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id);
|
||||
}
|
||||
|
||||
@ -1580,7 +1574,7 @@ static void esp_mqtt_task(void *pv)
|
||||
break;
|
||||
}
|
||||
client->event.event_id = MQTT_EVENT_CONNECTED;
|
||||
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
|
||||
}
|
||||
client->state = MQTT_STATE_CONNECTED;
|
||||
@ -1733,19 +1727,19 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
|
||||
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
// Notify the broker we are disconnecting
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_disconnect(&client->mqtt_state.mqtt_connection, &client->mqtt5_config->disconnect_property_info);
|
||||
if (client->mqtt_state.outbound_message->length) {
|
||||
mqtt5_msg_disconnect(&client->mqtt_state.connection, &client->mqtt5_config->disconnect_property_info);
|
||||
if (client->mqtt_state.connection.outbound_message.length) {
|
||||
esp_mqtt5_client_delete_user_property(client->mqtt5_config->disconnect_property_info.user_property);
|
||||
client->mqtt5_config->disconnect_property_info.user_property = NULL;
|
||||
memset(&client->mqtt5_config->disconnect_property_info, 0, sizeof(esp_mqtt5_disconnect_property_config_t));
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
||||
mqtt_msg_disconnect(&client->mqtt_state.connection);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Disconnect message cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -1793,8 +1787,8 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
||||
|
||||
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
mqtt_msg_pingreq(&client->mqtt_state.connection);
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Ping message cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -1820,7 +1814,7 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
int max_qos = topic_list[0].qos;
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
@ -1833,25 +1827,25 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||
mqtt5_msg_subscribe(&client->mqtt_state.connection,
|
||||
topic_list, size,
|
||||
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
|
||||
if (client->mqtt_state.outbound_message->length) {
|
||||
if (client->mqtt_state.connection.outbound_message.length) {
|
||||
client->mqtt5_config->subscribe_property_info = NULL;
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||
mqtt_msg_subscribe(&client->mqtt_state.connection,
|
||||
topic_list, size,
|
||||
&client->mqtt_state.pending_msg_id);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Subscribe message cannot be created");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
|
||||
//move pending msg to outbox (if have)
|
||||
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
@ -1888,28 +1882,28 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
ESP_LOGE(TAG, "Client has not connected");
|
||||
return -1;
|
||||
}
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
||||
mqtt5_msg_unsubscribe(&client->mqtt_state.connection,
|
||||
topic,
|
||||
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
|
||||
if (client->mqtt_state.outbound_message->length) {
|
||||
if (client->mqtt_state.connection.outbound_message.length) {
|
||||
client->mqtt5_config->unsubscribe_property_info = NULL;
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
||||
mqtt_msg_unsubscribe(&client->mqtt_state.connection,
|
||||
topic,
|
||||
&client->mqtt_state.pending_msg_id);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
ESP_LOGE(TAG, "Unubscribe message cannot be created");
|
||||
return -1;
|
||||
}
|
||||
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
|
||||
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
|
||||
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
@ -1927,48 +1921,54 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
return client->mqtt_state.pending_msg_id;
|
||||
}
|
||||
|
||||
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data,
|
||||
int len, int qos, int retain, bool store)
|
||||
static int make_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
|
||||
int len, int qos, int retain)
|
||||
{
|
||||
uint16_t pending_msg_id = 0;
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||
mqtt5_msg_publish(&client->mqtt_state.connection,
|
||||
topic, data, len,
|
||||
qos, retain,
|
||||
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
|
||||
if (client->mqtt_state.outbound_message->length) {
|
||||
if (client->mqtt_state.connection.outbound_message.length) {
|
||||
client->mqtt5_config->publish_property_info = NULL;
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||
mqtt_msg_publish(&client->mqtt_state.connection,
|
||||
topic, data, len,
|
||||
qos, retain,
|
||||
&pending_msg_id);
|
||||
}
|
||||
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.length == 0) {
|
||||
ESP_LOGE(TAG, "Publish message cannot be created");
|
||||
return -1;
|
||||
}
|
||||
return pending_msg_id;
|
||||
}
|
||||
static inline int mqtt_client_enqueue_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
|
||||
int len, int qos, int retain, bool store)
|
||||
{
|
||||
int pending_msg_id = make_publish(client, topic, data, len, qos, retain);
|
||||
/* We have to set as pending all the qos>0 messages */
|
||||
//TODO: client->mqtt_state.outbound_message = publish_msg;
|
||||
if (qos > 0 || store) {
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
|
||||
client->mqtt_state.pending_msg_id = pending_msg_id;
|
||||
client->mqtt_state.pending_publish_qos = qos;
|
||||
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
||||
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
||||
if (client->mqtt_state.connection.outbound_message.fragmented_msg_total_length == 0) {
|
||||
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||
int first_fragment = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
|
||||
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
|
||||
return -1;
|
||||
}
|
||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
|
||||
}
|
||||
}
|
||||
return pending_msg_id;
|
||||
@ -1990,7 +1990,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
#endif
|
||||
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
|
||||
ESP_LOGI(TAG, "MQTT5 publish check fail");
|
||||
MQTT_API_UNLOCK(client);
|
||||
@ -2008,7 +2008,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
len = strlen(data);
|
||||
}
|
||||
|
||||
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false);
|
||||
int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false);
|
||||
if (pending_msg_id < 0) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
@ -2043,27 +2043,19 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
goto cannot_publish;
|
||||
}
|
||||
|
||||
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||
client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0;
|
||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||
int data_sent = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
|
||||
client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset = 0;
|
||||
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
|
||||
remaining_len -= data_sent;
|
||||
current_data += data_sent;
|
||||
|
||||
if (remaining_len > 0) {
|
||||
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
|
||||
mqtt_connection_t *connection = &client->mqtt_state.connection;
|
||||
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
|
||||
if (remaining_len > connection->buffer_length) {
|
||||
// Continue with sending
|
||||
memcpy(connection->buffer, current_data, connection->buffer_length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
sending = true;
|
||||
} else {
|
||||
memcpy(connection->buffer, current_data, remaining_len);
|
||||
connection->message.length = remaining_len;
|
||||
sending = true;
|
||||
}
|
||||
connection->message.data = connection->buffer;
|
||||
client->mqtt_state.outbound_message = &connection->message;
|
||||
int write_len = remaining_len > connection->buffer_length ? connection->buffer_length : remaining_len;
|
||||
memcpy(connection->buffer, current_data, write_len);
|
||||
connection->outbound_message.length = write_len;
|
||||
sending = true;
|
||||
} else {
|
||||
// Message was sent correctly
|
||||
sending = false;
|
||||
@ -2085,7 +2077,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
|
||||
cannot_publish:
|
||||
// clear out possible fragmented publish if failed or skipped
|
||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
|
||||
if (qos == 0) {
|
||||
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
||||
}
|
||||
@ -2112,7 +2104,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
|
||||
|
||||
MQTT_API_LOCK(client);
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
|
||||
ESP_LOGI(TAG, "esp_mqtt_client_enqueue check fail");
|
||||
MQTT_API_UNLOCK(client);
|
||||
@ -2120,7 +2112,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
|
||||
}
|
||||
}
|
||||
#endif
|
||||
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store);
|
||||
int ret = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, store);
|
||||
MQTT_API_UNLOCK(client);
|
||||
if (ret == 0 && store == false) {
|
||||
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error
|
||||
|
Reference in New Issue
Block a user