diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index e359420..bf09ceb 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -58,7 +58,6 @@ typedef struct mqtt_state { uint16_t pending_msg_id; int pending_msg_type; int pending_publish_qos; - int pending_msg_count; } mqtt_state_t; typedef struct { diff --git a/mqtt_client.c b/mqtt_client.c index 258d1da..c68bd53 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -641,8 +641,11 @@ static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) client->config->network_timeout_ms); if (wlen < 0) { ESP_LOGE(TAG, "Writing failed: errno=%d", errno); + esp_mqtt_client_dispatch_transport_error(client); return ESP_FAIL; - } else if (wlen == 0) { + } + + if (wlen == 0) { ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno); return ESP_ERR_TIMEOUT; } @@ -937,14 +940,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u return ESP_OK; } -static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) -{ - if (esp_mqtt_write(client) != ESP_OK) { - esp_mqtt_client_dispatch_transport_error(client); - return ESP_FAIL; - } - return ESP_OK; -} static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { @@ -1116,51 +1111,29 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) // Return false when message is not found, making the received counterpart invalid. static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id) { - ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count); - if (client->mqtt_state.pending_msg_count == 0) { - return false; - } if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) { - client->mqtt_state.pending_msg_count --; + ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id); return true; } + ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id); return false; } -static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) -{ - ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful", - client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - //lock mutex - outbox_message_t msg = { 0 }; - msg.data = client->mqtt_state.outbound_message->data; - msg.len = client->mqtt_state.outbound_message->length; - msg.msg_id = client->mqtt_state.pending_msg_id; - msg.msg_type = client->mqtt_state.pending_msg_type; - msg.msg_qos = client->mqtt_state.pending_publish_qos; - msg.remaining_data = remaining_data; - msg.remaining_len = remaining_len; - //Copy to queue buffer - return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); - //unlock -} - -static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client) +static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - if (client->mqtt_state.pending_msg_count > 0) { outbox_message_t msg = { 0 }; msg.data = client->mqtt_state.outbound_message->data; msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_type = client->mqtt_state.pending_msg_type; msg.msg_qos = client->mqtt_state.pending_publish_qos; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; //Copy to queue buffer return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); - } - return NULL; } @@ -1363,7 +1336,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (msg_qos == 1 || msg_qos == 2) { ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); return ESP_FAIL; } @@ -1400,7 +1373,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) } outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED); - mqtt_write_data(client); + esp_mqtt_write(client); break; case MQTT_MSG_TYPE_PUBREL: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); @@ -1417,7 +1390,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_FAIL; } - mqtt_write_data(client); + esp_mqtt_write(client); break; case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); @@ -1463,7 +1436,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item } // try to resend the data - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error to resend data "); esp_mqtt_abort_connection(client); return ESP_FAIL; @@ -1492,7 +1465,6 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds #if MQTT_REPORT_DELETED_MESSAGES // also report the deleted items as MQTT_EVENT_DELETED events if enabled - int deleted_items = 0; int msg_id = 0; while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) { client->event.event_id = MQTT_EVENT_DELETED; @@ -1500,16 +1472,10 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) if (esp_mqtt_dispatch_event(client) != ESP_OK) { ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id); } - deleted_items ++; } #else - int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); + outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); #endif - client->mqtt_state.pending_msg_count -= deleted_items; - - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } } /** @@ -1770,7 +1736,7 @@ static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Disconnect message cannot be created"); return ESP_FAIL; } - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error sending disconnect message"); } return ESP_OK; @@ -1820,7 +1786,7 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) return ESP_FAIL; } - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error sending ping"); return ESP_FAIL; } @@ -1873,15 +1839,14 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, } client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_count ++; //move pending msg to outbox (if have) - if (!mqtt_enqueue(client)) { + if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; } outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos); MQTT_API_UNLOCK(client); return -1; @@ -1932,14 +1897,13 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_count ++; - if (!mqtt_enqueue(client)) { + if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; } outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); MQTT_API_UNLOCK(client); return -1; @@ -1981,15 +1945,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; - client->mqtt_state.pending_msg_count ++; // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { - if (!mqtt_enqueue(client)) { + if (!mqtt_enqueue(client, NULL, 0)) { return -1; } } else { int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; - if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { + if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { return -1; } client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; @@ -2059,7 +2022,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, while (sending) { - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { esp_mqtt_abort_connection(client); ret = -1; goto cannot_publish;