diff --git a/include/mqtt.h b/include/mqtt.h index 3ff5ccf..8f8fb52 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -1,6 +1,7 @@ #ifndef _MQTT_H_ #define _MQTT_H_ #include +#include #include #include "mqtt_config.h" #include "mqtt_msg.h" diff --git a/mqtt.c b/mqtt.c index 74dbe86..2287903 100644 --- a/mqtt.c +++ b/mqtt.c @@ -150,28 +150,28 @@ static bool client_connect(mqtt_client *client) // including SSL objects if CNFIG_MQTT_SECURITY_ON is enabled void closeclient(mqtt_client *client) { + mqtt_info("Closing client socket"); + + if (client->socket != -1) + { + close(client->socket); + client->socket = -1; + } #if defined(CONFIG_MQTT_SECURITY_ON) - if (client->ssl != NULL) - { - SSL_shutdown(client->ssl); + if (client->ssl != NULL) + { + SSL_shutdown(client->ssl); - SSL_free(client->ssl); - client->ssl = NULL; - } -#endif - if (client->socket != -1) - { - close(client->socket); - client->socket = -1; - } + SSL_free(client->ssl); + client->ssl = NULL; + } -#if defined(CONFIG_MQTT_SECURITY_ON) - if (client->ctx != NULL) - { - SSL_CTX_free(client->ctx); - client->ctx = NULL; - } + if (client->ctx != NULL) + { + SSL_CTX_free(client->ctx); + client->ctx = NULL; + } #endif } @@ -305,9 +305,10 @@ void mqtt_sending_task(void *pvParameters) mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; int send_len; + bool connected = true; mqtt_info("mqtt_sending_task"); - while (1) { + while (connected) { if (xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS)) { //queue available while (msg_len > 0) { @@ -319,10 +320,11 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); - if(send_len < 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 5 * 1000); + if(send_len <= 0) { mqtt_info("Write error: %d", errno); - break; // TODO is this right handling? + connected = false; + break; } //TODO: Check sending type, to callback publish message @@ -340,12 +342,19 @@ void mqtt_sending_task(void *pvParameters) client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); mqtt_info("Sending pingreq"); - client->settings->write_cb(client, + send_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); + if(send_len <= 0) { + mqtt_info("Write error: %d", errno); + connected = false; + break; + } } } } + closeclient(client); + xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -399,13 +408,12 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; + if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); mqtt_info("Read len %d", read_len); - if (read_len == 0) - break; - if (read_len < 0) { + if (read_len <= 0) { // ECONNRESET for example mqtt_info("Read error %d", errno); break; @@ -534,7 +542,9 @@ void mqtt_task(void *pvParameters) client->settings->disconnected_cb(client, NULL); } - vTaskDelete(xMqttSendingTask); + if (xMqttSendingTask != NULL) { + vTaskDelete(xMqttSendingTask); + } if (!client->settings->auto_reconnect) { break; }