Initial release

This commit is contained in:
Tuan PM
2016-09-12 21:15:28 +07:00
parent d22b7d7ada
commit ba966c7a9a
3 changed files with 101 additions and 28 deletions

View File

@@ -2,4 +2,4 @@
This is component based on ESP-IDF for ESP32 This is component based on ESP-IDF for ESP32
Sample project: https://github.com/tuanpmt/esp32-mqtt Full documentation and sample project: https://github.com/tuanpmt/esp32-mqtt

View File

@@ -5,7 +5,9 @@
#include "mqtt_config.h" #include "mqtt_config.h"
#include "mqtt_msg.h" #include "mqtt_msg.h"
#include "ringbuf.h" #include "ringbuf.h"
typedef void (* mqtt_callback)(void *);
typedef void (* mqtt_callback)(void *, void *);
typedef struct { typedef struct {
mqtt_callback connected_cb; mqtt_callback connected_cb;
@@ -29,6 +31,17 @@ typedef struct {
uint32_t keepalive; uint32_t keepalive;
} mqtt_settings; } mqtt_settings;
typedef struct mqtt_event_data_t
{
uint8_t type;
const char* topic;
const char* data;
uint16_t topic_length;
uint16_t data_length;
uint16_t data_offset;
uint16_t data_total_length;
} mqtt_event_data_t;
typedef struct mqtt_state_t typedef struct mqtt_state_t
{ {
uint16_t port; uint16_t port;
@@ -57,9 +70,9 @@ typedef struct {
uint32_t keepalive_tick; uint32_t keepalive_tick;
} mqtt_client; } mqtt_client;
void mqtt_start(mqtt_settings *mqtt_info); mqtt_client *mqtt_start(mqtt_settings *mqtt_info);
void mqtt_task(void *pvParameters); void mqtt_task(void *pvParameters);
void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos); void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos);
void mqtt_publish(); void mqtt_publish(mqtt_client* client, char *topic, char *data, int len, int qos, int retain);
void mqtt_detroy(); void mqtt_detroy();
#endif #endif

108
mqtt.c
View File

@@ -2,7 +2,7 @@
* @Author: Tuan PM * @Author: Tuan PM
* @Date: 2016-09-10 09:33:06 * @Date: 2016-09-10 09:33:06
* @Last Modified by: Tuan PM * @Last Modified by: Tuan PM
* @Last Modified time: 2016-09-12 17:03:56 * @Last Modified time: 2016-09-12 20:58:07
*/ */
#include <stdio.h> #include <stdio.h>
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
@@ -31,7 +31,14 @@ static int resolev_dns(const char *host, struct sockaddr_in *ip) {
memcpy(&ip->sin_addr, addr_list[0], sizeof(ip->sin_addr)); memcpy(&ip->sin_addr, addr_list[0], sizeof(ip->sin_addr));
return 1; return 1;
} }
static void mqtt_queue(mqtt_client *client)
{
// TODO: detect buffer full (ringbuf and queue)
rb_write(&client->send_rb,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
xQueueSend(client->xSendingQueue, &client->mqtt_state.outbound_message->length, 0);
}
static int client_connect(const char *stream_host, int stream_port) static int client_connect(const char *stream_host, int stream_port)
{ {
int sock; int sock;
@@ -146,6 +153,7 @@ void mqtt_sending_task(void *pvParameters)
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer);
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len);
write(client->socket, client->mqtt_state.out_buffer, send_len); write(client->socket, client->mqtt_state.out_buffer, send_len);
//TODO: Check sending type, to callback publish message
msg_len -= send_len; msg_len -= send_len;
} }
//invalidate keepalive timer //invalidate keepalive timer
@@ -169,6 +177,41 @@ void mqtt_sending_task(void *pvParameters)
vTaskDelete(NULL); vTaskDelete(NULL);
} }
void deliver_publish(mqtt_client *client, uint8_t *message, int length)
{
mqtt_event_data_t event_data;
int len_read, total_mqtt_len = 0, mqtt_len = 0, mqtt_offset = 0;
do
{
event_data.topic_length = length;
event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);
event_data.data_length = length;
event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
if(total_mqtt_len == 0){
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + event_data.data_length;
mqtt_len = event_data.data_length;
} else {
mqtt_len = len_read;
}
event_data.data_total_length = total_mqtt_len;
event_data.data_offset = mqtt_offset;
event_data.data_length = mqtt_len;
mqtt_info("Data received: %d/%d bytes ", mqtt_len, total_mqtt_len);
if(client->settings->data_cb) {
client->settings->data_cb(client, &event_data);
}
mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)
break;
len_read = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE);
client->mqtt_state.message_length_read += len_read;
} while (1);
}
void mqtt_start_receive_schedule(mqtt_client *client) void mqtt_start_receive_schedule(mqtt_client *client)
{ {
int read_len; int read_len;
@@ -189,8 +232,12 @@ void mqtt_start_receive_schedule(mqtt_client *client)
switch (msg_type) switch (msg_type)
{ {
case MQTT_MSG_TYPE_SUBACK: case MQTT_MSG_TYPE_SUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) {
mqtt_info("Subscribe successful"); mqtt_info("Subscribe successful");
if (client->settings->subscribe_cb) {
client->settings->subscribe_cb(client, NULL);
}
}
break; break;
case MQTT_MSG_TYPE_UNSUBACK: case MQTT_MSG_TYPE_UNSUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
@@ -201,12 +248,19 @@ void mqtt_start_receive_schedule(mqtt_client *client)
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
else if (msg_qos == 2) else if (msg_qos == 2)
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
if (msg_qos == 1 || msg_qos == 2) { if (msg_qos == 1 || msg_qos == 2) {
mqtt_info("Queue response QoS: %d", msg_qos); mqtt_info("Queue response QoS: %d", msg_qos);
mqtt_queue(client);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
// mqtt_info("MQTT: Queue full"); // mqtt_info("MQTT: Queue full");
// } // }
} }
client->mqtt_state.message_length_read = read_len;
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
mqtt_info("deliver_publish");
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
// deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); // deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break; break;
case MQTT_MSG_TYPE_PUBACK: case MQTT_MSG_TYPE_PUBACK:
@@ -217,28 +271,24 @@ void mqtt_start_receive_schedule(mqtt_client *client)
break; break;
case MQTT_MSG_TYPE_PUBREC: case MQTT_MSG_TYPE_PUBREC:
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { mqtt_queue(client);
// mqtt_info("MQTT: Queue full");
// }
break; break;
case MQTT_MSG_TYPE_PUBREL: case MQTT_MSG_TYPE_PUBREL:
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { mqtt_queue(client);
// mqtt_info("MQTT: Queue full");
// }
break; break;
case MQTT_MSG_TYPE_PUBCOMP: case MQTT_MSG_TYPE_PUBCOMP:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
mqtt_info("eceive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); mqtt_info("Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
} }
break; break;
case MQTT_MSG_TYPE_PINGREQ: case MQTT_MSG_TYPE_PINGREQ:
client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection); client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
// if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { mqtt_queue(client);
// mqtt_info("MQTT: Queue full");
// }
break; break;
case MQTT_MSG_TYPE_PINGRESP: case MQTT_MSG_TYPE_PINGRESP:
mqtt_info("MQTT_MSG_TYPE_PINGRESP");
// Ignore // Ignore
break; break;
} }
@@ -268,7 +318,7 @@ void mqtt_task(void *pvParameters)
mqtt_info("Connected to MQTT broker, create sending thread before call connected callback"); mqtt_info("Connected to MQTT broker, create sending thread before call connected callback");
xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask); xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask);
if (client->settings->connected_cb) { if (client->settings->connected_cb) {
client->settings->connected_cb(client); client->settings->connected_cb(client, NULL);
} }
mqtt_info("mqtt_start_receive_schedule"); mqtt_info("mqtt_start_receive_schedule");
@@ -284,16 +334,16 @@ void mqtt_task(void *pvParameters)
} }
void mqtt_start(mqtt_settings *settings) mqtt_client *mqtt_start(mqtt_settings *settings)
{ {
uint8_t *rb_buf; uint8_t *rb_buf;
if (xMqttTask != NULL) if (xMqttTask != NULL)
return; return NULL;
mqtt_client *client = malloc(sizeof(mqtt_client)); mqtt_client *client = malloc(sizeof(mqtt_client));
if (client == NULL) { if (client == NULL) {
mqtt_error("Memory not enough"); mqtt_error("Memory not enough");
return; return NULL;
} }
memset(client, 0, sizeof(mqtt_client)); memset(client, 0, sizeof(mqtt_client));
@@ -324,7 +374,7 @@ void mqtt_start(mqtt_settings *settings)
if (rb_buf == NULL) { if (rb_buf == NULL) {
mqtt_error("Memory not enough"); mqtt_error("Memory not enough");
return; return NULL;
} }
rb_init(&client->send_rb, rb_buf, CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4, 1); rb_init(&client->send_rb, rb_buf, CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4, 1);
@@ -334,20 +384,30 @@ void mqtt_start(mqtt_settings *settings)
client->mqtt_state.out_buffer_length); client->mqtt_state.out_buffer_length);
xTaskCreate(&mqtt_task, "mqtt_task", 2048, client, CONFIG_MQTT_PRIORITY, &xMqttTask); xTaskCreate(&mqtt_task, "mqtt_task", 2048, client, CONFIG_MQTT_PRIORITY, &xMqttTask);
return client;
} }
void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos) void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos)
{ {
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos, topic, qos,
&client->mqtt_state.pending_msg_id); &client->mqtt_state.pending_msg_id);
mqtt_info("MQTT: queue subscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); mqtt_info("Queue subscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
rb_write(&client->send_rb, mqtt_queue(client);
client->mqtt_state.outbound_message->data, }
client->mqtt_state.outbound_message->length);
xQueueSend(client->xSendingQueue, &client->mqtt_state.outbound_message->length, 0);
void mqtt_publish(mqtt_client* client, char *topic, char *data, int len, int qos, int retain)
{
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
&client->mqtt_state.pending_msg_id);
mqtt_queue(client);
mqtt_info("Queuing publish, length: %d, queue size(%d/%d)\r\n",
client->mqtt_state.outbound_message->length,
client->send_rb.fill_cnt,
client->send_rb.size);
} }
void mqtt_stop() void mqtt_stop()
{ {