From d22b7d7ada85b25b5a41deb183fa447c74f177c9 Mon Sep 17 00:00:00 2001 From: Tuan PM Date: Mon, 12 Sep 2016 17:05:28 +0700 Subject: [PATCH] -add queue - add ping timer - add subscribe function --- include/mqtt.h | 7 +- include/ringbuf.h | 23 ++++++ mqtt.c | 195 ++++++++++++++++++++++++++++++++++++++++++---- ringbuf.c | 106 +++++++++++++++++++++++++ 4 files changed, 314 insertions(+), 17 deletions(-) create mode 100644 include/ringbuf.h create mode 100644 ringbuf.c diff --git a/include/mqtt.h b/include/mqtt.h index a9d8d27..ec4a4d0 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -4,7 +4,7 @@ #include #include "mqtt_config.h" #include "mqtt_msg.h" - +#include "ringbuf.h" typedef void (* mqtt_callback)(void *); typedef struct { @@ -52,11 +52,14 @@ typedef struct { mqtt_settings *settings; mqtt_state_t mqtt_state; mqtt_connect_info_t connect_info; + QueueHandle_t xSendingQueue; + RINGBUF send_rb; + uint32_t keepalive_tick; } mqtt_client; void mqtt_start(mqtt_settings *mqtt_info); void mqtt_task(void *pvParameters); +void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos); void mqtt_publish(); -void mqtt_subscribe(); void mqtt_detroy(); #endif diff --git a/include/ringbuf.h b/include/ringbuf.h new file mode 100644 index 0000000..6444b23 --- /dev/null +++ b/include/ringbuf.h @@ -0,0 +1,23 @@ +#ifndef _RING_BUF_H_ +#define _RING_BUF_H_ + +#include + + +typedef struct{ + uint8_t* p_o; /**< Original pointer */ + uint8_t* volatile p_r; /**< Read pointer */ + uint8_t* volatile p_w; /**< Write pointer */ + volatile int32_t fill_cnt; /**< Number of filled slots */ + int32_t size; /**< Buffer size */ + int32_t block_size; +}RINGBUF; + +int32_t rb_init(RINGBUF *r, uint8_t* buf, int32_t size, int32_t block_size); +int32_t rb_put(RINGBUF *r, uint8_t* c); +int32_t rb_get(RINGBUF *r, uint8_t* c); +int32_t rb_available(RINGBUF *r); +uint32_t rb_read(RINGBUF *r, uint8_t *buf, int len); +uint32_t rb_write(RINGBUF *r, uint8_t *buf, int len); + +#endif diff --git a/mqtt.c b/mqtt.c index 7983ed6..7db39d9 100644 --- a/mqtt.c +++ b/mqtt.c @@ -2,9 +2,9 @@ * @Author: Tuan PM * @Date: 2016-09-10 09:33:06 * @Last Modified by: Tuan PM -* @Last Modified time: 2016-09-12 12:35:23 +* @Last Modified time: 2016-09-12 17:03:56 */ -#include "mqtt.h" +#include #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "freertos/semphr.h" @@ -13,9 +13,11 @@ #include "lwip/sockets.h" #include "lwip/dns.h" #include "lwip/netdb.h" -#include +#include "ringbuf.h" +#include "mqtt.h" static TaskHandle_t xMqttTask = NULL; +static TaskHandle_t xMqttSendingTask = NULL; static int resolev_dns(const char *host, struct sockaddr_in *ip) { @@ -95,6 +97,10 @@ static bool mqtt_connect(mqtt_client *client) client->mqtt_state.outbound_message->length); mqtt_info("Reading MQTT CONNECT response message"); read_len = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE); + + tv.tv_sec = 0; /* No timeout */ + setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); + if (read_len < 0) { mqtt_error("Error network response"); return false; @@ -107,9 +113,6 @@ static bool mqtt_connect(mqtt_client *client) switch (connect_rsp_code) { case CONNECTION_ACCEPTED: mqtt_info("Connected"); - if (client->settings->connected_cb) { - client->settings->connected_cb(client); - } return true; case CONNECTION_REFUSE_PROTOCOL: case CONNECTION_REFUSE_SERVER_UNAVAILABLE: @@ -124,41 +127,176 @@ static bool mqtt_connect(mqtt_client *client) return false; } +void mqtt_sending_task(void *pvParameters) +{ + mqtt_client *client = (mqtt_client *)pvParameters; + uint32_t msg_len, send_len; + mqtt_info("mqtt_sending_task"); + + while (1) { + if (xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS)) { + //queue available + while (msg_len > 0) { + send_len = msg_len; + if (send_len > CONFIG_MQTT_BUFFER_SIZE_BYTE) + send_len = CONFIG_MQTT_BUFFER_SIZE_BYTE; + mqtt_info("Sending...%d bytes", send_len); + + rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); + write(client->socket, client->mqtt_state.out_buffer, send_len); + msg_len -= send_len; + } + //invalidate keepalive timer + client->keepalive_tick = client->settings->keepalive / 2; + } + else { + if (client->keepalive_tick > 0) client->keepalive_tick --; + else { + client->keepalive_tick = client->settings->keepalive / 2; + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + mqtt_info("Sending pingreq"); + write(client->socket, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + } + } + } + vTaskDelete(NULL); +} + +void mqtt_start_receive_schedule(mqtt_client *client) +{ + int read_len; + uint8_t msg_type; + uint8_t msg_qos; + uint16_t msg_id; + + while (1) { + read_len = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE); + mqtt_info("Read len %d", read_len); + if (read_len == 0) + break; + + msg_type = mqtt_get_type(client->mqtt_state.in_buffer); + msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); + msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + // mqtt_info("msg_type %d, msg_id: %d, pending_id: %d", msg_type, msg_id, client->mqtt_state.pending_msg_type); + switch (msg_type) + { + case MQTT_MSG_TYPE_SUBACK: + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + mqtt_info("Subscribe successful"); + break; + case MQTT_MSG_TYPE_UNSUBACK: + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + mqtt_info("UnSubscribe successful"); + break; + case MQTT_MSG_TYPE_PUBLISH: + if (msg_qos == 1) + client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + else if (msg_qos == 2) + client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + if (msg_qos == 1 || msg_qos == 2) { + mqtt_info("Queue response QoS: %d", msg_qos); + // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + // mqtt_info("MQTT: Queue full"); + // } + } + // deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + break; + case MQTT_MSG_TYPE_PUBACK: + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { + mqtt_info("received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); + } + + break; + case MQTT_MSG_TYPE_PUBREC: + client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + // mqtt_info("MQTT: Queue full"); + // } + break; + case MQTT_MSG_TYPE_PUBREL: + client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + // mqtt_info("MQTT: Queue full"); + // } + break; + case MQTT_MSG_TYPE_PUBCOMP: + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { + mqtt_info("eceive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); + } + break; + case MQTT_MSG_TYPE_PINGREQ: + client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection); + // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + // mqtt_info("MQTT: Queue full"); + // } + break; + case MQTT_MSG_TYPE_PINGRESP: + // Ignore + break; + } + } + mqtt_info("network disconnected"); +} void mqtt_destroy(mqtt_client *client) { free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); free(client); + vTaskDelete(xMqttTask); } void mqtt_task(void *pvParameters) { mqtt_client *client = (mqtt_client *)pvParameters; - client->socket = client_connect(client->settings->host, client->settings->port); - mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port); - if (!mqtt_connect(client)) { + while (1) { + client->socket = client_connect(client->settings->host, client->settings->port); + mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port); + if (!mqtt_connect(client)) { + continue; + //return; + } + mqtt_info("Connected to MQTT broker, create sending thread before call connected callback"); + xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask); + if (client->settings->connected_cb) { + client->settings->connected_cb(client); + } + + mqtt_info("mqtt_start_receive_schedule"); + mqtt_start_receive_schedule(client); + close(client->socket); - //return; + vTaskDelete(xMqttSendingTask); + vTaskDelay(1000 / portTICK_RATE_MS); + } - mqtt_info("wait"); - while(1); mqtt_destroy(client); - vTaskDelete(NULL); + } void mqtt_start(mqtt_settings *settings) { + uint8_t *rb_buf; if (xMqttTask != NULL) return; mqtt_client *client = malloc(sizeof(mqtt_client)); - memset(client, 0, sizeof(mqtt_client)); + if (client == NULL) { - mqtt_error("Memory not enought"); + mqtt_error("Memory not enough"); return; } + memset(client, 0, sizeof(mqtt_client)); + client->settings = settings; client->connect_info.client_id = settings->client_id; client->connect_info.username = settings->username; @@ -167,6 +305,7 @@ void mqtt_start(mqtt_settings *settings) client->connect_info.will_message = settings->lwt_msg; client->connect_info.will_qos = settings->lwt_qos; client->connect_info.will_retain = settings->lwt_retain; + client->keepalive_tick = settings->keepalive / 2; client->connect_info.keepalive = settings->keepalive; client->connect_info.clean_session = settings->clean_session; @@ -177,6 +316,19 @@ void mqtt_start(mqtt_settings *settings) client->mqtt_state.out_buffer_length = CONFIG_MQTT_BUFFER_SIZE_BYTE; client->mqtt_state.connect_info = &client->connect_info; + + + /* Create a queue capable of containing 64 unsigned long values. */ + client->xSendingQueue = xQueueCreate(64, sizeof( uint32_t )); + rb_buf = (uint8_t*) malloc(CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4); + + if (rb_buf == NULL) { + mqtt_error("Memory not enough"); + return; + } + + rb_init(&client->send_rb, rb_buf, CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4, 1); + mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length); @@ -184,6 +336,19 @@ void mqtt_start(mqtt_settings *settings) xTaskCreate(&mqtt_task, "mqtt_task", 2048, client, CONFIG_MQTT_PRIORITY, &xMqttTask); } +void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos) +{ + + client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, + topic, qos, + &client->mqtt_state.pending_msg_id); + mqtt_info("MQTT: queue subscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); + rb_write(&client->send_rb, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + xQueueSend(client->xSendingQueue, &client->mqtt_state.outbound_message->length, 0); + +} void mqtt_stop() { diff --git a/ringbuf.c b/ringbuf.c new file mode 100644 index 0000000..5bb4adb --- /dev/null +++ b/ringbuf.c @@ -0,0 +1,106 @@ +/** +* \file +* Ring Buffer library +*/ +#include +#include +#include "ringbuf.h" + +/** +* \brief init a RINGBUF object +* \param r pointer to a RINGBUF object +* \param buf pointer to a byte array +* \param size size of buf +* \param block_size is size of data as block +* \return 0 if successfull, otherwise failed +*/ +int32_t rb_init(RINGBUF *r, uint8_t* buf, int32_t size, int32_t block_size) +{ + if (r == 0 || buf == 0 || size < 2) return -1; + + if (size % block_size != 0) return -1; + + r->p_o = r->p_r = r->p_w = buf; + r->fill_cnt = 0; + r->size = size; + r->block_size = block_size; + return 0; +} +/** +* \brief put a character into ring buffer +* \param r pointer to a ringbuf object +* \param c character to be put +* \return 0 if successfull, otherwise failed +*/ +int32_t rb_put(RINGBUF *r, uint8_t *c) +{ + int32_t i; + uint8_t *data = c; + if (r->fill_cnt >= r->size) + return -1; // ring buffer is full, this should be atomic operation + + + r->fill_cnt += r->block_size; // increase filled slots count, this should be atomic operation + + for (i = 0; i < r->block_size; i++) { + *r->p_w = *data; // put character into buffer + + r->p_w ++; + data ++; + } + + if (r->p_w >= r->p_o + r->size) // rollback if write pointer go pass + r->p_w = r->p_o; // the physical boundary + + return 0; +} +/** +* \brief get a character from ring buffer +* \param r pointer to a ringbuf object +* \param c read character +* \return 0 if successfull, otherwise failed +*/ +int32_t rb_get(RINGBUF *r, uint8_t *c) +{ + int32_t i; + uint8_t *data = c; + if (r->fill_cnt <= 0)return -1; // ring buffer is empty, this should be atomic operation + + r->fill_cnt -= r->block_size; // decrease filled slots count + + for (i = 0; i < r->block_size; i++) + *data++ = *r->p_r++; // get the character out + + if (r->p_r >= r->p_o + r->size) // rollback if write pointer go pass + r->p_r = r->p_o; // the physical boundary + + return 0; +} + +int32_t rb_available(RINGBUF *r) +{ + return (r->size - r->fill_cnt); +} + +uint32_t rb_read(RINGBUF *r, uint8_t *buf, int len) +{ + int n = 0; + uint8_t data; + while (len > 0) { + while (rb_get(r, &data) != 0); + *buf++ = data; + n ++; + len --; + } + + return n; +} + +uint32_t rb_write(RINGBUF *r, uint8_t *buf, int len) +{ + uint32_t wi; + for (wi = 0; wi < len; wi++) { + while (rb_put(r, &buf[wi]) != 0); + } + return 0; +}