diff --git a/mqtt_client.c b/mqtt_client.c index f45c43b..22490eb 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -309,6 +309,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len); return ESP_FAIL; } + client->mqtt_state.in_buffer_read_len = 0; connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer); switch (connect_rsp_code) { case CONNECTION_ACCEPTED: @@ -1192,6 +1193,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; mqtt_enqueue(client); //move pending msg to outbox (if have) + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); @@ -1220,6 +1222,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; mqtt_enqueue(client); + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);