-add queue

- add ping timer
- add subscribe function
This commit is contained in:
Tuan PM
2016-09-12 17:05:28 +07:00
parent 5474489515
commit d22b7d7ada
4 changed files with 314 additions and 17 deletions

View File

@@ -4,7 +4,7 @@
#include <string.h> #include <string.h>
#include "mqtt_config.h" #include "mqtt_config.h"
#include "mqtt_msg.h" #include "mqtt_msg.h"
#include "ringbuf.h"
typedef void (* mqtt_callback)(void *); typedef void (* mqtt_callback)(void *);
typedef struct { typedef struct {
@@ -52,11 +52,14 @@ typedef struct {
mqtt_settings *settings; mqtt_settings *settings;
mqtt_state_t mqtt_state; mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info; mqtt_connect_info_t connect_info;
QueueHandle_t xSendingQueue;
RINGBUF send_rb;
uint32_t keepalive_tick;
} mqtt_client; } mqtt_client;
void mqtt_start(mqtt_settings *mqtt_info); void mqtt_start(mqtt_settings *mqtt_info);
void mqtt_task(void *pvParameters); void mqtt_task(void *pvParameters);
void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos);
void mqtt_publish(); void mqtt_publish();
void mqtt_subscribe();
void mqtt_detroy(); void mqtt_detroy();
#endif #endif

23
include/ringbuf.h Normal file
View File

@@ -0,0 +1,23 @@
#ifndef _RING_BUF_H_
#define _RING_BUF_H_
#include <stdint.h>
typedef struct{
uint8_t* p_o; /**< Original pointer */
uint8_t* volatile p_r; /**< Read pointer */
uint8_t* volatile p_w; /**< Write pointer */
volatile int32_t fill_cnt; /**< Number of filled slots */
int32_t size; /**< Buffer size */
int32_t block_size;
}RINGBUF;
int32_t rb_init(RINGBUF *r, uint8_t* buf, int32_t size, int32_t block_size);
int32_t rb_put(RINGBUF *r, uint8_t* c);
int32_t rb_get(RINGBUF *r, uint8_t* c);
int32_t rb_available(RINGBUF *r);
uint32_t rb_read(RINGBUF *r, uint8_t *buf, int len);
uint32_t rb_write(RINGBUF *r, uint8_t *buf, int len);
#endif

195
mqtt.c
View File

@@ -2,9 +2,9 @@
* @Author: Tuan PM * @Author: Tuan PM
* @Date: 2016-09-10 09:33:06 * @Date: 2016-09-10 09:33:06
* @Last Modified by: Tuan PM * @Last Modified by: Tuan PM
* @Last Modified time: 2016-09-12 12:35:23 * @Last Modified time: 2016-09-12 17:03:56
*/ */
#include "mqtt.h" #include <stdio.h>
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
#include "freertos/task.h" #include "freertos/task.h"
#include "freertos/semphr.h" #include "freertos/semphr.h"
@@ -13,9 +13,11 @@
#include "lwip/sockets.h" #include "lwip/sockets.h"
#include "lwip/dns.h" #include "lwip/dns.h"
#include "lwip/netdb.h" #include "lwip/netdb.h"
#include <stdio.h> #include "ringbuf.h"
#include "mqtt.h"
static TaskHandle_t xMqttTask = NULL; static TaskHandle_t xMqttTask = NULL;
static TaskHandle_t xMqttSendingTask = NULL;
static int resolev_dns(const char *host, struct sockaddr_in *ip) { static int resolev_dns(const char *host, struct sockaddr_in *ip) {
@@ -95,6 +97,10 @@ static bool mqtt_connect(mqtt_client *client)
client->mqtt_state.outbound_message->length); client->mqtt_state.outbound_message->length);
mqtt_info("Reading MQTT CONNECT response message"); mqtt_info("Reading MQTT CONNECT response message");
read_len = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE); read_len = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE);
tv.tv_sec = 0; /* No timeout */
setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
if (read_len < 0) { if (read_len < 0) {
mqtt_error("Error network response"); mqtt_error("Error network response");
return false; return false;
@@ -107,9 +113,6 @@ static bool mqtt_connect(mqtt_client *client)
switch (connect_rsp_code) { switch (connect_rsp_code) {
case CONNECTION_ACCEPTED: case CONNECTION_ACCEPTED:
mqtt_info("Connected"); mqtt_info("Connected");
if (client->settings->connected_cb) {
client->settings->connected_cb(client);
}
return true; return true;
case CONNECTION_REFUSE_PROTOCOL: case CONNECTION_REFUSE_PROTOCOL:
case CONNECTION_REFUSE_SERVER_UNAVAILABLE: case CONNECTION_REFUSE_SERVER_UNAVAILABLE:
@@ -124,41 +127,176 @@ static bool mqtt_connect(mqtt_client *client)
return false; return false;
} }
void mqtt_sending_task(void *pvParameters)
{
mqtt_client *client = (mqtt_client *)pvParameters;
uint32_t msg_len, send_len;
mqtt_info("mqtt_sending_task");
while (1) {
if (xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS)) {
//queue available
while (msg_len > 0) {
send_len = msg_len;
if (send_len > CONFIG_MQTT_BUFFER_SIZE_BYTE)
send_len = CONFIG_MQTT_BUFFER_SIZE_BYTE;
mqtt_info("Sending...%d bytes", send_len);
rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer);
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len);
write(client->socket, client->mqtt_state.out_buffer, send_len);
msg_len -= send_len;
}
//invalidate keepalive timer
client->keepalive_tick = client->settings->keepalive / 2;
}
else {
if (client->keepalive_tick > 0) client->keepalive_tick --;
else {
client->keepalive_tick = client->settings->keepalive / 2;
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
mqtt_info("Sending pingreq");
write(client->socket,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
}
}
}
vTaskDelete(NULL);
}
void mqtt_start_receive_schedule(mqtt_client *client)
{
int read_len;
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
while (1) {
read_len = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE);
mqtt_info("Read len %d", read_len);
if (read_len == 0)
break;
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
// mqtt_info("msg_type %d, msg_id: %d, pending_id: %d", msg_type, msg_id, client->mqtt_state.pending_msg_type);
switch (msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
mqtt_info("Subscribe successful");
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
mqtt_info("UnSubscribe successful");
break;
case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1)
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
else if (msg_qos == 2)
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
if (msg_qos == 1 || msg_qos == 2) {
mqtt_info("Queue response QoS: %d", msg_qos);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
// mqtt_info("MQTT: Queue full");
// }
}
// deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
mqtt_info("received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
}
break;
case MQTT_MSG_TYPE_PUBREC:
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
// mqtt_info("MQTT: Queue full");
// }
break;
case MQTT_MSG_TYPE_PUBREL:
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
// mqtt_info("MQTT: Queue full");
// }
break;
case MQTT_MSG_TYPE_PUBCOMP:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
mqtt_info("eceive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
}
break;
case MQTT_MSG_TYPE_PINGREQ:
client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
// mqtt_info("MQTT: Queue full");
// }
break;
case MQTT_MSG_TYPE_PINGRESP:
// Ignore
break;
}
}
mqtt_info("network disconnected");
}
void mqtt_destroy(mqtt_client *client) void mqtt_destroy(mqtt_client *client)
{ {
free(client->mqtt_state.in_buffer); free(client->mqtt_state.in_buffer);
free(client->mqtt_state.out_buffer); free(client->mqtt_state.out_buffer);
free(client); free(client);
vTaskDelete(xMqttTask);
} }
void mqtt_task(void *pvParameters) void mqtt_task(void *pvParameters)
{ {
mqtt_client *client = (mqtt_client *)pvParameters; mqtt_client *client = (mqtt_client *)pvParameters;
client->socket = client_connect(client->settings->host, client->settings->port);
mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port);
if (!mqtt_connect(client)) { while (1) {
client->socket = client_connect(client->settings->host, client->settings->port);
mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port);
if (!mqtt_connect(client)) {
continue;
//return;
}
mqtt_info("Connected to MQTT broker, create sending thread before call connected callback");
xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask);
if (client->settings->connected_cb) {
client->settings->connected_cb(client);
}
mqtt_info("mqtt_start_receive_schedule");
mqtt_start_receive_schedule(client);
close(client->socket); close(client->socket);
//return; vTaskDelete(xMqttSendingTask);
vTaskDelay(1000 / portTICK_RATE_MS);
} }
mqtt_info("wait");
while(1);
mqtt_destroy(client); mqtt_destroy(client);
vTaskDelete(NULL);
} }
void mqtt_start(mqtt_settings *settings) void mqtt_start(mqtt_settings *settings)
{ {
uint8_t *rb_buf;
if (xMqttTask != NULL) if (xMqttTask != NULL)
return; return;
mqtt_client *client = malloc(sizeof(mqtt_client)); mqtt_client *client = malloc(sizeof(mqtt_client));
memset(client, 0, sizeof(mqtt_client));
if (client == NULL) { if (client == NULL) {
mqtt_error("Memory not enought"); mqtt_error("Memory not enough");
return; return;
} }
memset(client, 0, sizeof(mqtt_client));
client->settings = settings; client->settings = settings;
client->connect_info.client_id = settings->client_id; client->connect_info.client_id = settings->client_id;
client->connect_info.username = settings->username; client->connect_info.username = settings->username;
@@ -167,6 +305,7 @@ void mqtt_start(mqtt_settings *settings)
client->connect_info.will_message = settings->lwt_msg; client->connect_info.will_message = settings->lwt_msg;
client->connect_info.will_qos = settings->lwt_qos; client->connect_info.will_qos = settings->lwt_qos;
client->connect_info.will_retain = settings->lwt_retain; client->connect_info.will_retain = settings->lwt_retain;
client->keepalive_tick = settings->keepalive / 2;
client->connect_info.keepalive = settings->keepalive; client->connect_info.keepalive = settings->keepalive;
client->connect_info.clean_session = settings->clean_session; client->connect_info.clean_session = settings->clean_session;
@@ -177,6 +316,19 @@ void mqtt_start(mqtt_settings *settings)
client->mqtt_state.out_buffer_length = CONFIG_MQTT_BUFFER_SIZE_BYTE; client->mqtt_state.out_buffer_length = CONFIG_MQTT_BUFFER_SIZE_BYTE;
client->mqtt_state.connect_info = &client->connect_info; client->mqtt_state.connect_info = &client->connect_info;
/* Create a queue capable of containing 64 unsigned long values. */
client->xSendingQueue = xQueueCreate(64, sizeof( uint32_t ));
rb_buf = (uint8_t*) malloc(CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4);
if (rb_buf == NULL) {
mqtt_error("Memory not enough");
return;
}
rb_init(&client->send_rb, rb_buf, CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4, 1);
mqtt_msg_init(&client->mqtt_state.mqtt_connection, mqtt_msg_init(&client->mqtt_state.mqtt_connection,
client->mqtt_state.out_buffer, client->mqtt_state.out_buffer,
client->mqtt_state.out_buffer_length); client->mqtt_state.out_buffer_length);
@@ -184,6 +336,19 @@ void mqtt_start(mqtt_settings *settings)
xTaskCreate(&mqtt_task, "mqtt_task", 2048, client, CONFIG_MQTT_PRIORITY, &xMqttTask); xTaskCreate(&mqtt_task, "mqtt_task", 2048, client, CONFIG_MQTT_PRIORITY, &xMqttTask);
} }
void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos)
{
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
&client->mqtt_state.pending_msg_id);
mqtt_info("MQTT: queue subscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
rb_write(&client->send_rb,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
xQueueSend(client->xSendingQueue, &client->mqtt_state.outbound_message->length, 0);
}
void mqtt_stop() void mqtt_stop()
{ {

106
ringbuf.c Normal file
View File

@@ -0,0 +1,106 @@
/**
* \file
* Ring Buffer library
*/
#include <stdio.h>
#include <string.h>
#include "ringbuf.h"
/**
* \brief init a RINGBUF object
* \param r pointer to a RINGBUF object
* \param buf pointer to a byte array
* \param size size of buf
* \param block_size is size of data as block
* \return 0 if successfull, otherwise failed
*/
int32_t rb_init(RINGBUF *r, uint8_t* buf, int32_t size, int32_t block_size)
{
if (r == 0 || buf == 0 || size < 2) return -1;
if (size % block_size != 0) return -1;
r->p_o = r->p_r = r->p_w = buf;
r->fill_cnt = 0;
r->size = size;
r->block_size = block_size;
return 0;
}
/**
* \brief put a character into ring buffer
* \param r pointer to a ringbuf object
* \param c character to be put
* \return 0 if successfull, otherwise failed
*/
int32_t rb_put(RINGBUF *r, uint8_t *c)
{
int32_t i;
uint8_t *data = c;
if (r->fill_cnt >= r->size)
return -1; // ring buffer is full, this should be atomic operation
r->fill_cnt += r->block_size; // increase filled slots count, this should be atomic operation
for (i = 0; i < r->block_size; i++) {
*r->p_w = *data; // put character into buffer
r->p_w ++;
data ++;
}
if (r->p_w >= r->p_o + r->size) // rollback if write pointer go pass
r->p_w = r->p_o; // the physical boundary
return 0;
}
/**
* \brief get a character from ring buffer
* \param r pointer to a ringbuf object
* \param c read character
* \return 0 if successfull, otherwise failed
*/
int32_t rb_get(RINGBUF *r, uint8_t *c)
{
int32_t i;
uint8_t *data = c;
if (r->fill_cnt <= 0)return -1; // ring buffer is empty, this should be atomic operation
r->fill_cnt -= r->block_size; // decrease filled slots count
for (i = 0; i < r->block_size; i++)
*data++ = *r->p_r++; // get the character out
if (r->p_r >= r->p_o + r->size) // rollback if write pointer go pass
r->p_r = r->p_o; // the physical boundary
return 0;
}
int32_t rb_available(RINGBUF *r)
{
return (r->size - r->fill_cnt);
}
uint32_t rb_read(RINGBUF *r, uint8_t *buf, int len)
{
int n = 0;
uint8_t data;
while (len > 0) {
while (rb_get(r, &data) != 0);
*buf++ = data;
n ++;
len --;
}
return n;
}
uint32_t rb_write(RINGBUF *r, uint8_t *buf, int len)
{
uint32_t wi;
for (wi = 0; wi < len; wi++) {
while (rb_put(r, &buf[wi]) != 0);
}
return 0;
}