Merge branch 'merge_enqueue' into 'master'

Minor cleanups on mqtt client

See merge request espressif/esp-mqtt!168
This commit is contained in:
Rocha Euripedes
2023-04-21 15:11:48 +08:00
2 changed files with 23 additions and 61 deletions

View File

@ -58,7 +58,6 @@ typedef struct mqtt_state {
uint16_t pending_msg_id; uint16_t pending_msg_id;
int pending_msg_type; int pending_msg_type;
int pending_publish_qos; int pending_publish_qos;
int pending_msg_count;
} mqtt_state_t; } mqtt_state_t;
typedef struct { typedef struct {

View File

@ -641,8 +641,11 @@ static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
client->config->network_timeout_ms); client->config->network_timeout_ms);
if (wlen < 0) { if (wlen < 0) {
ESP_LOGE(TAG, "Writing failed: errno=%d", errno); ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL; return ESP_FAIL;
} else if (wlen == 0) { }
if (wlen == 0) {
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno); ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
return ESP_ERR_TIMEOUT; return ESP_ERR_TIMEOUT;
} }
@ -937,14 +940,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
return ESP_OK; return ESP_OK;
} }
static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
{
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL;
}
return ESP_OK;
}
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{ {
@ -1116,51 +1111,29 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
// Return false when message is not found, making the received counterpart invalid. // Return false when message is not found, making the received counterpart invalid.
static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id) static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{ {
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
if (client->mqtt_state.pending_msg_count == 0) {
return false;
}
if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) { if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) {
client->mqtt_state.pending_msg_count --; ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id);
return true; return true;
} }
ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id);
return false; return false;
} }
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
//unlock
}
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
{ {
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
if (client->mqtt_state.pending_msg_count > 0) {
outbox_message_t msg = { 0 }; outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data; msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length; msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type; msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos; msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer //Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
return NULL;
} }
@ -1363,7 +1336,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (msg_qos == 1 || msg_qos == 2) { if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
return ESP_FAIL; return ESP_FAIL;
} }
@ -1400,7 +1373,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
} }
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED); outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
mqtt_write_data(client); esp_mqtt_write(client);
break; break;
case MQTT_MSG_TYPE_PUBREL: case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
@ -1417,7 +1390,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
return ESP_FAIL; return ESP_FAIL;
} }
mqtt_write_data(client); esp_mqtt_write(client);
break; break;
case MQTT_MSG_TYPE_PUBCOMP: case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
@ -1463,7 +1436,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
} }
// try to resend the data // try to resend the data
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to resend data "); ESP_LOGE(TAG, "Error to resend data ");
esp_mqtt_abort_connection(client); esp_mqtt_abort_connection(client);
return ESP_FAIL; return ESP_FAIL;
@ -1492,7 +1465,6 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
#if MQTT_REPORT_DELETED_MESSAGES #if MQTT_REPORT_DELETED_MESSAGES
// also report the deleted items as MQTT_EVENT_DELETED events if enabled // also report the deleted items as MQTT_EVENT_DELETED events if enabled
int deleted_items = 0;
int msg_id = 0; int msg_id = 0;
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) { while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
client->event.event_id = MQTT_EVENT_DELETED; client->event.event_id = MQTT_EVENT_DELETED;
@ -1500,16 +1472,10 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
if (esp_mqtt_dispatch_event(client) != ESP_OK) { if (esp_mqtt_dispatch_event(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id); ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
} }
deleted_items ++;
} }
#else #else
int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
#endif #endif
client->mqtt_state.pending_msg_count -= deleted_items;
if (client->mqtt_state.pending_msg_count < 0) {
client->mqtt_state.pending_msg_count = 0;
}
} }
/** /**
@ -1770,7 +1736,7 @@ static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
ESP_LOGE(TAG, "Disconnect message cannot be created"); ESP_LOGE(TAG, "Disconnect message cannot be created");
return ESP_FAIL; return ESP_FAIL;
} }
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending disconnect message"); ESP_LOGE(TAG, "Error sending disconnect message");
} }
return ESP_OK; return ESP_OK;
@ -1820,7 +1786,7 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
return ESP_FAIL; return ESP_FAIL;
} }
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending ping"); ESP_LOGE(TAG, "Error sending ping");
return ESP_FAIL; return ESP_FAIL;
} }
@ -1873,15 +1839,14 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
} }
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
//move pending msg to outbox (if have) //move pending msg to outbox (if have)
if (!mqtt_enqueue(client)) { if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return -1; return -1;
} }
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos); ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos);
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return -1; return -1;
@ -1932,14 +1897,13 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++; if (!mqtt_enqueue(client, NULL, 0)) {
if (!mqtt_enqueue(client)) {
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return -1; return -1;
} }
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return -1; return -1;
@ -1981,15 +1945,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos; client->mqtt_state.pending_publish_qos = qos;
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
if (!mqtt_enqueue(client)) { if (!mqtt_enqueue(client, NULL, 0)) {
return -1; return -1;
} }
} else { } else {
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1; return -1;
} }
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
@ -2059,7 +2022,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
while (sending) { while (sending) {
if (mqtt_write_data(client) != ESP_OK) { if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_abort_connection(client); esp_mqtt_abort_connection(client);
ret = -1; ret = -1;
goto cannot_publish; goto cannot_publish;