diff --git a/README.md b/README.md index 20f6b35..be5a1bb 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,4 @@ This is component based on ESP-IDF for ESP32 -Sample project: https://github.com/tuanpmt/esp32-mqtt +Full documentation and sample project: https://github.com/tuanpmt/esp32-mqtt diff --git a/include/mqtt.h b/include/mqtt.h index ec4a4d0..0006ced 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -5,7 +5,9 @@ #include "mqtt_config.h" #include "mqtt_msg.h" #include "ringbuf.h" -typedef void (* mqtt_callback)(void *); + + +typedef void (* mqtt_callback)(void *, void *); typedef struct { mqtt_callback connected_cb; @@ -29,6 +31,17 @@ typedef struct { uint32_t keepalive; } mqtt_settings; +typedef struct mqtt_event_data_t +{ + uint8_t type; + const char* topic; + const char* data; + uint16_t topic_length; + uint16_t data_length; + uint16_t data_offset; + uint16_t data_total_length; +} mqtt_event_data_t; + typedef struct mqtt_state_t { uint16_t port; @@ -57,9 +70,9 @@ typedef struct { uint32_t keepalive_tick; } mqtt_client; -void mqtt_start(mqtt_settings *mqtt_info); +mqtt_client *mqtt_start(mqtt_settings *mqtt_info); void mqtt_task(void *pvParameters); void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos); -void mqtt_publish(); +void mqtt_publish(mqtt_client* client, char *topic, char *data, int len, int qos, int retain); void mqtt_detroy(); #endif diff --git a/mqtt.c b/mqtt.c index 7db39d9..c2f0e60 100644 --- a/mqtt.c +++ b/mqtt.c @@ -2,7 +2,7 @@ * @Author: Tuan PM * @Date: 2016-09-10 09:33:06 * @Last Modified by: Tuan PM -* @Last Modified time: 2016-09-12 17:03:56 +* @Last Modified time: 2016-09-12 20:58:07 */ #include #include "freertos/FreeRTOS.h" @@ -31,7 +31,14 @@ static int resolev_dns(const char *host, struct sockaddr_in *ip) { memcpy(&ip->sin_addr, addr_list[0], sizeof(ip->sin_addr)); return 1; } - +static void mqtt_queue(mqtt_client *client) +{ +// TODO: detect buffer full (ringbuf and queue) + rb_write(&client->send_rb, + client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length); + xQueueSend(client->xSendingQueue, &client->mqtt_state.outbound_message->length, 0); +} static int client_connect(const char *stream_host, int stream_port) { int sock; @@ -146,6 +153,7 @@ void mqtt_sending_task(void *pvParameters) client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer); client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len); write(client->socket, client->mqtt_state.out_buffer, send_len); + //TODO: Check sending type, to callback publish message msg_len -= send_len; } //invalidate keepalive timer @@ -169,6 +177,41 @@ void mqtt_sending_task(void *pvParameters) vTaskDelete(NULL); } +void deliver_publish(mqtt_client *client, uint8_t *message, int length) +{ + mqtt_event_data_t event_data; + int len_read, total_mqtt_len = 0, mqtt_len = 0, mqtt_offset = 0; + + do + { + event_data.topic_length = length; + event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length); + event_data.data_length = length; + event_data.data = mqtt_get_publish_data(message, &event_data.data_length); + + if(total_mqtt_len == 0){ + total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + event_data.data_length; + mqtt_len = event_data.data_length; + } else { + mqtt_len = len_read; + } + + event_data.data_total_length = total_mqtt_len; + event_data.data_offset = mqtt_offset; + event_data.data_length = mqtt_len; + + mqtt_info("Data received: %d/%d bytes ", mqtt_len, total_mqtt_len); + if(client->settings->data_cb) { + client->settings->data_cb(client, &event_data); + } + mqtt_offset += mqtt_len; + if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) + break; + len_read = read(client->socket, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE); + client->mqtt_state.message_length_read += len_read; + } while (1); + +} void mqtt_start_receive_schedule(mqtt_client *client) { int read_len; @@ -189,8 +232,12 @@ void mqtt_start_receive_schedule(mqtt_client *client) switch (msg_type) { case MQTT_MSG_TYPE_SUBACK: - if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) { mqtt_info("Subscribe successful"); + if (client->settings->subscribe_cb) { + client->settings->subscribe_cb(client, NULL); + } + } break; case MQTT_MSG_TYPE_UNSUBACK: if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) @@ -201,12 +248,19 @@ void mqtt_start_receive_schedule(mqtt_client *client) client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); else if (msg_qos == 2) client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + if (msg_qos == 1 || msg_qos == 2) { mqtt_info("Queue response QoS: %d", msg_qos); + mqtt_queue(client); // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { // mqtt_info("MQTT: Queue full"); // } } + client->mqtt_state.message_length_read = read_len; + client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + mqtt_info("deliver_publish"); + + deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); // deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); break; case MQTT_MSG_TYPE_PUBACK: @@ -217,28 +271,24 @@ void mqtt_start_receive_schedule(mqtt_client *client) break; case MQTT_MSG_TYPE_PUBREC: client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); - // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { - // mqtt_info("MQTT: Queue full"); - // } + mqtt_queue(client); break; case MQTT_MSG_TYPE_PUBREL: client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); - // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { - // mqtt_info("MQTT: Queue full"); - // } + mqtt_queue(client); + break; case MQTT_MSG_TYPE_PUBCOMP: if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { - mqtt_info("eceive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); + mqtt_info("Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); } break; case MQTT_MSG_TYPE_PINGREQ: client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection); - // if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { - // mqtt_info("MQTT: Queue full"); - // } + mqtt_queue(client); break; case MQTT_MSG_TYPE_PINGRESP: + mqtt_info("MQTT_MSG_TYPE_PINGRESP"); // Ignore break; } @@ -268,7 +318,7 @@ void mqtt_task(void *pvParameters) mqtt_info("Connected to MQTT broker, create sending thread before call connected callback"); xTaskCreate(&mqtt_sending_task, "mqtt_sending_task", 2048, client, CONFIG_MQTT_PRIORITY + 1, &xMqttSendingTask); if (client->settings->connected_cb) { - client->settings->connected_cb(client); + client->settings->connected_cb(client, NULL); } mqtt_info("mqtt_start_receive_schedule"); @@ -284,16 +334,16 @@ void mqtt_task(void *pvParameters) } -void mqtt_start(mqtt_settings *settings) +mqtt_client *mqtt_start(mqtt_settings *settings) { uint8_t *rb_buf; if (xMqttTask != NULL) - return; + return NULL; mqtt_client *client = malloc(sizeof(mqtt_client)); if (client == NULL) { mqtt_error("Memory not enough"); - return; + return NULL; } memset(client, 0, sizeof(mqtt_client)); @@ -324,7 +374,7 @@ void mqtt_start(mqtt_settings *settings) if (rb_buf == NULL) { mqtt_error("Memory not enough"); - return; + return NULL; } rb_init(&client->send_rb, rb_buf, CONFIG_MQTT_QUEUE_BUFFER_SIZE_WORD * 4, 1); @@ -334,20 +384,30 @@ void mqtt_start(mqtt_settings *settings) client->mqtt_state.out_buffer_length); xTaskCreate(&mqtt_task, "mqtt_task", 2048, client, CONFIG_MQTT_PRIORITY, &xMqttTask); + return client; } void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos) { - client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, qos, &client->mqtt_state.pending_msg_id); - mqtt_info("MQTT: queue subscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); - rb_write(&client->send_rb, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); - xQueueSend(client->xSendingQueue, &client->mqtt_state.outbound_message->length, 0); + mqtt_info("Queue subscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); + mqtt_queue(client); +} +void mqtt_publish(mqtt_client* client, char *topic, char *data, int len, int qos, int retain) +{ + + client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, + topic, data, len, + qos, retain, + &client->mqtt_state.pending_msg_id); + mqtt_queue(client); + mqtt_info("Queuing publish, length: %d, queue size(%d/%d)\r\n", + client->mqtt_state.outbound_message->length, + client->send_rb.fill_cnt, + client->send_rb.size); } void mqtt_stop() {