Merge branch 'bugfix/consolidate_err_messages' into 'master'

client: Report failure on timeout in mid-message timeout (GitHub PR)

See merge request espressif/esp-mqtt!165
This commit is contained in:
Rocha Euripedes
2024-01-24 22:23:04 +08:00

View File

@ -43,21 +43,36 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client)
/**
* @brief Processes error reported from transport layer (considering the message read status)
*
* @param err: Error reported from TCP transport
* @param client: MQTT client handle
* @param mid_message: True if the error occured when reading incomplete message
*
* @return - 0 on Timeout
* - -1 on Timeout with incomplete message
* - -2 on Error or EOF
*/
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message)
{
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
ESP_LOGD(TAG, "%s: transport_read(): EOF", __func__);
return 0;
}
if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) {
if (mid_message) {
// No error message, because we could've read with timeout 0 (caller will decide)
return -1;
}
// Not an error, continue
ESP_LOGD(TAG, "%s: transport_read(): call timed out before data was ready!", __func__);
return 0;
}
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__);
}
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
esp_mqtt_client_dispatch_transport_error(client);
return -1;
return -2;
}
#if MQTT_ENABLE_SSL
@ -1094,7 +1109,7 @@ post_data_event:
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL;
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}
msg_data_len = ret;
@ -1178,11 +1193,13 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_
/*
* Returns:
* -1 in case of failure
* -2 in case of failure or EOF (clean connection closure)
* -1 timeout while in-the-middle of the messge
* 0 if no message has been received
* 1 if a message has been received and placed to client->mqtt_state:
* message length: client->mqtt_state.message_length
* message content: client->mqtt_state.in_buffer
*
*/
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
@ -1198,7 +1215,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, false);
}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/*
@ -1224,7 +1241,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
@ -1245,7 +1262,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
@ -1276,7 +1293,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
@ -1290,23 +1307,32 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
return 1;
err:
esp_mqtt_client_dispatch_transport_error(client);
return -1;
return -2;
}
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{
uint8_t msg_type = 0, msg_qos = 0;
uint16_t msg_id = 0;
size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len;
/* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive(client, 0);
if (recv < 0) {
if (recv == 0) { // Timeout
return ESP_OK;
}
if (recv == -1) { // Mid-message timeout
if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) {
// Report error only if didn't receive anything since previous iteration
ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__);
return ESP_FAIL;
}
return ESP_OK; // Treat as standard timeout (keep reading the message)
}
if (recv < 0) { // Other error
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
return ESP_FAIL;
}
if (recv == 0) {
return ESP_OK;
}
int read_len = client->mqtt_state.message_length;
// If the message was valid, get the type, quality of service and id of the message