mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
client receive: refactor receive to read only one message to fragment only payload for longer messages
Closes #113
This commit is contained in:
@ -112,10 +112,11 @@ static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01);
|
||||
|
||||
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
|
||||
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length);
|
||||
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
|
||||
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len);
|
||||
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
|
||||
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);
|
||||
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
|
||||
int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length);
|
||||
|
||||
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
|
||||
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);
|
||||
|
@ -154,7 +154,7 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff
|
||||
connection->buffer_length = buffer_length;
|
||||
}
|
||||
|
||||
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
||||
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len)
|
||||
{
|
||||
int i;
|
||||
uint32_t totlen = 0;
|
||||
@ -169,6 +169,7 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
||||
}
|
||||
}
|
||||
totlen += i;
|
||||
if (fixed_size_len) *fixed_size_len = i;
|
||||
|
||||
return totlen;
|
||||
}
|
||||
@ -529,3 +530,44 @@ mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection)
|
||||
init_message(connection);
|
||||
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* check flags: [MQTT-2.2.2-1], [MQTT-2.2.2-2]
|
||||
* returns 0 if flags are invalid, otherwise returns 1
|
||||
*/
|
||||
int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length)
|
||||
{
|
||||
int qos, dup;
|
||||
|
||||
if (length < 1) {
|
||||
return 0;
|
||||
}
|
||||
switch (mqtt_get_type(buffer))
|
||||
{
|
||||
case MQTT_MSG_TYPE_CONNECT:
|
||||
case MQTT_MSG_TYPE_CONNACK:
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
case MQTT_MSG_TYPE_PINGREQ:
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
case MQTT_MSG_TYPE_DISCONNECT:
|
||||
return (buffer[0] & 0x0f) == 0; /* all flag bits are 0 */
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
case MQTT_MSG_TYPE_SUBSCRIBE:
|
||||
case MQTT_MSG_TYPE_UNSUBSCRIBE:
|
||||
return (buffer[0] & 0x0f) == 0x02; /* only bit 1 is set */
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
qos = mqtt_get_qos(buffer);
|
||||
dup = mqtt_get_dup(buffer);
|
||||
/*
|
||||
* there is no qos=3 [MQTT-3.3.1-4]
|
||||
* dup flag must be set to 0 for all qos=0 messages [MQTT-3.3.1-2]
|
||||
*/
|
||||
return (qos < 3) && ((qos > 0) || (dup == 0));
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
460
mqtt_client.c
460
mqtt_client.c
@ -35,7 +35,7 @@ typedef struct mqtt_state
|
||||
int in_buffer_length;
|
||||
int out_buffer_length;
|
||||
uint32_t message_length;
|
||||
uint32_t message_length_read;
|
||||
uint32_t in_buffer_read_len;
|
||||
mqtt_message_t *outbound_message;
|
||||
mqtt_connection_t mqtt_connection;
|
||||
uint16_t pending_msg_id;
|
||||
@ -104,6 +104,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
|
||||
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
|
||||
static char *create_string(const char *ptr, int len);
|
||||
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
|
||||
|
||||
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
||||
{
|
||||
@ -274,13 +275,15 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
ESP_LOGE(TAG, "Writing failed, errno= %d", errno);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
read_len = esp_transport_read(client->transport,
|
||||
(char *)client->mqtt_state.in_buffer,
|
||||
client->mqtt_state.in_buffer_length,
|
||||
client->config->network_timeout_ms);
|
||||
if (read_len < 0) {
|
||||
ESP_LOGE(TAG, "Error network response");
|
||||
return ESP_FAIL;
|
||||
|
||||
client->mqtt_state.in_buffer_read_len = 0;
|
||||
client->mqtt_state.message_length = 0;
|
||||
|
||||
/* wait configured network timeout for broker connection response */
|
||||
read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
|
||||
if (read_len <= 0) {
|
||||
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
|
||||
@ -534,92 +537,65 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
|
||||
static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE;
|
||||
const char *mqtt_topic = NULL, *mqtt_data = NULL;
|
||||
uint32_t mqtt_topic_length, mqtt_data_length;
|
||||
uint32_t mqtt_len = 0, mqtt_offset = 0;
|
||||
int len_read= length;
|
||||
int max_to_read = client->mqtt_state.in_buffer_length;
|
||||
int buffer_offset = 0;
|
||||
esp_transport_handle_t transport = client->transport;
|
||||
uint8_t *msg_buf = client->mqtt_state.in_buffer;
|
||||
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
|
||||
size_t msg_total_len = client->mqtt_state.message_length;
|
||||
size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
|
||||
size_t msg_data_offset = 0;
|
||||
const char *msg_topic = NULL, *msg_data = NULL;
|
||||
|
||||
do
|
||||
{
|
||||
if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) {
|
||||
// any further reading only the underlying payload
|
||||
transport = esp_transport_get_payload_transport_handle(transport);
|
||||
if (!mqtt_header_complete(message, length)) {
|
||||
// mqtt header is not complete, continue reading
|
||||
memmove(client->mqtt_state.in_buffer, message, length);
|
||||
buffer_offset = length;
|
||||
message = client->mqtt_state.in_buffer;
|
||||
max_to_read = client->mqtt_state.in_buffer_length - length;
|
||||
mqtt_len = 0;
|
||||
} else {
|
||||
// mqtt header was fully read
|
||||
mqtt_header_state = MQTT_HEADER_STATE_COMPLETE;
|
||||
mqtt_data_length = mqtt_topic_length = length;
|
||||
// Read the topic
|
||||
if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) {
|
||||
ESP_LOGE(TAG, "Unable to read topic from header");
|
||||
break;
|
||||
}
|
||||
// Read the payload
|
||||
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
|
||||
if(mqtt_data_length > 0 && mqtt_data == NULL) {
|
||||
ESP_LOGE(TAG, "Unable to read data from message");
|
||||
break;
|
||||
} else if(mqtt_data_length > 0) {
|
||||
mqtt_len = mqtt_data_length;
|
||||
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
|
||||
} else {
|
||||
mqtt_len = 0;
|
||||
}
|
||||
|
||||
// read msg id only once
|
||||
client->event.msg_id = mqtt_get_id(message, length);
|
||||
}
|
||||
} else {
|
||||
mqtt_len = len_read;
|
||||
mqtt_data = (const char*)client->mqtt_state.in_buffer;
|
||||
mqtt_topic = NULL;
|
||||
mqtt_topic_length = 0;
|
||||
// get topic
|
||||
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
|
||||
if (msg_topic == NULL) {
|
||||
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: msg_topic_len=%u", __func__, msg_topic_len);
|
||||
|
||||
// get payload
|
||||
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
|
||||
if(msg_data_len > 0 && msg_data == NULL) {
|
||||
ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
// post data event
|
||||
client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
|
||||
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
|
||||
|
||||
post_data_event:
|
||||
ESP_LOGD(TAG, "Get data len= %d, topic len=%d, total_data: %d offset: %d", msg_data_len, msg_topic_len,
|
||||
client->event.total_data_len, msg_data_offset);
|
||||
client->event.event_id = MQTT_EVENT_DATA;
|
||||
client->event.data = msg_data_len > 0 ? (char *)msg_data : NULL;
|
||||
client->event.data_len = msg_data_len;
|
||||
client->event.current_data_offset = msg_data_offset;
|
||||
client->event.topic = (char *)msg_topic;
|
||||
client->event.topic_len = msg_topic_len;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
|
||||
if (msg_read_len < msg_total_len) {
|
||||
// if total data is longer then actual -> read payload only
|
||||
size_t buf_len = client->mqtt_state.in_buffer_length;
|
||||
esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport);
|
||||
|
||||
msg_data = (const char*)client->mqtt_state.in_buffer;
|
||||
msg_topic = NULL;
|
||||
msg_topic_len = 0;
|
||||
msg_data_offset += msg_data_len;
|
||||
msg_data_len = esp_transport_read(transport, (char *)client->mqtt_state.in_buffer,
|
||||
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
|
||||
client->config->network_timeout_ms);
|
||||
if (msg_data_len <= 0) {
|
||||
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", msg_data_len, errno);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) {
|
||||
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
|
||||
client->event.event_id = MQTT_EVENT_DATA;
|
||||
client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL;
|
||||
client->event.data_len = mqtt_len;
|
||||
client->event.total_data_len = mqtt_data_length;
|
||||
client->event.current_data_offset = mqtt_offset;
|
||||
client->event.topic = (char *)mqtt_topic;
|
||||
client->event.topic_len = mqtt_topic_length;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
}
|
||||
|
||||
mqtt_offset += mqtt_len;
|
||||
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
|
||||
break;
|
||||
}
|
||||
|
||||
len_read = esp_transport_read(transport,
|
||||
(char *)client->mqtt_state.in_buffer + buffer_offset,
|
||||
client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ?
|
||||
max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
|
||||
client->config->network_timeout_ms);
|
||||
length = len_read + buffer_offset;
|
||||
buffer_offset = 0;
|
||||
max_to_read = client->mqtt_state.in_buffer_length;
|
||||
if (len_read <= 0) {
|
||||
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno);
|
||||
break;
|
||||
}
|
||||
client->mqtt_state.message_length_read += len_read;
|
||||
} while (1);
|
||||
|
||||
msg_read_len += msg_data_len;
|
||||
goto post_data_event;
|
||||
}
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
|
||||
@ -677,109 +653,239 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
|
||||
//unlock
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Returns:
|
||||
* -1 in case of failure
|
||||
* 0 if no message has been received
|
||||
* 1 if a message has been received and placed to client->mqtt_state:
|
||||
* message length: client->mqtt_state.message_length
|
||||
* message content: client->mqtt_state.in_buffer
|
||||
*/
|
||||
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
|
||||
{
|
||||
int read_len, total_len, fixed_header_len;
|
||||
uint8_t *buf = client->mqtt_state.in_buffer + client->mqtt_state.in_buffer_read_len;
|
||||
esp_transport_handle_t t = client->transport;
|
||||
|
||||
client->mqtt_state.message_length = 0;
|
||||
if (client->mqtt_state.in_buffer_read_len == 0) {
|
||||
/*
|
||||
* Read first byte of the mqtt packet fixed header, it contains packet
|
||||
* type and flags.
|
||||
*/
|
||||
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
|
||||
if (read_len < 0) {
|
||||
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
|
||||
goto err;
|
||||
}
|
||||
if (read_len == 0) {
|
||||
ESP_LOGV(TAG, "%s: transport_read(): no data or EOF", __func__);
|
||||
return 0;
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
|
||||
/*
|
||||
* Verify the flags and act according to MQTT protocol: close connection
|
||||
* if the flags are set incorrectly.
|
||||
*/
|
||||
if (!mqtt_has_valid_msg_hdr(buf, read_len)) {
|
||||
ESP_LOGE(TAG, "%s: received a message with an invalid header=0x%x", __func__, *buf);
|
||||
goto err;
|
||||
}
|
||||
buf++;
|
||||
client->mqtt_state.in_buffer_read_len++;
|
||||
}
|
||||
/* any further reading only the underlying payload */
|
||||
t = esp_transport_get_payload_transport_handle(t);
|
||||
if ((client->mqtt_state.in_buffer_read_len == 1) ||
|
||||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
|
||||
do {
|
||||
/*
|
||||
* Read the "remaining length" part of mqtt packet fixed header. It
|
||||
* starts at second byte and spans up to 4 bytes, but we accept here
|
||||
* only up to 2 bytes of remaining length, i.e. messages with
|
||||
* maximal remaining length value = 16383 (maximal total message
|
||||
* size of 16386 bytes).
|
||||
*/
|
||||
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
|
||||
if (read_len < 0) {
|
||||
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
|
||||
goto err;
|
||||
}
|
||||
if (read_len == 0) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
|
||||
return 0;
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
|
||||
buf++;
|
||||
client->mqtt_state.in_buffer_read_len++;
|
||||
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
|
||||
}
|
||||
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
|
||||
ESP_LOGD(TAG, "%s: total message length: %d (already read: %u)", __func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
client->mqtt_state.message_length = total_len;
|
||||
if (client->mqtt_state.in_buffer_length < total_len) {
|
||||
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
|
||||
/*
|
||||
* In case larger publish messages, we only need to read full topic, data can be split to multiple data event.
|
||||
* Evaluate and correct total_len to read only publish message header, so data can be read separately
|
||||
*/
|
||||
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
|
||||
/* read next 2 bytes - topic length to get minimum portion of publish packet */
|
||||
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
|
||||
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
|
||||
if (read_len < 0) {
|
||||
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
|
||||
goto err;
|
||||
} else if (read_len == 0) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
|
||||
return 0;
|
||||
}
|
||||
client->mqtt_state.in_buffer_read_len += read_len;
|
||||
buf += read_len;
|
||||
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %u)",
|
||||
__func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
|
||||
topic_len |= client->mqtt_state.in_buffer[fixed_header_len+1];
|
||||
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer)>0?2:0);
|
||||
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
|
||||
if (client->mqtt_state.in_buffer_length < total_len) {
|
||||
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
|
||||
goto err;
|
||||
} else {
|
||||
total_len = client->mqtt_state.in_buffer_length;
|
||||
}
|
||||
/* free to continue with reading */
|
||||
} else {
|
||||
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
if (client->mqtt_state.in_buffer_read_len < total_len) {
|
||||
/* read the rest of the mqtt message */
|
||||
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
|
||||
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
|
||||
if (read_len < 0) {
|
||||
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
|
||||
goto err;
|
||||
}
|
||||
if (read_len == 0) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
|
||||
return 0;
|
||||
}
|
||||
client->mqtt_state.in_buffer_read_len += read_len;
|
||||
if (client->mqtt_state.in_buffer_read_len < total_len) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %u)",
|
||||
__func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: transport_read():%d %d", __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
return 1;
|
||||
err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
int read_len;
|
||||
uint8_t msg_type;
|
||||
uint8_t msg_qos;
|
||||
uint8_t msg_type;
|
||||
uint8_t msg_qos;
|
||||
uint16_t msg_id;
|
||||
uint32_t transport_message_offset = 0 ;
|
||||
|
||||
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0);
|
||||
|
||||
if (read_len < 0) {
|
||||
ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno);
|
||||
/* non-blocking receive in order not to block other tasks */
|
||||
int recv = mqtt_message_receive(client, 0);
|
||||
if (recv < 0) {
|
||||
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
if (read_len == 0) {
|
||||
if (recv == 0) {
|
||||
return ESP_OK;
|
||||
}
|
||||
int read_len = client->mqtt_state.message_length;
|
||||
|
||||
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here
|
||||
while ( transport_message_offset < read_len ) {
|
||||
// If the message was valid, get the type, quality of service and id of the message
|
||||
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
|
||||
client->mqtt_state.message_length_read = read_len - transport_message_offset;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
// If the message was valid, get the type, quality of service and id of the message
|
||||
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
|
||||
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
|
||||
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, read_len);
|
||||
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
|
||||
switch (msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "Subscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "UnSubscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
if (msg_qos == 1) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
else if (msg_qos == 2) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
switch (msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "Subscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "UnSubscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
if (deliver_publish(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (msg_qos == 1) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
else if (msg_qos == 2) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
// TODO: Shoule reconnect?
|
||||
// return ESP_FAIL;
|
||||
}
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
// Deliver the publish message
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length);
|
||||
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
transport_message_offset += client->mqtt_state.message_length;
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
client->mqtt_state.in_buffer_read_len = 0;
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user