mqtt data events fixed to keep consistent msg_id when fragmented message received

closes #70
This commit is contained in:
David Cermak
2018-11-02 08:37:05 +01:00
parent e0bbbebc08
commit 06fe5cca8e

View File

@@ -77,6 +77,7 @@ struct esp_mqtt_client {
const static int STOPPED_BIT = BIT0; const static int STOPPED_BIT = BIT0;
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms); static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
@@ -293,7 +294,7 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms); ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
client->event.event_id = MQTT_EVENT_DISCONNECTED; client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->wait_for_ping_resp = false; client->wait_for_ping_resp = false;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK; return ESP_OK;
} }
@@ -490,9 +491,14 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
return ESP_OK; return ESP_OK;
} }
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{ {
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
return esp_mqtt_dispatch_event(client);
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.user_context = client->config->user_context; client->event.user_context = client->config->user_context;
client->event.client = client; client->event.client = client;
@@ -536,6 +542,12 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
} else { } else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length; mqtt_len = mqtt_data_length;
if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) {
/* if message is framented -> correct the size for the first DATA event */
mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message);
}
/* read msg id only once */
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
} }
} else { } else {
mqtt_len = len_read; mqtt_len = len_read;
@@ -668,14 +680,14 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful"); ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED; client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
} }
break; break;
case MQTT_MSG_TYPE_UNSUBACK: case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful"); ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
} }
break; break;
case MQTT_MSG_TYPE_PUBLISH: case MQTT_MSG_TYPE_PUBLISH:
@@ -706,7 +718,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED; client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
} }
break; break;
@@ -726,7 +738,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED; client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
} }
break; break;
case MQTT_MSG_TYPE_PINGRESP: case MQTT_MSG_TYPE_PINGRESP:
@@ -765,7 +777,7 @@ static void esp_mqtt_task(void *pv)
switch ((int)client->state) { switch ((int)client->state) {
case MQTT_STATE_INIT: case MQTT_STATE_INIT:
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
if (client->transport == NULL) { if (client->transport == NULL) {
ESP_LOGE(TAG, "There are no transport"); ESP_LOGE(TAG, "There are no transport");
@@ -789,7 +801,7 @@ static void esp_mqtt_task(void *pv)
client->event.event_id = MQTT_EVENT_CONNECTED; client->event.event_id = MQTT_EVENT_CONNECTED;
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
client->state = MQTT_STATE_CONNECTED; client->state = MQTT_STATE_CONNECTED;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event_with_msgid(client);
client->refresh_connection_tick = platform_tick_get_ms(); client->refresh_connection_tick = platform_tick_get_ms();
break; break;