mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-30 18:58:07 +02:00
corrected outbox for oversized messge and qos1, added errno to error messages
This commit is contained in:
@ -35,7 +35,7 @@ typedef enum pending_state {
|
|||||||
|
|
||||||
outbox_handle_t outbox_init();
|
outbox_handle_t outbox_init();
|
||||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
|
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
|
||||||
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending);
|
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick);
|
||||||
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
|
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
|
||||||
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
|
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
|
||||||
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
||||||
|
@ -40,7 +40,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
|
|||||||
item->msg_type = message->msg_type;
|
item->msg_type = message->msg_type;
|
||||||
item->msg_qos = message->msg_qos;
|
item->msg_qos = message->msg_qos;
|
||||||
item->tick = tick;
|
item->tick = tick;
|
||||||
item->len = message->len;
|
item->len = message->len + message->remaining_len;
|
||||||
item->pending = QUEUED;
|
item->pending = QUEUED;
|
||||||
item->buffer = malloc(message->len + message->remaining_len);
|
item->buffer = malloc(message->len + message->remaining_len);
|
||||||
ESP_MEM_CHECK(TAG, item->buffer, {
|
ESP_MEM_CHECK(TAG, item->buffer, {
|
||||||
@ -67,11 +67,14 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending)
|
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick)
|
||||||
{
|
{
|
||||||
outbox_item_handle_t item;
|
outbox_item_handle_t item;
|
||||||
STAILQ_FOREACH(item, outbox, next) {
|
STAILQ_FOREACH(item, outbox, next) {
|
||||||
if (item->pending == pending) {
|
if (item->pending == pending) {
|
||||||
|
if (tick) {
|
||||||
|
*tick = item->tick;
|
||||||
|
}
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -169,7 +172,7 @@ int outbox_get_size(outbox_handle_t outbox)
|
|||||||
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size)
|
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size)
|
||||||
{
|
{
|
||||||
while(outbox_get_size(outbox) > max_size) {
|
while(outbox_get_size(outbox) > max_size) {
|
||||||
outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED);
|
outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED, NULL);
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
@ -501,7 +501,7 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
|
|||||||
client->config->network_timeout_ms);
|
client->config->network_timeout_ms);
|
||||||
// client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
// client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
if (write_len <= 0) {
|
if (write_len <= 0) {
|
||||||
ESP_LOGE(TAG, "Error write data or timeout, written len = %d", write_len);
|
ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
/* we've just sent a mqtt control packet, update keepalive counter
|
/* we've just sent a mqtt control packet, update keepalive counter
|
||||||
@ -624,9 +624,6 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem
|
|||||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||||
//lock mutex
|
//lock mutex
|
||||||
outbox_message_t msg = { 0 };
|
outbox_message_t msg = { 0 };
|
||||||
if (client->mqtt_state.pending_msg_count > 0) {
|
|
||||||
client->mqtt_state.pending_msg_count --;
|
|
||||||
}
|
|
||||||
msg.data = client->mqtt_state.outbound_message->data;
|
msg.data = client->mqtt_state.outbound_message->data;
|
||||||
msg.len = client->mqtt_state.outbound_message->length;
|
msg.len = client->mqtt_state.outbound_message->length;
|
||||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||||
@ -669,7 +666,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
|||||||
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0);
|
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0);
|
||||||
|
|
||||||
if (read_len < 0) {
|
if (read_len < 0) {
|
||||||
ESP_LOGE(TAG, "Read error or end of stream");
|
ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -787,6 +784,7 @@ static void esp_mqtt_task(void *pv)
|
|||||||
{
|
{
|
||||||
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
||||||
uint32_t last_retransmit = 0;
|
uint32_t last_retransmit = 0;
|
||||||
|
int32_t msg_tick = 0;
|
||||||
client->run = true;
|
client->run = true;
|
||||||
|
|
||||||
//get transport by scheme
|
//get transport by scheme
|
||||||
@ -845,7 +843,7 @@ static void esp_mqtt_task(void *pv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// resend all non-transmitted messages first
|
// resend all non-transmitted messages first
|
||||||
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED);
|
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
||||||
if (item) {
|
if (item) {
|
||||||
if (mqtt_resend_queued(client, item) == ESP_OK) {
|
if (mqtt_resend_queued(client, item) == ESP_OK) {
|
||||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
||||||
@ -853,8 +851,8 @@ static void esp_mqtt_task(void *pv)
|
|||||||
// resend other "transmitted" messages after 1s
|
// resend other "transmitted" messages after 1s
|
||||||
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
|
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
|
||||||
last_retransmit = platform_tick_get_ms();
|
last_retransmit = platform_tick_get_ms();
|
||||||
item = outbox_dequeue(client->outbox, TRANSMITTED);
|
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
|
||||||
if (item) {
|
if (item && (last_retransmit - msg_tick > 1000)) {
|
||||||
mqtt_resend_queued(client, item);
|
mqtt_resend_queued(client, item);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1048,15 +1046,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
qos, retain,
|
qos, retain,
|
||||||
&pending_msg_id);
|
&pending_msg_id);
|
||||||
|
|
||||||
/* We have to set as pending all the qos>0 messages) */
|
/* We have to set as pending all the qos>0 messages */
|
||||||
if (qos > 0) {
|
if (qos > 0) {
|
||||||
client->mqtt_state.outbound_message = publish_msg;
|
client->mqtt_state.outbound_message = publish_msg;
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_id = pending_msg_id;
|
client->mqtt_state.pending_msg_id = pending_msg_id;
|
||||||
client->mqtt_state.pending_publish_qos = qos;
|
client->mqtt_state.pending_publish_qos = qos;
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
// by default store as QUEUED (not transmitted yet)
|
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
||||||
mqtt_enqueue(client);
|
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
||||||
|
mqtt_enqueue(client);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
client->mqtt_state.outbound_message = publish_msg;
|
client->mqtt_state.outbound_message = publish_msg;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user