From e0617af88318f3526f6d34b261b81d8abf6731f6 Mon Sep 17 00:00:00 2001 From: Tijn Kooijmans Date: Wed, 26 Jul 2017 14:47:01 +0200 Subject: [PATCH] Revert "Revert "utilizing pingreq to check if server is still reachable"" This reverts commit 85e2ed9eee9edce3103c757e8bbb1a37d70d1cec. --- mqtt.c | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/mqtt.c b/mqtt.c index 74dbe86..85bb316 100644 --- a/mqtt.c +++ b/mqtt.c @@ -259,7 +259,7 @@ static bool mqtt_connect(mqtt_client *client) write_len = client->settings->write_cb(client, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length, 0); - if(write_len < 0) { + if (write_len <= 0) { mqtt_error("Writing failed: %d", errno); return false; } @@ -304,7 +304,7 @@ void mqtt_sending_task(void *pvParameters) { mqtt_client *client = (mqtt_client *)pvParameters; uint32_t msg_len; - int send_len; + int send_len, read_len; mqtt_info("mqtt_sending_task"); while (1) { @@ -319,8 +319,8 @@ void mqtt_sending_task(void *pvParameters) rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); - send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0); - if(send_len < 0) { + send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 10 * 1000); + if (send_len <= 0) { mqtt_info("Write error: %d", errno); break; // TODO is this right handling? } @@ -328,24 +328,27 @@ void mqtt_sending_task(void *pvParameters) //TODO: Check sending type, to callback publish message msg_len -= send_len; } - //invalidate keepalive timer - client->keepalive_tick = client->settings->keepalive / 2; - } - else { - if (client->keepalive_tick > 0) client->keepalive_tick --; - else { - client->keepalive_tick = client->settings->keepalive / 2; - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); - mqtt_info("Sending pingreq"); - client->settings->write_cb(client, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, 0); - } } + if (client->keepalive_tick > 0) client->keepalive_tick --; + else { + client->keepalive_tick = client->settings->keepalive / 2; + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + mqtt_info("Sending pingreq"); + client->settings->write_cb(client, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, 0); + + read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 10 * 1000); + if (read_len < 0) { + mqtt_error("Error network response"); + break; + } + } } + xMqttSendingTask = NULL; vTaskDelete(NULL); } @@ -399,6 +402,7 @@ void mqtt_start_receive_schedule(mqtt_client *client) while (1) { if (terminate_mqtt) break; + if (xMqttSendingTask == NULL) break; read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0);