From 65d91f3908a826c6d8832c88ba7ef739ed1adc67 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Sun, 23 Jul 2017 21:28:23 +0200 Subject: [PATCH 01/12] implemented disconnected callback --- include/mqtt.h | 3 +-- mqtt.c | 11 +++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/include/mqtt.h b/include/mqtt.h index d4bc2da..0c88cde 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -44,8 +44,7 @@ typedef struct mqtt_settings { mqtt_write_callback write_cb; mqtt_event_callback connected_cb; - mqtt_event_callback disconnected_cb; // unused - mqtt_event_callback reconnect_cb; // unused + mqtt_event_callback disconnected_cb; mqtt_event_callback subscribe_cb; mqtt_event_callback publish_cb; diff --git a/mqtt.c b/mqtt.c index 9523a88..f87d64f 100644 --- a/mqtt.c +++ b/mqtt.c @@ -498,8 +498,12 @@ void mqtt_task(void *pvParameters) mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port); if (!mqtt_connect(client)) { client->settings->disconnect_cb(client); + + if (client->settings->disconnected_cb) { + client->settings->disconnected_cb(client, NULL); + } + continue; - //return; } mqtt_info("Connected to MQTT broker, create sending thread before call connected callback"); xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask); @@ -511,13 +515,16 @@ void mqtt_task(void *pvParameters) mqtt_start_receive_schedule(client); client->settings->disconnect_cb(client); + if (client->settings->disconnected_cb) { + client->settings->disconnected_cb(client, NULL); + } + vTaskDelete(xMqttSendingTask); vTaskDelay(1000 / portTICK_RATE_MS); } mqtt_destroy(client); - } mqtt_client *mqtt_start(mqtt_settings *settings) From 0d888bb101fcce4d80da3bb6004851d9ed786b92 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Sun, 23 Jul 2017 21:31:33 +0200 Subject: [PATCH 02/12] added auto-reconnect option to settings --- include/mqtt.h | 1 + mqtt.c | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/include/mqtt.h b/include/mqtt.h index 0c88cde..3ff5ccf 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -62,6 +62,7 @@ typedef struct mqtt_settings { uint32_t lwt_retain; uint32_t clean_session; uint32_t keepalive; + bool auto_reconnect; } mqtt_settings; typedef struct mqtt_event_data_t diff --git a/mqtt.c b/mqtt.c index f87d64f..e5567b8 100644 --- a/mqtt.c +++ b/mqtt.c @@ -503,7 +503,11 @@ void mqtt_task(void *pvParameters) client->settings->disconnected_cb(client, NULL); } - continue; + if (!client->settings->auto_reconnect) { + break; + } else { + continue; + } } mqtt_info("Connected to MQTT broker, create sending thread before call connected callback"); xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask); @@ -520,6 +524,9 @@ void mqtt_task(void *pvParameters) } vTaskDelete(xMqttSendingTask); + if (!client->settings->auto_reconnect) { + break; + } vTaskDelay(1000 / portTICK_RATE_MS); } From 9f907bbb306b2a9174d20277da24cff38fed7708 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Sun, 23 Jul 2017 21:54:40 +0200 Subject: [PATCH 03/12] added termination flag to stop mqtt client --- mqtt.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mqtt.c b/mqtt.c index e5567b8..0612009 100644 --- a/mqtt.c +++ b/mqtt.c @@ -19,6 +19,7 @@ static TaskHandle_t xMqttTask = NULL; static TaskHandle_t xMqttSendingTask = NULL; +static bool terminate_mqtt = false; static int resolve_dns(const char *host, struct sockaddr_in *ip) { struct hostent *he; @@ -397,6 +398,8 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { + if (terminate_mqtt) break; + read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); mqtt_info("Read len %d", read_len); @@ -408,7 +411,6 @@ void mqtt_start_receive_schedule(mqtt_client *client) break; } - msg_type = mqtt_get_type(client->mqtt_state.in_buffer); msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); @@ -493,6 +495,8 @@ void mqtt_task(void *pvParameters) mqtt_client *client = (mqtt_client *)pvParameters; while (1) { + if (terminate_mqtt) break; + client->settings->connect_cb(client); mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port); @@ -536,6 +540,8 @@ void mqtt_task(void *pvParameters) mqtt_client *mqtt_start(mqtt_settings *settings) { + terminate_mqtt = false; + int stackSize = 2048; uint8_t *rb_buf; @@ -645,6 +651,6 @@ void mqtt_publish(mqtt_client* client, const char *topic, const char *data, int void mqtt_stop() { - + terminate_mqtt = true; } From 30f20e19bbcd64e7a8de40a3b7bcc678e41b1a9f Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Mon, 24 Jul 2017 15:19:44 +0200 Subject: [PATCH 04/12] fixed stopping of mqtt task --- mqtt.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/mqtt.c b/mqtt.c index 0612009..fb024a5 100644 --- a/mqtt.c +++ b/mqtt.c @@ -288,7 +288,7 @@ static bool mqtt_connect(mqtt_client *client) mqtt_warn("Connection refused, server unavailable"); return false; case CONNECTION_REFUSE_BAD_USERNAME: - mqtt_warn("Connection refused, bad username"); + mqtt_warn("Connection refused, bad username or password"); return false; case CONNECTION_REFUSE_NOT_AUTHORIZED: mqtt_warn("Connection refused, not authorized"); @@ -479,19 +479,23 @@ void mqtt_start_receive_schedule(mqtt_client *client) break; } } - mqtt_info("network disconnected"); } void mqtt_destroy(mqtt_client *client) { + if (client == NULL) return; + free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); free(client); - vTaskDelete(xMqttTask); + + mqtt_info("Client destroyed"); } void mqtt_task(void *pvParameters) { + mqtt_info("Starting mqtt task"); + mqtt_client *client = (mqtt_client *)pvParameters; while (1) { @@ -534,8 +538,10 @@ void mqtt_task(void *pvParameters) vTaskDelay(1000 / portTICK_RATE_MS); } - mqtt_destroy(client); + mqtt_destroy(client); + xMqttTask = NULL; + vTaskDelete(NULL); } mqtt_client *mqtt_start(mqtt_settings *settings) From b5761d531265c53c9d4b78c402957f57774e9c46 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Mon, 24 Jul 2017 15:34:00 +0200 Subject: [PATCH 05/12] fixed memory leak in ringbuffer after destroying client --- mqtt.c | 1 + 1 file changed, 1 insertion(+) diff --git a/mqtt.c b/mqtt.c index fb024a5..05307cb 100644 --- a/mqtt.c +++ b/mqtt.c @@ -487,6 +487,7 @@ void mqtt_destroy(mqtt_client *client) free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); + free(client->send_rb.p_o); free(client); mqtt_info("Client destroyed"); From f49ecd1b924b5636cd50bf49b89320d8b46636d0 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Wed, 26 Jul 2017 13:46:34 +0200 Subject: [PATCH 06/12] utilizing pingreq to check if server is still reachable --- mqtt.c | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/mqtt.c b/mqtt.c index 05307cb..d380dd7 100644 --- a/mqtt.c +++ b/mqtt.c @@ -259,7 +259,7 @@ static bool mqtt_connect(mqtt_client *client) write_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); - if(write_len < 0) { + if (write_len <= 0) { mqtt_error("Writing failed: %d", errno); return false; } @@ -304,7 +304,7 @@ void mqtt_sending_task(void *pvParameters) { mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; - int send_len; + int send_len, read_len; mqtt_info("mqtt_sending_task"); while (1) { @@ -319,8 +319,8 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); - if(send_len < 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 10 * 1000); + if (send_len <= 0) { mqtt_info("Write error: %d", errno); break; // TODO is this right handling? } @@ -328,24 +328,27 @@ void mqtt_sending_task(void *pvParameters) //TODO: Check sending type, to callback publish message msg_len -= send_len; } - //invalidate keepalive timer - client->keepalive_tick = client->settings->keepalive / 2; - } - else { - if (client->keepalive_tick > 0) client->keepalive_tick --; - else { - client->keepalive_tick = client->settings->keepalive / 2; - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); - mqtt_info("Sending pingreq"); - client->settings->write_cb(client, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, 0); - } } + if (client->keepalive_tick > 0) client->keepalive_tick --; + else { + client->keepalive_tick = client->settings->keepalive / 2; + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + mqtt_info("Sending pingreq"); + client->settings->write_cb(client, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, 0); + + read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 10 * 1000); + if (read_len < 0) { + mqtt_error("Error network response"); + break; + } + } } + xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -399,6 +402,7 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; + if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); From 85e2ed9eee9edce3103c757e8bbb1a37d70d1cec Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Wed, 26 Jul 2017 14:22:42 +0200 Subject: [PATCH 07/12] Revert "utilizing pingreq to check if server is still reachable" This reverts commit f49ecd1b924b5636cd50bf49b89320d8b46636d0. --- mqtt.c | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/mqtt.c b/mqtt.c index d380dd7..05307cb 100644 --- a/mqtt.c +++ b/mqtt.c @@ -259,7 +259,7 @@ static bool mqtt_connect(mqtt_client *client) write_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); - if (write_len <= 0) { + if(write_len < 0) { mqtt_error("Writing failed: %d", errno); return false; } @@ -304,7 +304,7 @@ void mqtt_sending_task(void *pvParameters) { mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; - int send_len, read_len; + int send_len; mqtt_info("mqtt_sending_task"); while (1) { @@ -319,8 +319,8 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 10 * 1000); - if (send_len <= 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); + if(send_len < 0) { mqtt_info("Write error: %d", errno); break; // TODO is this right handling? } @@ -328,27 +328,24 @@ void mqtt_sending_task(void *pvParameters) //TODO: Check sending type, to callback publish message msg_len -= send_len; } + //invalidate keepalive timer + client->keepalive_tick = client->settings->keepalive / 2; + } + else { + if (client->keepalive_tick > 0) client->keepalive_tick --; + else { + client->keepalive_tick = client->settings->keepalive / 2; + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + mqtt_info("Sending pingreq"); + client->settings->write_cb(client, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, 0); + } } - if (client->keepalive_tick > 0) client->keepalive_tick --; - else { - client->keepalive_tick = client->settings->keepalive / 2; - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); - mqtt_info("Sending pingreq"); - client->settings->write_cb(client, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, 0); - - read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 10 * 1000); - if (read_len < 0) { - mqtt_error("Error network response"); - break; - } - } } - xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -402,7 +399,6 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; - if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); From ba68cf7ed8f890e5b455f1e6d998f5aaf87fd719 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Wed, 26 Jul 2017 14:24:52 +0200 Subject: [PATCH 08/12] fixed memory leak in sending queue --- mqtt.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mqtt.c b/mqtt.c index 05307cb..74dbe86 100644 --- a/mqtt.c +++ b/mqtt.c @@ -485,6 +485,8 @@ void mqtt_destroy(mqtt_client *client) { if (client == NULL) return; + vQueueDelete(client->xSendingQueue); + free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); free(client->send_rb.p_o); From e0617af88318f3526f6d34b261b81d8abf6731f6 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Wed, 26 Jul 2017 14:47:01 +0200 Subject: [PATCH 09/12] Revert "Revert "utilizing pingreq to check if server is still reachable"" This reverts commit 85e2ed9eee9edce3103c757e8bbb1a37d70d1cec. --- mqtt.c | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/mqtt.c b/mqtt.c index 74dbe86..85bb316 100644 --- a/mqtt.c +++ b/mqtt.c @@ -259,7 +259,7 @@ static bool mqtt_connect(mqtt_client *client) write_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); - if(write_len < 0) { + if (write_len <= 0) { mqtt_error("Writing failed: %d", errno); return false; } @@ -304,7 +304,7 @@ void mqtt_sending_task(void *pvParameters) { mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; - int send_len; + int send_len, read_len; mqtt_info("mqtt_sending_task"); while (1) { @@ -319,8 +319,8 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); - if(send_len < 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 10 * 1000); + if (send_len <= 0) { mqtt_info("Write error: %d", errno); break; // TODO is this right handling? } @@ -328,24 +328,27 @@ void mqtt_sending_task(void *pvParameters) //TODO: Check sending type, to callback publish message msg_len -= send_len; } - //invalidate keepalive timer - client->keepalive_tick = client->settings->keepalive / 2; - } - else { - if (client->keepalive_tick > 0) client->keepalive_tick --; - else { - client->keepalive_tick = client->settings->keepalive / 2; - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); - mqtt_info("Sending pingreq"); - client->settings->write_cb(client, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, 0); - } } + if (client->keepalive_tick > 0) client->keepalive_tick --; + else { + client->keepalive_tick = client->settings->keepalive / 2; + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + mqtt_info("Sending pingreq"); + client->settings->write_cb(client, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, 0); + + read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 10 * 1000); + if (read_len < 0) { + mqtt_error("Error network response"); + break; + } + } } + xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -399,6 +402,7 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; + if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); From 6104cb93655aab7d10518a6f62c8413e78dc2aaf Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Wed, 26 Jul 2017 14:55:26 +0200 Subject: [PATCH 10/12] Revert "Revert "Revert "utilizing pingreq to check if server is still reachable""" This reverts commit e0617af88318f3526f6d34b261b81d8abf6731f6. --- mqtt.c | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/mqtt.c b/mqtt.c index 85bb316..74dbe86 100644 --- a/mqtt.c +++ b/mqtt.c @@ -259,7 +259,7 @@ static bool mqtt_connect(mqtt_client *client) write_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); - if (write_len <= 0) { + if(write_len < 0) { mqtt_error("Writing failed: %d", errno); return false; } @@ -304,7 +304,7 @@ void mqtt_sending_task(void *pvParameters) { mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; - int send_len, read_len; + int send_len; mqtt_info("mqtt_sending_task"); while (1) { @@ -319,8 +319,8 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 10 * 1000); - if (send_len <= 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); + if(send_len < 0) { mqtt_info("Write error: %d", errno); break; // TODO is this right handling? } @@ -328,27 +328,24 @@ void mqtt_sending_task(void *pvParameters) //TODO: Check sending type, to callback publish message msg_len -= send_len; } + //invalidate keepalive timer + client->keepalive_tick = client->settings->keepalive / 2; + } + else { + if (client->keepalive_tick > 0) client->keepalive_tick --; + else { + client->keepalive_tick = client->settings->keepalive / 2; + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + mqtt_info("Sending pingreq"); + client->settings->write_cb(client, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, 0); + } } - if (client->keepalive_tick > 0) client->keepalive_tick --; - else { - client->keepalive_tick = client->settings->keepalive / 2; - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); - mqtt_info("Sending pingreq"); - client->settings->write_cb(client, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, 0); - - read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 10 * 1000); - if (read_len < 0) { - mqtt_error("Error network response"); - break; - } - } } - xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -402,7 +399,6 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; - if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); From e2231aa02024362f8847e48333c7bdc42a90cac4 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Fri, 28 Jul 2017 09:36:38 +0200 Subject: [PATCH 11/12] fixed dis/re-connect when internet connection fails --- include/mqtt.h | 1 + mqtt.c | 64 +++++++++++++++++++++++++++++--------------------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/include/mqtt.h b/include/mqtt.h index 3ff5ccf..8f8fb52 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -1,6 +1,7 @@ #ifndef _MQTT_H_ #define _MQTT_H_ #include +#include #include #include "mqtt_config.h" #include "mqtt_msg.h" diff --git a/mqtt.c b/mqtt.c index 74dbe86..2287903 100644 --- a/mqtt.c +++ b/mqtt.c @@ -150,28 +150,28 @@ static bool client_connect(mqtt_client *client) // including SSL objects if CNFIG_MQTT_SECURITY_ON is enabled void closeclient(mqtt_client *client) { + mqtt_info("Closing client socket"); + + if (client->socket != -1) + { + close(client->socket); + client->socket = -1; + } #if defined(CONFIG_MQTT_SECURITY_ON) - if (client->ssl != NULL) - { - SSL_shutdown(client->ssl); + if (client->ssl != NULL) + { + SSL_shutdown(client->ssl); - SSL_free(client->ssl); - client->ssl = NULL; - } -#endif - if (client->socket != -1) - { - close(client->socket); - client->socket = -1; - } + SSL_free(client->ssl); + client->ssl = NULL; + } -#if defined(CONFIG_MQTT_SECURITY_ON) - if (client->ctx != NULL) - { - SSL_CTX_free(client->ctx); - client->ctx = NULL; - } + if (client->ctx != NULL) + { + SSL_CTX_free(client->ctx); + client->ctx = NULL; + } #endif } @@ -305,9 +305,10 @@ void mqtt_sending_task(void *pvParameters) mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; int send_len; + bool connected = true; mqtt_info("mqtt_sending_task"); - while (1) { + while (connected) { if (xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS)) { //queue available while (msg_len > 0) { @@ -319,10 +320,11 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); - if(send_len < 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 5 * 1000); + if(send_len <= 0) { mqtt_info("Write error: %d", errno); - break; // TODO is this right handling? + connected = false; + break; } //TODO: Check sending type, to callback publish message @@ -340,12 +342,19 @@ void mqtt_sending_task(void *pvParameters) client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); mqtt_info("Sending pingreq"); - client->settings->write_cb(client, + send_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); + if(send_len <= 0) { + mqtt_info("Write error: %d", errno); + connected = false; + break; + } } } } + closeclient(client); + xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -399,13 +408,12 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; + if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0); mqtt_info("Read len %d", read_len); - if (read_len == 0) - break; - if (read_len < 0) { + if (read_len <= 0) { // ECONNRESET for example mqtt_info("Read error %d", errno); break; @@ -534,7 +542,9 @@ void mqtt_task(void *pvParameters) client->settings->disconnected_cb(client, NULL); } - vTaskDelete(xMqttSendingTask); + if (xMqttSendingTask != NULL) { + vTaskDelete(xMqttSendingTask); + } if (!client->settings->auto_reconnect) { break; } From 559c977ddaf15f8f47998670717e797ba7f7e34a Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Fri, 28 Jul 2017 10:03:21 +0200 Subject: [PATCH 12/12] removed newline in publish info log --- mqtt.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt.c b/mqtt.c index 2287903..094927c 100644 --- a/mqtt.c +++ b/mqtt.c @@ -662,7 +662,7 @@ void mqtt_publish(mqtt_client* client, const char *topic, const char *data, int qos, retain, &client->mqtt_state.pending_msg_id); mqtt_queue(client); - mqtt_info("Queuing publish, length: %d, queue size(%d/%d)\r\n", + mqtt_info("Queuing publish, length: %d, queue size(%d/%d)", client->mqtt_state.outbound_message->length, client->send_rb.fill_cnt, client->send_rb.size);