mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
client: Report failure on timeout in mid-message timeout
Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
This commit is contained in:
@ -43,21 +43,36 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
|
||||
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);
|
||||
|
||||
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client)
|
||||
/**
|
||||
* @brief Processes error reported from transport layer (considering the message read status)
|
||||
*
|
||||
* @param err: Error reported from TCP transport
|
||||
* @param client: MQTT client handle
|
||||
* @param mid_message: True if the error occured when reading incomplete message
|
||||
*
|
||||
* @return - 0 on Timeout
|
||||
* - -1 on Timeout with incomplete message
|
||||
* - -2 on Error or EOF
|
||||
*/
|
||||
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message)
|
||||
{
|
||||
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): EOF", __func__);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) {
|
||||
if (mid_message) {
|
||||
// No error message, because we could've read with timeout 0 (caller will decide)
|
||||
return -1;
|
||||
}
|
||||
// Not an error, continue
|
||||
ESP_LOGD(TAG, "%s: transport_read(): call timed out before data was ready!", __func__);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
|
||||
ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__);
|
||||
}
|
||||
|
||||
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
|
||||
esp_mqtt_client_dispatch_transport_error(client);
|
||||
return -1;
|
||||
return -2;
|
||||
}
|
||||
|
||||
#if MQTT_ENABLE_SSL
|
||||
@ -1094,7 +1109,7 @@ post_data_event:
|
||||
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
|
||||
client->config->network_timeout_ms);
|
||||
if (ret <= 0) {
|
||||
return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL;
|
||||
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
|
||||
}
|
||||
|
||||
msg_data_len = ret;
|
||||
@ -1178,11 +1193,13 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_
|
||||
|
||||
/*
|
||||
* Returns:
|
||||
* -1 in case of failure
|
||||
* -2 in case of failure or EOF (clean connection closure)
|
||||
* -1 timeout while in-the-middle of the messge
|
||||
* 0 if no message has been received
|
||||
* 1 if a message has been received and placed to client->mqtt_state:
|
||||
* message length: client->mqtt_state.message_length
|
||||
* message content: client->mqtt_state.in_buffer
|
||||
*
|
||||
*/
|
||||
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
|
||||
{
|
||||
@ -1198,7 +1215,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
*/
|
||||
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
|
||||
if (read_len <= 0) {
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client);
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client, false);
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
|
||||
/*
|
||||
@ -1224,7 +1241,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
*/
|
||||
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
|
||||
if (read_len <= 0) {
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client);
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client, true);
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
|
||||
buf++;
|
||||
@ -1245,7 +1262,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
|
||||
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
|
||||
if (read_len <= 0) {
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client);
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client, true);
|
||||
}
|
||||
client->mqtt_state.in_buffer_read_len += read_len;
|
||||
buf += read_len;
|
||||
@ -1276,7 +1293,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
|
||||
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
|
||||
if (read_len <= 0) {
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client);
|
||||
return esp_mqtt_handle_transport_read_error(read_len, client, true);
|
||||
}
|
||||
client->mqtt_state.in_buffer_read_len += read_len;
|
||||
if (client->mqtt_state.in_buffer_read_len < total_len) {
|
||||
@ -1290,23 +1307,32 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
return 1;
|
||||
err:
|
||||
esp_mqtt_client_dispatch_transport_error(client);
|
||||
return -1;
|
||||
return -2;
|
||||
}
|
||||
|
||||
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
uint8_t msg_type = 0, msg_qos = 0;
|
||||
uint16_t msg_id = 0;
|
||||
size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len;
|
||||
|
||||
/* non-blocking receive in order not to block other tasks */
|
||||
int recv = mqtt_message_receive(client, 0);
|
||||
if (recv < 0) {
|
||||
if (recv == 0) { // Timeout
|
||||
return ESP_OK;
|
||||
}
|
||||
if (recv == -1) { // Mid-message timeout
|
||||
if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) {
|
||||
// Report error only if didn't receive anything since previous iteration
|
||||
ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
return ESP_OK; // Treat as standard timeout (keep reading the message)
|
||||
}
|
||||
if (recv < 0) { // Other error
|
||||
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (recv == 0) {
|
||||
return ESP_OK;
|
||||
}
|
||||
int read_len = client->mqtt_state.message_length;
|
||||
|
||||
// If the message was valid, get the type, quality of service and id of the message
|
||||
|
Reference in New Issue
Block a user