diff --git a/include/mqtt.h b/include/mqtt.h index d4bc2da..8f8fb52 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -1,6 +1,7 @@ #ifndef _MQTT_H_ #define _MQTT_H_ #include +#include #include #include "mqtt_config.h" #include "mqtt_msg.h" @@ -44,8 +45,7 @@ typedef struct mqtt_settings { mqtt_write_callback write_cb; mqtt_event_callback connected_cb; - mqtt_event_callback disconnected_cb; // unused - mqtt_event_callback reconnect_cb; // unused + mqtt_event_callback disconnected_cb; mqtt_event_callback subscribe_cb; mqtt_event_callback publish_cb; @@ -63,6 +63,7 @@ typedef struct mqtt_settings { uint32_t lwt_retain; uint32_t clean_session; uint32_t keepalive; + bool auto_reconnect; } mqtt_settings; typedef struct mqtt_event_data_t diff --git a/mqtt.c b/mqtt.c index 9523a88..094927c 100644 --- a/mqtt.c +++ b/mqtt.c @@ -19,6 +19,7 @@ static TaskHandle_t xMqttTask = NULL; static TaskHandle_t xMqttSendingTask = NULL; +static bool terminate_mqtt = false; static int resolve_dns(const char *host, struct sockaddr_in *ip) { struct hostent *he; @@ -149,28 +150,28 @@ static bool client_connect(mqtt_client *client) // including SSL objects if CNFIG_MQTT_SECURITY_ON is enabled void closeclient(mqtt_client *client) { + mqtt_info("Closing client socket"); + + if (client->socket != -1) + { + close(client->socket); + client->socket = -1; + } #if defined(CONFIG_MQTT_SECURITY_ON) - if (client->ssl != NULL) - { - SSL_shutdown(client->ssl); + if (client->ssl != NULL) + { + SSL_shutdown(client->ssl); - SSL_free(client->ssl); - client->ssl = NULL; - } -#endif - if (client->socket != -1) - { - close(client->socket); - client->socket = -1; - } + SSL_free(client->ssl); + client->ssl = NULL; + } -#if defined(CONFIG_MQTT_SECURITY_ON) - if (client->ctx != NULL) - { - SSL_CTX_free(client->ctx); - client->ctx = NULL; - } + if (client->ctx != NULL) + { + SSL_CTX_free(client->ctx); + client->ctx = NULL; + } #endif } @@ -287,7 +288,7 @@ static bool mqtt_connect(mqtt_client *client) mqtt_warn("Connection refused, server unavailable"); return false; case CONNECTION_REFUSE_BAD_USERNAME: - mqtt_warn("Connection refused, bad username"); + mqtt_warn("Connection refused, bad username or password"); return false; case CONNECTION_REFUSE_NOT_AUTHORIZED: mqtt_warn("Connection refused, not authorized"); @@ -304,9 +305,10 @@ void mqtt_sending_task(void *pvParameters) mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; int send_len; + bool connected = true; mqtt_info("mqtt_sending_task"); - while (1) { + while (connected) { if (xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS)) { //queue available while (msg_len > 0) { @@ -318,10 +320,11 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); - if(send_len < 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 5 * 1000); + if(send_len <= 0) { mqtt_info("Write error: %d", errno); - break; // TODO is this right handling? + connected = false; + break; } //TODO: Check sending type, to callback publish message @@ -339,12 +342,19 @@ void mqtt_sending_task(void *pvParameters) client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); mqtt_info("Sending pingreq"); - client->settings->write_cb(client, + send_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); + if(send_len <= 0) { + mqtt_info("Write error: %d", errno); + connected = false; + break; + } } } } + closeclient(client); + xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -397,18 +407,18 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { + if (terminate_mqtt) break; + if (xMqttSendingTask == NULL) break; + read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); mqtt_info("Read len %d", read_len); - if (read_len == 0) - break; - if (read_len < 0) { + if (read_len <= 0) { // ECONNRESET for example mqtt_info("Read error %d", errno); break; } - msg_type = mqtt_get_type(client->mqtt_state.in_buffer); msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); @@ -477,29 +487,46 @@ void mqtt_start_receive_schedule(mqtt_client *client) break; } } - mqtt_info("network disconnected"); } void mqtt_destroy(mqtt_client *client) { + if (client == NULL) return; + + vQueueDelete(client->xSendingQueue); + free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); + free(client->send_rb.p_o); free(client); - vTaskDelete(xMqttTask); + + mqtt_info("Client destroyed"); } void mqtt_task(void *pvParameters) { + mqtt_info("Starting mqtt task"); + mqtt_client *client = (mqtt_client *)pvParameters; while (1) { + if (terminate_mqtt) break; + client->settings->connect_cb(client); mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port); if (!mqtt_connect(client)) { client->settings->disconnect_cb(client); - continue; - //return; + + if (client->settings->disconnected_cb) { + client->settings->disconnected_cb(client, NULL); + } + + if (!client->settings->auto_reconnect) { + break; + } else { + continue; + } } mqtt_info("Connected to MQTT broker, create sending thread before call connected callback"); xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask); @@ -511,17 +538,29 @@ void mqtt_task(void *pvParameters) mqtt_start_receive_schedule(client); client->settings->disconnect_cb(client); - vTaskDelete(xMqttSendingTask); + if (client->settings->disconnected_cb) { + client->settings->disconnected_cb(client, NULL); + } + + if (xMqttSendingTask != NULL) { + vTaskDelete(xMqttSendingTask); + } + if (!client->settings->auto_reconnect) { + break; + } vTaskDelay(1000 / portTICK_RATE_MS); } + mqtt_destroy(client); - - + xMqttTask = NULL; + vTaskDelete(NULL); } mqtt_client *mqtt_start(mqtt_settings *settings) { + terminate_mqtt = false; + int stackSize = 2048; uint8_t *rb_buf; @@ -623,7 +662,7 @@ void mqtt_publish(mqtt_client* client, const char *topic, const char *data, int qos, retain, &client->mqtt_state.pending_msg_id); mqtt_queue(client); - mqtt_info("Queuing publish, length: %d, queue size(%d/%d)\r\n", + mqtt_info("Queuing publish, length: %d, queue size(%d/%d)", client->mqtt_state.outbound_message->length, client->send_rb.fill_cnt, client->send_rb.size); @@ -631,6 +670,6 @@ void mqtt_publish(mqtt_client* client, const char *topic, const char *data, int void mqtt_stop() { - + terminate_mqtt = true; }