client receive: refactor receive to read only one message to fragment only payload for longer messages

Closes #113
This commit is contained in:
David Cermak
2019-04-26 16:38:09 +02:00
parent c4fdd7759c
commit fd564b1f17
3 changed files with 328 additions and 179 deletions

View File

@ -112,10 +112,11 @@ static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01);
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length);
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length);
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);

View File

@ -154,7 +154,7 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff
connection->buffer_length = buffer_length;
}
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len)
{
int i;
uint32_t totlen = 0;
@ -169,6 +169,7 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
}
}
totlen += i;
if (fixed_size_len) *fixed_size_len = i;
return totlen;
}
@ -529,3 +530,44 @@ mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection)
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
/*
* check flags: [MQTT-2.2.2-1], [MQTT-2.2.2-2]
* returns 0 if flags are invalid, otherwise returns 1
*/
int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length)
{
int qos, dup;
if (length < 1) {
return 0;
}
switch (mqtt_get_type(buffer))
{
case MQTT_MSG_TYPE_CONNECT:
case MQTT_MSG_TYPE_CONNACK:
case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_PINGREQ:
case MQTT_MSG_TYPE_PINGRESP:
case MQTT_MSG_TYPE_DISCONNECT:
return (buffer[0] & 0x0f) == 0; /* all flag bits are 0 */
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE:
return (buffer[0] & 0x0f) == 0x02; /* only bit 1 is set */
case MQTT_MSG_TYPE_PUBLISH:
qos = mqtt_get_qos(buffer);
dup = mqtt_get_dup(buffer);
/*
* there is no qos=3 [MQTT-3.3.1-4]
* dup flag must be set to 0 for all qos=0 messages [MQTT-3.3.1-2]
*/
return (qos < 3) && ((qos > 0) || (dup == 0));
default:
return 0;
}
}

View File

@ -35,7 +35,7 @@ typedef struct mqtt_state
int in_buffer_length;
int out_buffer_length;
uint32_t message_length;
uint32_t message_length_read;
uint32_t in_buffer_read_len;
mqtt_message_t *outbound_message;
mqtt_connection_t mqtt_connection;
uint16_t pending_msg_id;
@ -104,6 +104,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len);
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
@ -274,13 +275,15 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
ESP_LOGE(TAG, "Writing failed, errno= %d", errno);
return ESP_FAIL;
}
read_len = esp_transport_read(client->transport,
(char *)client->mqtt_state.in_buffer,
client->mqtt_state.in_buffer_length,
client->config->network_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "Error network response");
return ESP_FAIL;
client->mqtt_state.in_buffer_read_len = 0;
client->mqtt_state.message_length = 0;
/* wait configured network timeout for broker connection response */
read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
if (read_len <= 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
return ESP_FAIL;
}
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
@ -534,92 +537,65 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
{
mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE;
const char *mqtt_topic = NULL, *mqtt_data = NULL;
uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len = 0, mqtt_offset = 0;
int len_read= length;
int max_to_read = client->mqtt_state.in_buffer_length;
int buffer_offset = 0;
esp_transport_handle_t transport = client->transport;
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
size_t msg_total_len = client->mqtt_state.message_length;
size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
size_t msg_data_offset = 0;
const char *msg_topic = NULL, *msg_data = NULL;
do
{
if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) {
// any further reading only the underlying payload
transport = esp_transport_get_payload_transport_handle(transport);
if (!mqtt_header_complete(message, length)) {
// mqtt header is not complete, continue reading
memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length;
message = client->mqtt_state.in_buffer;
max_to_read = client->mqtt_state.in_buffer_length - length;
mqtt_len = 0;
} else {
// mqtt header was fully read
mqtt_header_state = MQTT_HEADER_STATE_COMPLETE;
mqtt_data_length = mqtt_topic_length = length;
// Read the topic
if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) {
ESP_LOGE(TAG, "Unable to read topic from header");
break;
}
// Read the payload
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
if(mqtt_data_length > 0 && mqtt_data == NULL) {
ESP_LOGE(TAG, "Unable to read data from message");
break;
} else if(mqtt_data_length > 0) {
mqtt_len = mqtt_data_length;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
} else {
mqtt_len = 0;
}
// read msg id only once
client->event.msg_id = mqtt_get_id(message, length);
}
} else {
mqtt_len = len_read;
mqtt_data = (const char*)client->mqtt_state.in_buffer;
mqtt_topic = NULL;
mqtt_topic_length = 0;
// get topic
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
return ESP_FAIL;
}
ESP_LOGD(TAG, "%s: msg_topic_len=%u", __func__, msg_topic_len);
// get payload
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
if(msg_data_len > 0 && msg_data == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
return ESP_FAIL;
}
// post data event
client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %d, topic len=%d, total_data: %d offset: %d", msg_data_len, msg_topic_len,
client->event.total_data_len, msg_data_offset);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = msg_data_len > 0 ? (char *)msg_data : NULL;
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
client->event.topic = (char *)msg_topic;
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) {
// if total data is longer then actual -> read payload only
size_t buf_len = client->mqtt_state.in_buffer_length;
esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport);
msg_data = (const char*)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len;
msg_data_len = esp_transport_read(transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", msg_data_len, errno);
return ESP_FAIL;
}
if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) {
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL;
client->event.data_len = mqtt_len;
client->event.total_data_len = mqtt_data_length;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
}
mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
break;
}
len_read = esp_transport_read(transport,
(char *)client->mqtt_state.in_buffer + buffer_offset,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ?
max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
client->config->network_timeout_ms);
length = len_read + buffer_offset;
buffer_offset = 0;
max_to_read = client->mqtt_state.in_buffer_length;
if (len_read <= 0) {
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno);
break;
}
client->mqtt_state.message_length_read += len_read;
} while (1);
msg_read_len += msg_data_len;
goto post_data_event;
}
return ESP_OK;
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
@ -677,109 +653,239 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
//unlock
}
/*
* Returns:
* -1 in case of failure
* 0 if no message has been received
* 1 if a message has been received and placed to client->mqtt_state:
* message length: client->mqtt_state.message_length
* message content: client->mqtt_state.in_buffer
*/
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
int read_len, total_len, fixed_header_len;
uint8_t *buf = client->mqtt_state.in_buffer + client->mqtt_state.in_buffer_read_len;
esp_transport_handle_t t = client->transport;
client->mqtt_state.message_length = 0;
if (client->mqtt_state.in_buffer_read_len == 0) {
/*
* Read first byte of the mqtt packet fixed header, it contains packet
* type and flags.
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGV(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/*
* Verify the flags and act according to MQTT protocol: close connection
* if the flags are set incorrectly.
*/
if (!mqtt_has_valid_msg_hdr(buf, read_len)) {
ESP_LOGE(TAG, "%s: received a message with an invalid header=0x%x", __func__, *buf);
goto err;
}
buf++;
client->mqtt_state.in_buffer_read_len++;
}
/* any further reading only the underlying payload */
t = esp_transport_get_payload_transport_handle(t);
if ((client->mqtt_state.in_buffer_read_len == 1) ||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
do {
/*
* Read the "remaining length" part of mqtt packet fixed header. It
* starts at second byte and spans up to 4 bytes, but we accept here
* only up to 2 bytes of remaining length, i.e. messages with
* maximal remaining length value = 16383 (maximal total message
* size of 16386 bytes).
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
client->mqtt_state.in_buffer_read_len++;
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
}
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %u)", __func__, total_len, client->mqtt_state.in_buffer_read_len);
client->mqtt_state.message_length = total_len;
if (client->mqtt_state.in_buffer_length < total_len) {
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
/*
* In case larger publish messages, we only need to read full topic, data can be split to multiple data event.
* Evaluate and correct total_len to read only publish message header, so data can be read separately
*/
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
/* read next 2 bytes - topic length to get minimum portion of publish packet */
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
} else if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %u)",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
topic_len |= client->mqtt_state.in_buffer[fixed_header_len+1];
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer)>0?2:0);
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
if (client->mqtt_state.in_buffer_length < total_len) {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
} else {
total_len = client->mqtt_state.in_buffer_length;
}
/* free to continue with reading */
} else {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
}
}
if (client->mqtt_state.in_buffer_read_len < total_len) {
/* read the rest of the mqtt message */
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %u)",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
ESP_LOGD(TAG, "%s: transport_read():%d %d", __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
return 1;
err:
return -1;
}
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{
int read_len;
uint8_t msg_type;
uint8_t msg_qos;
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
uint32_t transport_message_offset = 0 ;
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0);
if (read_len < 0) {
ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno);
/* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive(client, 0);
if (recv < 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
return ESP_FAIL;
}
if (read_len == 0) {
if (recv == 0) {
return ESP_OK;
}
int read_len = client->mqtt_state.message_length;
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here
while ( transport_message_offset < read_len ) {
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
client->mqtt_state.message_length_read = read_len - transport_message_offset;
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, read_len);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1) {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
}
else if (msg_qos == 2) {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
}
switch (msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_publish(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
return ESP_FAIL;
}
if (msg_qos == 1) {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
}
else if (msg_qos == 2) {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
}
if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
// TODO: Shoule reconnect?
// return ESP_FAIL;
}
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
return ESP_FAIL;
}
// Deliver the publish message
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length);
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
}
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
break;
}
transport_message_offset += client->mqtt_state.message_length;
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
break;
}
client->mqtt_state.in_buffer_read_len = 0;
return ESP_OK;
}