client receive: refactor receive to read only one message to fragment only payload for longer messages

Closes #113
This commit is contained in:
David Cermak
2019-04-26 16:38:09 +02:00
parent c4fdd7759c
commit fd564b1f17
3 changed files with 328 additions and 179 deletions

View File

@ -112,10 +112,11 @@ static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01);
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length); bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length);
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length); uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length); const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length); const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length);
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);

View File

@ -154,7 +154,7 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff
connection->buffer_length = buffer_length; connection->buffer_length = buffer_length;
} }
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length) uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len)
{ {
int i; int i;
uint32_t totlen = 0; uint32_t totlen = 0;
@ -169,6 +169,7 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
} }
} }
totlen += i; totlen += i;
if (fixed_size_len) *fixed_size_len = i;
return totlen; return totlen;
} }
@ -529,3 +530,44 @@ mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection)
init_message(connection); init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
} }
/*
* check flags: [MQTT-2.2.2-1], [MQTT-2.2.2-2]
* returns 0 if flags are invalid, otherwise returns 1
*/
int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length)
{
int qos, dup;
if (length < 1) {
return 0;
}
switch (mqtt_get_type(buffer))
{
case MQTT_MSG_TYPE_CONNECT:
case MQTT_MSG_TYPE_CONNACK:
case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_PINGREQ:
case MQTT_MSG_TYPE_PINGRESP:
case MQTT_MSG_TYPE_DISCONNECT:
return (buffer[0] & 0x0f) == 0; /* all flag bits are 0 */
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE:
return (buffer[0] & 0x0f) == 0x02; /* only bit 1 is set */
case MQTT_MSG_TYPE_PUBLISH:
qos = mqtt_get_qos(buffer);
dup = mqtt_get_dup(buffer);
/*
* there is no qos=3 [MQTT-3.3.1-4]
* dup flag must be set to 0 for all qos=0 messages [MQTT-3.3.1-2]
*/
return (qos < 3) && ((qos > 0) || (dup == 0));
default:
return 0;
}
}

View File

@ -35,7 +35,7 @@ typedef struct mqtt_state
int in_buffer_length; int in_buffer_length;
int out_buffer_length; int out_buffer_length;
uint32_t message_length; uint32_t message_length;
uint32_t message_length_read; uint32_t in_buffer_read_len;
mqtt_message_t *outbound_message; mqtt_message_t *outbound_message;
mqtt_connection_t mqtt_connection; mqtt_connection_t mqtt_connection;
uint16_t pending_msg_id; uint16_t pending_msg_id;
@ -104,6 +104,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len); static char *create_string(const char *ptr, int len);
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{ {
@ -274,13 +275,15 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
ESP_LOGE(TAG, "Writing failed, errno= %d", errno); ESP_LOGE(TAG, "Writing failed, errno= %d", errno);
return ESP_FAIL; return ESP_FAIL;
} }
read_len = esp_transport_read(client->transport,
(char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len = 0;
client->mqtt_state.in_buffer_length, client->mqtt_state.message_length = 0;
client->config->network_timeout_ms);
if (read_len < 0) { /* wait configured network timeout for broker connection response */
ESP_LOGE(TAG, "Error network response"); read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
return ESP_FAIL; if (read_len <= 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
return ESP_FAIL;
} }
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) { if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
@ -534,92 +537,65 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
return ESP_FAIL; return ESP_FAIL;
} }
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
{ {
mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE; uint8_t *msg_buf = client->mqtt_state.in_buffer;
const char *mqtt_topic = NULL, *mqtt_data = NULL; size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
uint32_t mqtt_topic_length, mqtt_data_length; size_t msg_total_len = client->mqtt_state.message_length;
uint32_t mqtt_len = 0, mqtt_offset = 0; size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
int len_read= length; size_t msg_data_offset = 0;
int max_to_read = client->mqtt_state.in_buffer_length; const char *msg_topic = NULL, *msg_data = NULL;
int buffer_offset = 0;
esp_transport_handle_t transport = client->transport;
do // get topic
{ msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) { if (msg_topic == NULL) {
// any further reading only the underlying payload ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
transport = esp_transport_get_payload_transport_handle(transport); return ESP_FAIL;
if (!mqtt_header_complete(message, length)) { }
// mqtt header is not complete, continue reading ESP_LOGD(TAG, "%s: msg_topic_len=%u", __func__, msg_topic_len);
memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length; // get payload
message = client->mqtt_state.in_buffer; msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
max_to_read = client->mqtt_state.in_buffer_length - length; if(msg_data_len > 0 && msg_data == NULL) {
mqtt_len = 0; ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
} else { return ESP_FAIL;
// mqtt header was fully read }
mqtt_header_state = MQTT_HEADER_STATE_COMPLETE;
mqtt_data_length = mqtt_topic_length = length; // post data event
// Read the topic client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) { client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
ESP_LOGE(TAG, "Unable to read topic from header");
break; post_data_event:
} ESP_LOGD(TAG, "Get data len= %d, topic len=%d, total_data: %d offset: %d", msg_data_len, msg_topic_len,
// Read the payload client->event.total_data_len, msg_data_offset);
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); client->event.event_id = MQTT_EVENT_DATA;
if(mqtt_data_length > 0 && mqtt_data == NULL) { client->event.data = msg_data_len > 0 ? (char *)msg_data : NULL;
ESP_LOGE(TAG, "Unable to read data from message"); client->event.data_len = msg_data_len;
break; client->event.current_data_offset = msg_data_offset;
} else if(mqtt_data_length > 0) { client->event.topic = (char *)msg_topic;
mqtt_len = mqtt_data_length; client->event.topic_len = msg_topic_len;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message); esp_mqtt_dispatch_event(client);
} else {
mqtt_len = 0; if (msg_read_len < msg_total_len) {
} // if total data is longer then actual -> read payload only
size_t buf_len = client->mqtt_state.in_buffer_length;
// read msg id only once esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport);
client->event.msg_id = mqtt_get_id(message, length);
} msg_data = (const char*)client->mqtt_state.in_buffer;
} else { msg_topic = NULL;
mqtt_len = len_read; msg_topic_len = 0;
mqtt_data = (const char*)client->mqtt_state.in_buffer; msg_data_offset += msg_data_len;
mqtt_topic = NULL; msg_data_len = esp_transport_read(transport, (char *)client->mqtt_state.in_buffer,
mqtt_topic_length = 0; msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", msg_data_len, errno);
return ESP_FAIL;
} }
msg_read_len += msg_data_len;
if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) { goto post_data_event;
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); }
client->event.event_id = MQTT_EVENT_DATA; return ESP_OK;
client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL;
client->event.data_len = mqtt_len;
client->event.total_data_len = mqtt_data_length;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
}
mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
break;
}
len_read = esp_transport_read(transport,
(char *)client->mqtt_state.in_buffer + buffer_offset,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ?
max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
client->config->network_timeout_ms);
length = len_read + buffer_offset;
buffer_offset = 0;
max_to_read = client->mqtt_state.in_buffer_length;
if (len_read <= 0) {
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno);
break;
}
client->mqtt_state.message_length_read += len_read;
} while (1);
} }
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
@ -677,109 +653,239 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
//unlock //unlock
} }
/*
* Returns:
* -1 in case of failure
* 0 if no message has been received
* 1 if a message has been received and placed to client->mqtt_state:
* message length: client->mqtt_state.message_length
* message content: client->mqtt_state.in_buffer
*/
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
int read_len, total_len, fixed_header_len;
uint8_t *buf = client->mqtt_state.in_buffer + client->mqtt_state.in_buffer_read_len;
esp_transport_handle_t t = client->transport;
client->mqtt_state.message_length = 0;
if (client->mqtt_state.in_buffer_read_len == 0) {
/*
* Read first byte of the mqtt packet fixed header, it contains packet
* type and flags.
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGV(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/*
* Verify the flags and act according to MQTT protocol: close connection
* if the flags are set incorrectly.
*/
if (!mqtt_has_valid_msg_hdr(buf, read_len)) {
ESP_LOGE(TAG, "%s: received a message with an invalid header=0x%x", __func__, *buf);
goto err;
}
buf++;
client->mqtt_state.in_buffer_read_len++;
}
/* any further reading only the underlying payload */
t = esp_transport_get_payload_transport_handle(t);
if ((client->mqtt_state.in_buffer_read_len == 1) ||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
do {
/*
* Read the "remaining length" part of mqtt packet fixed header. It
* starts at second byte and spans up to 4 bytes, but we accept here
* only up to 2 bytes of remaining length, i.e. messages with
* maximal remaining length value = 16383 (maximal total message
* size of 16386 bytes).
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
client->mqtt_state.in_buffer_read_len++;
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
}
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %u)", __func__, total_len, client->mqtt_state.in_buffer_read_len);
client->mqtt_state.message_length = total_len;
if (client->mqtt_state.in_buffer_length < total_len) {
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
/*
* In case larger publish messages, we only need to read full topic, data can be split to multiple data event.
* Evaluate and correct total_len to read only publish message header, so data can be read separately
*/
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
/* read next 2 bytes - topic length to get minimum portion of publish packet */
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
} else if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %u)",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
topic_len |= client->mqtt_state.in_buffer[fixed_header_len+1];
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer)>0?2:0);
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
if (client->mqtt_state.in_buffer_length < total_len) {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
} else {
total_len = client->mqtt_state.in_buffer_length;
}
/* free to continue with reading */
} else {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
}
}
if (client->mqtt_state.in_buffer_read_len < total_len) {
/* read the rest of the mqtt message */
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %u)",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
ESP_LOGD(TAG, "%s: transport_read():%d %d", __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
return 1;
err:
return -1;
}
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{ {
int read_len; uint8_t msg_type;
uint8_t msg_type; uint8_t msg_qos;
uint8_t msg_qos;
uint16_t msg_id; uint16_t msg_id;
uint32_t transport_message_offset = 0 ;
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0); /* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive(client, 0);
if (read_len < 0) { if (recv < 0) {
ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno); ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
return ESP_FAIL; return ESP_FAIL;
} }
if (recv == 0) {
if (read_len == 0) {
return ESP_OK; return ESP_OK;
} }
int read_len = client->mqtt_state.message_length;
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here // If the message was valid, get the type, quality of service and id of the message
while ( transport_message_offset < read_len ) { msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
// If the message was valid, get the type, quality of service and id of the message msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]); msg_id = mqtt_get_id(client->mqtt_state.in_buffer, read_len);
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
client->mqtt_state.message_length_read = read_len - transport_message_offset;
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type) switch (msg_type)
{ {
case MQTT_MSG_TYPE_SUBACK: case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful"); ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED; client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client); esp_mqtt_dispatch_event_with_msgid(client);
} }
break; break;
case MQTT_MSG_TYPE_UNSUBACK: case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful"); ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client); esp_mqtt_dispatch_event_with_msgid(client);
} }
break; break;
case MQTT_MSG_TYPE_PUBLISH: case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1) { ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); if (deliver_publish(client) != ESP_OK) {
} ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
else if (msg_qos == 2) { return ESP_FAIL;
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); }
} if (msg_qos == 1) {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
}
else if (msg_qos == 2) {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
}
if (msg_qos == 1 || msg_qos == 2) { if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (mqtt_write_data(client) != ESP_OK) { if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
// TODO: Shoule reconnect? return ESP_FAIL;
// return ESP_FAIL;
}
} }
// Deliver the publish message }
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length); break;
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); case MQTT_MSG_TYPE_PUBACK:
break; if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
case MQTT_MSG_TYPE_PUBACK: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
outbox_set_pending(client->outbox, msg_id, CONFIRMED); outbox_set_pending(client->outbox, msg_id, CONFIRMED);
mqtt_write_data(client); client->event.event_id = MQTT_EVENT_PUBLISHED;
break; esp_mqtt_dispatch_event_with_msgid(client);
case MQTT_MSG_TYPE_PUBREL: }
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); break;
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); case MQTT_MSG_TYPE_PUBREC:
mqtt_write_data(client); ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
break; client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
case MQTT_MSG_TYPE_PUBCOMP: outbox_set_pending(client->outbox, msg_id, CONFIRMED);
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); mqtt_write_data(client);
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { break;
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); case MQTT_MSG_TYPE_PUBREL:
client->event.event_id = MQTT_EVENT_PUBLISHED; ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
esp_mqtt_dispatch_event_with_msgid(client); client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
} mqtt_write_data(client);
break; break;
case MQTT_MSG_TYPE_PINGRESP: case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
client->wait_for_ping_resp = false; if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
break; ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
} client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
transport_message_offset += client->mqtt_state.message_length; }
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
break;
} }
client->mqtt_state.in_buffer_read_len = 0;
return ESP_OK; return ESP_OK;
} }