diff --git a/mqtt_client.c b/mqtt_client.c index 6c5a795..627222c 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -77,6 +77,7 @@ struct esp_mqtt_client { const static int STOPPED_BIT = BIT0; static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); +static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms); static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client); @@ -293,7 +294,7 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client) ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms); client->event.event_id = MQTT_EVENT_DISCONNECTED; client->wait_for_ping_resp = false; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); return ESP_OK; } @@ -490,9 +491,14 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) return ESP_OK; } -static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) +static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + return esp_mqtt_dispatch_event(client); +} + +static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) +{ client->event.user_context = client->config->user_context; client->event.client = client; @@ -536,6 +542,12 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i } else { total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; mqtt_len = mqtt_data_length; + if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) { + /* if message is framented -> correct the size for the first DATA event */ + mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message); + } + /* read msg id only once */ + client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); } } else { mqtt_len = len_read; @@ -668,14 +680,14 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { ESP_LOGD(TAG, "Subscribe successful"); client->event.event_id = MQTT_EVENT_SUBSCRIBED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; case MQTT_MSG_TYPE_UNSUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { ESP_LOGD(TAG, "UnSubscribe successful"); client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; case MQTT_MSG_TYPE_PUBLISH: @@ -706,7 +718,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; @@ -726,7 +738,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; case MQTT_MSG_TYPE_PINGRESP: @@ -765,7 +777,7 @@ static void esp_mqtt_task(void *pv) switch ((int)client->state) { case MQTT_STATE_INIT: client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); if (client->transport == NULL) { ESP_LOGE(TAG, "There are no transport"); @@ -789,7 +801,7 @@ static void esp_mqtt_task(void *pv) client->event.event_id = MQTT_EVENT_CONNECTED; client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); client->state = MQTT_STATE_CONNECTED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); client->refresh_connection_tick = platform_tick_get_ms(); break;