diff --git a/include/mqtt_client.h b/include/mqtt_client.h old mode 100755 new mode 100644 index dd1d7ef..cb06132 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -18,7 +18,7 @@ extern "C" { #endif -typedef struct esp_mqtt_client* esp_mqtt_client_handle_t; +typedef struct esp_mqtt_client *esp_mqtt_client_handle_t; /** * @brief MQTT event types. @@ -77,7 +77,7 @@ typedef struct { int session_present; /*!< MQTT session_present flag for connection event */ } esp_mqtt_event_t; -typedef esp_mqtt_event_t* esp_mqtt_event_handle_t; +typedef esp_mqtt_event_t *esp_mqtt_event_handle_t; typedef esp_err_t (* mqtt_event_callback_t)(esp_mqtt_event_handle_t event); diff --git a/include/mqtt_config.h b/include/mqtt_config.h index 972f2a1..033fc0a 100644 --- a/include/mqtt_config.h +++ b/include/mqtt_config.h @@ -67,13 +67,13 @@ #endif #ifdef CONFIG_MQTT_USE_CORE_0 - #define MQTT_TASK_CORE 0 +#define MQTT_TASK_CORE 0 #else - #ifdef CONFIG_MQTT_USE_CORE_1 - #define MQTT_TASK_CORE 1 - #else - #define MQTT_TASK_CORE 0 - #endif +#ifdef CONFIG_MQTT_USE_CORE_1 +#define MQTT_TASK_CORE 1 +#else +#define MQTT_TASK_CORE 0 +#endif #endif diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index c65e0d4..80ab72f 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -40,8 +40,7 @@ extern "C" { /* Remaining Length */ -enum mqtt_message_type -{ +enum mqtt_message_type { MQTT_MSG_TYPE_CONNECT = 1, MQTT_MSG_TYPE_CONNACK = 2, MQTT_MSG_TYPE_PUBLISH = 3, @@ -58,8 +57,7 @@ enum mqtt_message_type MQTT_MSG_TYPE_DISCONNECT = 14 }; -enum mqtt_connect_return_code -{ +enum mqtt_connect_return_code { CONNECTION_ACCEPTED = 0, CONNECTION_REFUSE_PROTOCOL, CONNECTION_REFUSE_ID_REJECTED, @@ -68,31 +66,28 @@ enum mqtt_connect_return_code CONNECTION_REFUSE_NOT_AUTHORIZED }; -typedef struct mqtt_message -{ - uint8_t* data; +typedef struct mqtt_message { + uint8_t *data; uint32_t length; uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */ uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */ } mqtt_message_t; -typedef struct mqtt_connection -{ +typedef struct mqtt_connection { mqtt_message_t message; uint16_t message_id; - uint8_t* buffer; + uint8_t *buffer; uint16_t buffer_length; } mqtt_connection_t; -typedef struct mqtt_connect_info -{ - char* client_id; - char* username; - char* password; - char* will_topic; - char* will_message; +typedef struct mqtt_connect_info { + char *client_id; + char *username; + char *password; + char *will_topic; + char *will_message; int keepalive; int will_length; int will_qos; @@ -102,33 +97,54 @@ typedef struct mqtt_connect_info } mqtt_connect_info_t; -static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; } -static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; } -static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; } -static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } -static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; } -static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } -static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } +static inline int mqtt_get_type(uint8_t *buffer) +{ + return (buffer[0] & 0xf0) >> 4; +} +static inline int mqtt_get_connect_session_present(uint8_t *buffer) +{ + return buffer[2] & 0x01; +} +static inline int mqtt_get_connect_return_code(uint8_t *buffer) +{ + return buffer[3]; +} +static inline int mqtt_get_dup(uint8_t *buffer) +{ + return (buffer[0] & 0x08) >> 3; +} +static inline void mqtt_set_dup(uint8_t *buffer) +{ + buffer[0] |= 0x08; +} +static inline int mqtt_get_qos(uint8_t *buffer) +{ + return (buffer[0] & 0x06) >> 1; +} +static inline int mqtt_get_retain(uint8_t *buffer) +{ + return (buffer[0] & 0x01); +} -void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); -bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length); -uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len); -char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length); -char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length); -uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); -int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length); +void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length); +bool mqtt_header_complete(uint8_t *buffer, uint16_t buffer_length); +uint32_t mqtt_get_total_length(uint8_t *buffer, uint16_t length, int *fixed_size_len); +char *mqtt_get_publish_topic(uint8_t *buffer, uint32_t *length); +char *mqtt_get_publish_data(uint8_t *buffer, uint32_t *length); +uint16_t mqtt_get_id(uint8_t *buffer, uint16_t length); +int mqtt_has_valid_msg_hdr(uint8_t *buffer, uint16_t length); -mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); -mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); -mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); -mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); -mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection); -mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection); -mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection); +mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info); +mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id); +mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id); +mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id); +mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id); +mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id); +mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id); +mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id); +mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection); +mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection); +mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection); #ifdef __cplusplus diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 3939d7c..e3a5ba7 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -13,18 +13,18 @@ extern "C" { struct outbox_item; -typedef struct outbox_list_t * outbox_handle_t; -typedef struct outbox_item * outbox_item_handle_t; -typedef struct outbox_message * outbox_message_handle_t; +typedef struct outbox_list_t *outbox_handle_t; +typedef struct outbox_item *outbox_item_handle_t; +typedef struct outbox_message *outbox_message_handle_t; typedef struct outbox_message { - uint8_t *data; - int len; - int msg_id; - int msg_qos; - int msg_type; - uint8_t *remaining_data; - int remaining_len; + uint8_t *data; + int len; + int msg_id; + int msg_qos; + int msg_type; + uint8_t *remaining_data; + int remaining_len; } outbox_message_t; typedef enum pending_state { @@ -37,7 +37,7 @@ outbox_handle_t outbox_init(); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); -uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); +uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index c7b1fe1..05ffa19 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -37,8 +37,7 @@ #define MQTT_MAX_FIXED_HEADER_SIZE 5 -enum mqtt_connect_flag -{ +enum mqtt_connect_flag { MQTT_CONNECT_FLAG_USERNAME = 1 << 7, MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, @@ -46,8 +45,7 @@ enum mqtt_connect_flag MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 }; -struct __attribute((__packed__)) mqtt_connect_variable_header -{ +struct __attribute((__packed__)) mqtt_connect_variable_header { uint8_t lengthMsb; uint8_t lengthLsb; #if defined(MQTT_PROTOCOL_311) @@ -61,10 +59,11 @@ struct __attribute((__packed__)) mqtt_connect_variable_header uint8_t keepaliveLsb; }; -static int append_string(mqtt_connection_t* connection, const char* string, int len) +static int append_string(mqtt_connection_t *connection, const char *string, int len) { - if (connection->message.length + len + 2 > connection->buffer_length) + if (connection->message.length + len + 2 > connection->buffer_length) { return -1; + } connection->buffer[connection->message.length++] = len >> 8; connection->buffer[connection->message.length++] = len & 0xff; @@ -74,7 +73,7 @@ static int append_string(mqtt_connection_t* connection, const char* string, int return len + 2; } -static uint16_t append_message_id(mqtt_connection_t* connection, uint16_t message_id) +static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id) { // If message_id is zero then we should assign one, otherwise // we'll use the one supplied by the caller @@ -82,8 +81,9 @@ static uint16_t append_message_id(mqtt_connection_t* connection, uint16_t messag message_id = platform_random(65535); } - if (connection->message.length + 2 > connection->buffer_length) + if (connection->message.length + 2 > connection->buffer_length) { return 0; + } connection->buffer[connection->message.length++] = message_id >> 8; connection->buffer[connection->message.length++] = message_id & 0xff; @@ -91,20 +91,20 @@ static uint16_t append_message_id(mqtt_connection_t* connection, uint16_t messag return message_id; } -static int init_message(mqtt_connection_t* connection) +static int init_message(mqtt_connection_t *connection) { connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; return MQTT_MAX_FIXED_HEADER_SIZE; } -static mqtt_message_t* fail_message(mqtt_connection_t* connection) +static mqtt_message_t *fail_message(mqtt_connection_t *connection) { connection->message.data = connection->buffer; connection->message.length = 0; return &connection->message; } -static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) +static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain) { int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; int total_length = message_length; @@ -140,102 +140,101 @@ static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int // type byte connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); // length bytes - for (int j = 0; jbuffer[offs++] = encoded_lens[j]; } return &connection->message; } -void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length) +void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length) { memset(connection, 0, sizeof(mqtt_connection_t)); connection->buffer = buffer; connection->buffer_length = buffer_length; } -uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length, int* fixed_size_len) +uint32_t mqtt_get_total_length(uint8_t *buffer, uint16_t length, int *fixed_size_len) { int i; uint32_t totlen = 0; - for (i = 1; i < length; ++i) - { + for (i = 1; i < length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); - if ((buffer[i] & 0x80) == 0) - { + if ((buffer[i] & 0x80) == 0) { ++i; break; } } totlen += i; - if (fixed_size_len) *fixed_size_len = i; - + if (fixed_size_len) { + *fixed_size_len = i; + } + return totlen; } -bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length) +bool mqtt_header_complete(uint8_t *buffer, uint16_t buffer_length) { uint16_t i; uint16_t topiclen; - for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i) - { - if(i >= buffer_length) + for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i) { + if (i >= buffer_length) { return false; - if ((buffer[i] & 0x80) == 0) - { + } + if ((buffer[i] & 0x80) == 0) { ++i; break; } } // i is now the length of the fixed header - if (i + 2 >= buffer_length) + if (i + 2 >= buffer_length) { return false; + } topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; i += topiclen; - if (mqtt_get_qos(buffer) > 0) - { + if (mqtt_get_qos(buffer) > 0) { i += 2; } // i is now the length of the fixed + variable header return buffer_length >= i; } -char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length) +char *mqtt_get_publish_topic(uint8_t *buffer, uint32_t *length) { int i; int totlen = 0; int topiclen; - for (i = 1; i < *length; ++i) - { + for (i = 1; i < *length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); - if ((buffer[i] & 0x80) == 0) - { + if ((buffer[i] & 0x80) == 0) { ++i; break; } } totlen += i; - if (i + 2 >= *length) + if (i + 2 >= *length) { return NULL; + } topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; - if (i + topiclen > *length) + if (i + topiclen > *length) { return NULL; + } *length = topiclen; - return (char*)(buffer + i); -} + return (char *)(buffer + i); +} -char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length) +char *mqtt_get_publish_data(uint8_t *buffer, uint32_t *length) { int i; int totlen = 0; @@ -243,116 +242,118 @@ char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length) int blength = *length; *length = 0; - for (i = 1; i < blength; ++i) - { + for (i = 1; i < blength; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); - if ((buffer[i] & 0x80) == 0) - { + if ((buffer[i] & 0x80) == 0) { ++i; break; } } totlen += i; - if (i + 2 >= blength) + if (i + 2 >= blength) { return NULL; + } topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; - if (i + topiclen >= blength) + if (i + topiclen >= blength) { return NULL; + } i += topiclen; - if (mqtt_get_qos(buffer) > 0) - { - if (i + 2 >= blength) + if (mqtt_get_qos(buffer) > 0) { + if (i + 2 >= blength) { return NULL; + } i += 2; } - if (totlen < i) + if (totlen < i) { return NULL; + } - if (totlen <= blength) + if (totlen <= blength) { *length = totlen - i; - else + } else { *length = blength - i; - return (char*)(buffer + i); + } + return (char *)(buffer + i); } -uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length) +uint16_t mqtt_get_id(uint8_t *buffer, uint16_t length) { - if (length < 1) + if (length < 1) { return 0; + } - switch (mqtt_get_type(buffer)) - { - case MQTT_MSG_TYPE_PUBLISH: - { - int i; - int topiclen; + switch (mqtt_get_type(buffer)) { + case MQTT_MSG_TYPE_PUBLISH: { + int i; + int topiclen; - for (i = 1; i < length; ++i) - { - if ((buffer[i] & 0x80) == 0) - { - ++i; - break; - } - } - - if (i + 2 >= length) - return 0; - topiclen = buffer[i++] << 8; - topiclen |= buffer[i++]; - - if (i + topiclen > length) - return 0; - i += topiclen; - - if (mqtt_get_qos(buffer) > 0) - { - if (i + 2 > length) - return 0; - //i += 2; - } else { - return 0; - } - - return (buffer[i] << 8) | buffer[i + 1]; - } - case MQTT_MSG_TYPE_PUBACK: - case MQTT_MSG_TYPE_PUBREC: - case MQTT_MSG_TYPE_PUBREL: - case MQTT_MSG_TYPE_PUBCOMP: - case MQTT_MSG_TYPE_SUBACK: - case MQTT_MSG_TYPE_UNSUBACK: - case MQTT_MSG_TYPE_SUBSCRIBE: - case MQTT_MSG_TYPE_UNSUBSCRIBE: - { - // This requires the remaining length to be encoded in 1 byte, - // which it should be. - if (length >= 4 && (buffer[1] & 0x80) == 0) - return (buffer[2] << 8) | buffer[3]; - else - return 0; + for (i = 1; i < length; ++i) { + if ((buffer[i] & 0x80) == 0) { + ++i; + break; } + } - default: + if (i + 2 >= length) { return 0; + } + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if (i + topiclen > length) { + return 0; + } + i += topiclen; + + if (mqtt_get_qos(buffer) > 0) { + if (i + 2 > length) { + return 0; + } + //i += 2; + } else { + return 0; + } + + return (buffer[i] << 8) | buffer[i + 1]; + } + case MQTT_MSG_TYPE_PUBACK: + case MQTT_MSG_TYPE_PUBREC: + case MQTT_MSG_TYPE_PUBREL: + case MQTT_MSG_TYPE_PUBCOMP: + case MQTT_MSG_TYPE_SUBACK: + case MQTT_MSG_TYPE_UNSUBACK: + case MQTT_MSG_TYPE_SUBSCRIBE: + case MQTT_MSG_TYPE_UNSUBSCRIBE: { + // This requires the remaining length to be encoded in 1 byte, + // which it should be. + if (length >= 4 && (buffer[1] & 0x80) == 0) { + return (buffer[2] << 8) | buffer[3]; + } else { + return 0; + } + } + + default: + return 0; } } -mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info) +mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info) { - struct mqtt_connect_variable_header* variable_header; + struct mqtt_connect_variable_header *variable_header; init_message(connection); - if (connection->message.length + sizeof(*variable_header) > connection->buffer_length) + if (connection->message.length + sizeof(*variable_header) > connection->buffer_length) { return fail_message(connection); - variable_header = (void*)(connection->buffer + connection->message.length); + } + variable_header = (void *)(connection->buffer + connection->message.length); connection->message.length += sizeof(*variable_header); variable_header->lengthMsb = 0; @@ -370,43 +371,46 @@ mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_inf variable_header->keepaliveMsb = info->keepalive >> 8; variable_header->keepaliveLsb = info->keepalive & 0xff; - if (info->clean_session) + if (info->clean_session) { variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; - - if (info->client_id != NULL && info->client_id[0] != '\0') - { - if (append_string(connection, info->client_id, strlen(info->client_id)) < 0) - return fail_message(connection); } - else + + if (info->client_id != NULL && info->client_id[0] != '\0') { + if (append_string(connection, info->client_id, strlen(info->client_id)) < 0) { + return fail_message(connection); + } + } else { return fail_message(connection); + } - if (info->will_topic != NULL && info->will_topic[0] != '\0') - { - if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) + if (info->will_topic != NULL && info->will_topic[0] != '\0') { + if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) { return fail_message(connection); + } - if (append_string(connection, info->will_message, info->will_length) < 0) + if (append_string(connection, info->will_message, info->will_length) < 0) { return fail_message(connection); + } variable_header->flags |= MQTT_CONNECT_FLAG_WILL; - if (info->will_retain) + if (info->will_retain) { variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; + } variable_header->flags |= (info->will_qos & 3) << 3; } - if (info->username != NULL && info->username[0] != '\0') - { - if (append_string(connection, info->username, strlen(info->username)) < 0) + if (info->username != NULL && info->username[0] != '\0') { + if (append_string(connection, info->username, strlen(info->username)) < 0) { return fail_message(connection); + } variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; } - if (info->password != NULL && info->password[0] != '\0') - { - if (append_string(connection, info->password, strlen(info->password)) < 0) + if (info->password != NULL && info->password[0] != '\0') { + if (append_string(connection, info->password, strlen(info->password)) < 0) { return fail_message(connection); + } variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; } @@ -414,23 +418,25 @@ mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_inf return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); } -mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id) +mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id) { init_message(connection); - if (topic == NULL || topic[0] == '\0') + if (topic == NULL || topic[0] == '\0') { return fail_message(connection); - - if (append_string(connection, topic, strlen(topic)) < 0) - return fail_message(connection); - - if (qos > 0) - { - if ((*message_id = append_message_id(connection, 0)) == 0) - return fail_message(connection); } - else + + if (append_string(connection, topic, strlen(topic)) < 0) { + return fail_message(connection); + } + + if (qos > 0) { + if ((*message_id = append_message_id(connection, 0)) == 0) { + return fail_message(connection); + } + } else { *message_id = 0; + } if (connection->message.length + data_length > connection->buffer_length) { // Not enough size in buffer -> fragment this message @@ -446,87 +452,98 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } -mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id) { init_message(connection); - if (append_message_id(connection, message_id) == 0) + if (append_message_id(connection, message_id) == 0) { return fail_message(connection); + } return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); } -mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id) { init_message(connection); - if (append_message_id(connection, message_id) == 0) + if (append_message_id(connection, message_id) == 0) { return fail_message(connection); + } return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); } -mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id) { init_message(connection); - if (append_message_id(connection, message_id) == 0) + if (append_message_id(connection, message_id) == 0) { return fail_message(connection); + } return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); } -mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id) { init_message(connection); - if (append_message_id(connection, message_id) == 0) + if (append_message_id(connection, message_id) == 0) { return fail_message(connection); + } return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); } -mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id) +mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id) { init_message(connection); - if (topic == NULL || topic[0] == '\0') + if (topic == NULL || topic[0] == '\0') { return fail_message(connection); + } - if ((*message_id = append_message_id(connection, 0)) == 0) + if ((*message_id = append_message_id(connection, 0)) == 0) { return fail_message(connection); + } - if (append_string(connection, topic, strlen(topic)) < 0) + if (append_string(connection, topic, strlen(topic)) < 0) { return fail_message(connection); + } - if (connection->message.length + 1 > connection->buffer_length) + if (connection->message.length + 1 > connection->buffer_length) { return fail_message(connection); + } connection->buffer[connection->message.length++] = qos; return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } -mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id) +mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id) { init_message(connection); - if (topic == NULL || topic[0] == '\0') + if (topic == NULL || topic[0] == '\0') { return fail_message(connection); + } - if ((*message_id = append_message_id(connection, 0)) == 0) + if ((*message_id = append_message_id(connection, 0)) == 0) { return fail_message(connection); + } - if (append_string(connection, topic, strlen(topic)) < 0) + if (append_string(connection, topic, strlen(topic)) < 0) { return fail_message(connection); + } return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0); } -mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection) +mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection) { init_message(connection); return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); } -mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection) +mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection) { init_message(connection); return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); } -mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection) +mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection) { init_message(connection); return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); @@ -536,39 +553,38 @@ mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection) * check flags: [MQTT-2.2.2-1], [MQTT-2.2.2-2] * returns 0 if flags are invalid, otherwise returns 1 */ -int mqtt_has_valid_msg_hdr(uint8_t* buffer, uint16_t length) +int mqtt_has_valid_msg_hdr(uint8_t *buffer, uint16_t length) { int qos, dup; if (length < 1) { return 0; } - switch (mqtt_get_type(buffer)) - { - case MQTT_MSG_TYPE_CONNECT: - case MQTT_MSG_TYPE_CONNACK: - case MQTT_MSG_TYPE_PUBACK: - case MQTT_MSG_TYPE_PUBREC: - case MQTT_MSG_TYPE_PUBCOMP: - case MQTT_MSG_TYPE_SUBACK: - case MQTT_MSG_TYPE_UNSUBACK: - case MQTT_MSG_TYPE_PINGREQ: - case MQTT_MSG_TYPE_PINGRESP: - case MQTT_MSG_TYPE_DISCONNECT: - return (buffer[0] & 0x0f) == 0; /* all flag bits are 0 */ - case MQTT_MSG_TYPE_PUBREL: - case MQTT_MSG_TYPE_SUBSCRIBE: - case MQTT_MSG_TYPE_UNSUBSCRIBE: - return (buffer[0] & 0x0f) == 0x02; /* only bit 1 is set */ - case MQTT_MSG_TYPE_PUBLISH: - qos = mqtt_get_qos(buffer); - dup = mqtt_get_dup(buffer); - /* - * there is no qos=3 [MQTT-3.3.1-4] - * dup flag must be set to 0 for all qos=0 messages [MQTT-3.3.1-2] - */ - return (qos < 3) && ((qos > 0) || (dup == 0)); - default: - return 0; + switch (mqtt_get_type(buffer)) { + case MQTT_MSG_TYPE_CONNECT: + case MQTT_MSG_TYPE_CONNACK: + case MQTT_MSG_TYPE_PUBACK: + case MQTT_MSG_TYPE_PUBREC: + case MQTT_MSG_TYPE_PUBCOMP: + case MQTT_MSG_TYPE_SUBACK: + case MQTT_MSG_TYPE_UNSUBACK: + case MQTT_MSG_TYPE_PINGREQ: + case MQTT_MSG_TYPE_PINGRESP: + case MQTT_MSG_TYPE_DISCONNECT: + return (buffer[0] & 0x0f) == 0; /* all flag bits are 0 */ + case MQTT_MSG_TYPE_PUBREL: + case MQTT_MSG_TYPE_SUBSCRIBE: + case MQTT_MSG_TYPE_UNSUBSCRIBE: + return (buffer[0] & 0x0f) == 0x02; /* only bit 1 is set */ + case MQTT_MSG_TYPE_PUBLISH: + qos = mqtt_get_qos(buffer); + dup = mqtt_get_dup(buffer); + /* + * there is no qos=3 [MQTT-3.3.1-4] + * dup flag must be set to 0 for all qos=0 messages [MQTT-3.3.1-2] + */ + return (qos < 3) && ((qos > 0) || (dup == 0)); + default: + return 0; } } \ No newline at end of file diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 04bbce6..ac7359a 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -49,7 +49,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl }); memcpy(item->buffer, message->data, message->len); if (message->remaining_data) { - memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len); + memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len); } STAILQ_INSERT_TAIL(outbox, item, next); ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox)); @@ -81,14 +81,14 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend return NULL; } -uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) +uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) { if (item) { *len = item->len; *msg_id = item->msg_id; *msg_type = item->msg_type; *qos = item->msg_qos; - return (uint8_t*)item->buffer; + return (uint8_t *)item->buffer; } return NULL; } @@ -97,7 +97,7 @@ esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) { outbox_item_handle_t item, tmp; STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) { + if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) { STAILQ_REMOVE(outbox, item, outbox_item, next); free(item->buffer); free(item); @@ -174,7 +174,7 @@ int outbox_get_size(outbox_handle_t outbox) esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) { - while(outbox_get_size(outbox) > max_size) { + while (outbox_get_size(outbox) > max_size) { outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED, NULL); if (item == NULL) { return ESP_FAIL; diff --git a/lib/platform_esp32_idf.c b/lib/platform_esp32_idf.c index d640e02..bbe0199 100644 --- a/lib/platform_esp32_idf.c +++ b/lib/platform_esp32_idf.c @@ -21,14 +21,14 @@ char *platform_create_id_string() int platform_random(int max) { - return esp_random()%max; + return esp_random() % max; } long long platform_tick_get_ms() { struct timeval te; gettimeofday(&te, NULL); // get current time - long long milliseconds = te.tv_sec*1000LL + te.tv_usec/1000; // calculate milliseconds + long long milliseconds = te.tv_sec * 1000LL + te.tv_usec / 1000; // calculate milliseconds // printf("milliseconds: %lld\n", milliseconds); return milliseconds; } diff --git a/mqtt_client.c b/mqtt_client.c index 0036703..6afae73 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -27,8 +27,7 @@ static const char *TAG = "MQTT_CLIENT"; -typedef struct mqtt_state -{ +typedef struct mqtt_state { mqtt_connect_info_t *connect_info; uint8_t *in_buffer; uint8_t *out_buffer; @@ -268,9 +267,9 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m client->mqtt_state.pending_msg_id); write_len = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->config->network_timeout_ms); + (char *)client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, + client->config->network_timeout_ms); if (write_len < 0) { ESP_LOGE(TAG, "Writing failed, errno= %d", errno); return ESP_FAIL; @@ -283,7 +282,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m read_len = mqtt_message_receive(client, client->config->network_timeout_ms); if (read_len <= 0) { ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len); - return ESP_FAIL; + return ESP_FAIL; } if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) { @@ -292,24 +291,24 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m } connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer); switch (connect_rsp_code) { - case CONNECTION_ACCEPTED: - ESP_LOGD(TAG, "Connected"); - return ESP_OK; - case CONNECTION_REFUSE_PROTOCOL: - ESP_LOGW(TAG, "Connection refused, bad protocol"); - return ESP_FAIL; - case CONNECTION_REFUSE_SERVER_UNAVAILABLE: - ESP_LOGW(TAG, "Connection refused, server unavailable"); - return ESP_FAIL; - case CONNECTION_REFUSE_BAD_USERNAME: - ESP_LOGW(TAG, "Connection refused, bad username or password"); - return ESP_FAIL; - case CONNECTION_REFUSE_NOT_AUTHORIZED: - ESP_LOGW(TAG, "Connection refused, not authorized"); - return ESP_FAIL; - default: - ESP_LOGW(TAG, "Connection refused, Unknow reason"); - return ESP_FAIL; + case CONNECTION_ACCEPTED: + ESP_LOGD(TAG, "Connected"); + return ESP_OK; + case CONNECTION_REFUSE_PROTOCOL: + ESP_LOGW(TAG, "Connection refused, bad protocol"); + return ESP_FAIL; + case CONNECTION_REFUSE_SERVER_UNAVAILABLE: + ESP_LOGW(TAG, "Connection refused, server unavailable"); + return ESP_FAIL; + case CONNECTION_REFUSE_BAD_USERNAME: + ESP_LOGW(TAG, "Connection refused, bad username or password"); + return ESP_FAIL; + case CONNECTION_REFUSE_NOT_AUTHORIZED: + ESP_LOGW(TAG, "Connection refused, not authorized"); + return ESP_FAIL; + default: + ESP_LOGW(TAG, "Connection refused, Unknow reason"); + return ESP_FAIL; } return ESP_OK; } @@ -491,7 +490,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u } if (puri.field_data[UF_PORT].len) { - client->config->port = strtol((const char*)(uri + puri.field_data[UF_PORT].off), NULL, 10); + client->config->port = strtol((const char *)(uri + puri.field_data[UF_PORT].off), NULL, 10); } char *user_info = create_string(uri + puri.field_data[UF_USERINFO].off, puri.field_data[UF_USERINFO].len); @@ -513,9 +512,9 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) { int write_len = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->config->network_timeout_ms); + (char *)client->mqtt_state.outbound_message->data, + client->mqtt_state.outbound_message->length, + client->config->network_timeout_ms); // client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); if (write_len <= 0) { ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno); @@ -564,7 +563,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) // get payload msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len); - if(msg_data_len > 0 && msg_data == NULL) { + if (msg_data_len > 0 && msg_data == NULL) { ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__); return ESP_FAIL; } @@ -575,7 +574,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) post_data_event: ESP_LOGD(TAG, "Get data len= %d, topic len=%d, total_data: %d offset: %d", msg_data_len, msg_topic_len, - client->event.total_data_len, msg_data_offset); + client->event.total_data_len, msg_data_offset); client->event.event_id = MQTT_EVENT_DATA; client->event.data = msg_data_len > 0 ? msg_data : NULL; client->event.data_len = msg_data_len; @@ -589,7 +588,7 @@ post_data_event: size_t buf_len = client->mqtt_state.in_buffer_length; esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport); - msg_data = (char*)client->mqtt_state.in_buffer; + msg_data = (char *)client->mqtt_state.in_buffer; msg_topic = NULL; msg_topic_len = 0; msg_data_offset += msg_data_len; @@ -706,7 +705,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t /* any further reading only the underlying payload */ t = esp_transport_get_payload_transport_handle(t); if ((client->mqtt_state.in_buffer_read_len == 1) || - ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) { + ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) { do { /* * Read the "remaining length" part of mqtt packet fixed header. It @@ -758,8 +757,8 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t } } int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8; - topic_len |= client->mqtt_state.in_buffer[fixed_header_len+1]; - total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer)>0?2:0); + topic_len |= client->mqtt_state.in_buffer[fixed_header_len + 1]; + total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer) > 0 ? 2 : 0); ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len); if (client->mqtt_state.in_buffer_length < total_len) { ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__); @@ -822,75 +821,73 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); - switch (msg_type) - { - case MQTT_MSG_TYPE_SUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "Subscribe successful"); - client->event.event_id = MQTT_EVENT_SUBSCRIBED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_UNSUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "UnSubscribe successful"); - client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_PUBLISH: - ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length); - if (deliver_publish(client) != ESP_OK) { - ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id); + switch (msg_type) { + case MQTT_MSG_TYPE_SUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { + ESP_LOGD(TAG, "Subscribe successful"); + client->event.event_id = MQTT_EVENT_SUBSCRIBED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_UNSUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { + ESP_LOGD(TAG, "UnSubscribe successful"); + client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PUBLISH: + ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length); + if (deliver_publish(client) != ESP_OK) { + ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id); + return ESP_FAIL; + } + if (msg_qos == 1) { + client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + } else if (msg_qos == 2) { + client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + } + + if (msg_qos == 1 || msg_qos == 2) { + ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); + + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); return ESP_FAIL; } - if (msg_qos == 1) { - client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); - } - else if (msg_qos == 2) { - client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); - } - - if (msg_qos == 1 || msg_qos == 2) { - ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); - return ESP_FAIL; - } - } - break; - case MQTT_MSG_TYPE_PUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); - outbox_set_pending(client->outbox, msg_id, CONFIRMED); - client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_PUBREC: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); - client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + } + break; + case MQTT_MSG_TYPE_PUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); outbox_set_pending(client->outbox, msg_id, CONFIRMED); - mqtt_write_data(client); - break; - case MQTT_MSG_TYPE_PUBREL: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); - client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); - mqtt_write_data(client); - break; - case MQTT_MSG_TYPE_PUBCOMP: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { - ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); - client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_PINGRESP: - ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); - client->wait_for_ping_resp = false; - break; + client->event.event_id = MQTT_EVENT_PUBLISHED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PUBREC: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); + client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); + mqtt_write_data(client); + break; + case MQTT_MSG_TYPE_PUBREL: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); + client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_write_data(client); + break; + case MQTT_MSG_TYPE_PUBCOMP: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); + client->event.event_id = MQTT_EVENT_PUBLISHED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PINGRESP: + ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); + client->wait_for_ping_resp = false; + break; } client->mqtt_state.in_buffer_read_len = 0; @@ -901,9 +898,9 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item { // decode queued data client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id, - &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); + &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); // set duplicate flag for QoS-2 message - if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) { + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 2) { mqtt_set_dup(client->mqtt_state.outbound_message->data); } @@ -940,111 +937,111 @@ static void esp_mqtt_task(void *pv) while (client->run) { MQTT_API_LOCK(client); switch ((int)client->state) { - case MQTT_STATE_INIT: - xEventGroupClearBits(client->status_bits, RECONNECT_BIT); - client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; - esp_mqtt_dispatch_event_with_msgid(client); + case MQTT_STATE_INIT: + xEventGroupClearBits(client->status_bits, RECONNECT_BIT); + client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; + esp_mqtt_dispatch_event_with_msgid(client); - if (client->transport == NULL) { - ESP_LOGE(TAG, "There are no transport"); - client->run = false; - } + if (client->transport == NULL) { + ESP_LOGE(TAG, "There are no transport"); + client->run = false; + } - if (esp_transport_connect(client->transport, + if (esp_transport_connect(client->transport, client->config->host, client->config->port, client->config->network_timeout_ms) < 0) { - ESP_LOGE(TAG, "Error transport connect"); - esp_mqtt_abort_connection(client); - break; - } - ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port); - if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) { - ESP_LOGI(TAG, "Error MQTT Connected"); - esp_mqtt_abort_connection(client); - break; - } - client->event.event_id = MQTT_EVENT_CONNECTED; - client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); - client->state = MQTT_STATE_CONNECTED; - esp_mqtt_dispatch_event_with_msgid(client); - client->refresh_connection_tick = platform_tick_get_ms(); - + ESP_LOGE(TAG, "Error transport connect"); + esp_mqtt_abort_connection(client); break; - case MQTT_STATE_CONNECTED: - // receive and process data - if (mqtt_process_receive(client) == ESP_FAIL) { - esp_mqtt_abort_connection(client); - break; - } + } + ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port); + if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) { + ESP_LOGI(TAG, "Error MQTT Connected"); + esp_mqtt_abort_connection(client); + break; + } + client->event.event_id = MQTT_EVENT_CONNECTED; + client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); + client->state = MQTT_STATE_CONNECTED; + esp_mqtt_dispatch_event_with_msgid(client); + client->refresh_connection_tick = platform_tick_get_ms(); - // resend all non-transmitted messages first - outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL); - if (item) { - if (mqtt_resend_queued(client, item) == ESP_OK) { - outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); - } + break; + case MQTT_STATE_CONNECTED: + // receive and process data + if (mqtt_process_receive(client) == ESP_FAIL) { + esp_mqtt_abort_connection(client); + break; + } + + // resend all non-transmitted messages first + outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL); + if (item) { + if (mqtt_resend_queued(client, item) == ESP_OK) { + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); + } // resend other "transmitted" messages after 1s - } else if (platform_tick_get_ms() - last_retransmit > 1000) { - last_retransmit = platform_tick_get_ms(); - item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick); - if (item && (last_retransmit - msg_tick > 1000)) { - mqtt_resend_queued(client, item); - } + } else if (platform_tick_get_ms() - last_retransmit > 1000) { + last_retransmit = platform_tick_get_ms(); + item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick); + if (item && (last_retransmit - msg_tick > 1000)) { + mqtt_resend_queued(client, item); } + } - if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { - //No ping resp from last ping => Disconnected - if(client->wait_for_ping_resp){ - ESP_LOGE(TAG, "No PING_RESP, disconnected"); - esp_mqtt_abort_connection(client); - client->wait_for_ping_resp = false; - break; - } - if (esp_mqtt_client_ping(client) == ESP_FAIL) { - ESP_LOGE(TAG, "Can't send ping, disconnected"); - esp_mqtt_abort_connection(client); - break; - } else { - client->wait_for_ping_resp = true; - } - ESP_LOGD(TAG, "PING sent"); - } - - if (client->config->refresh_connection_after_ms && - platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) { - ESP_LOGD(TAG, "Refreshing the connection..."); + if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { + //No ping resp from last ping => Disconnected + if (client->wait_for_ping_resp) { + ESP_LOGE(TAG, "No PING_RESP, disconnected"); esp_mqtt_abort_connection(client); - client->state = MQTT_STATE_INIT; + client->wait_for_ping_resp = false; + break; } + if (esp_mqtt_client_ping(client) == ESP_FAIL) { + ESP_LOGE(TAG, "Can't send ping, disconnected"); + esp_mqtt_abort_connection(client); + break; + } else { + client->wait_for_ping_resp = true; + } + ESP_LOGD(TAG, "PING sent"); + } - //Delete mesaage after 30 senconds - int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); - client->mqtt_state.pending_msg_count -= deleted; - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } - // - outbox_cleanup(client->outbox, OUTBOX_MAX_SIZE); + if (client->config->refresh_connection_after_ms && + platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) { + ESP_LOGD(TAG, "Refreshing the connection..."); + esp_mqtt_abort_connection(client); + client->state = MQTT_STATE_INIT; + } + + //Delete mesaage after 30 senconds + int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); + client->mqtt_state.pending_msg_count -= deleted; + if (client->mqtt_state.pending_msg_count < 0) { + client->mqtt_state.pending_msg_count = 0; + } + // + outbox_cleanup(client->outbox, OUTBOX_MAX_SIZE); + break; + case MQTT_STATE_WAIT_TIMEOUT: + + if (!client->config->auto_reconnect) { + client->run = false; + client->state = MQTT_STATE_UNKNOWN; break; - case MQTT_STATE_WAIT_TIMEOUT: - - if (!client->config->auto_reconnect) { - client->run = false; - client->state = MQTT_STATE_UNKNOWN; - break; - } - if (platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) { - client->state = MQTT_STATE_INIT; - client->reconnect_tick = platform_tick_get_ms(); - ESP_LOGD(TAG, "Reconnecting..."); - break; - } - MQTT_API_UNLOCK(client); - xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true, - client->wait_timeout_ms / 2 / portTICK_RATE_MS); - // continue the while loop insted of break, as the mutex is unlocked - continue; + } + if (platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) { + client->state = MQTT_STATE_INIT; + client->reconnect_tick = platform_tick_get_ms(); + ESP_LOGD(TAG, "Reconnecting..."); + break; + } + MQTT_API_UNLOCK(client); + xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true, + client->wait_timeout_ms / 2 / portTICK_RATE_MS); + // continue the while loop insted of break, as the mutex is unlocked + continue; } MQTT_API_UNLOCK(client); if (MQTT_STATE_CONNECTED == client->state) { @@ -1072,17 +1069,17 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) return ESP_FAIL; } #if MQTT_CORE_SELECTION_ENABLED - ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE); - if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) { - ESP_LOGE(TAG, "Error create mqtt task"); - return ESP_FAIL; - } + ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE); + if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) { + ESP_LOGE(TAG, "Error create mqtt task"); + return ESP_FAIL; + } #else - ESP_LOGD(TAG, "Core selection disabled"); - if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) { - ESP_LOGE(TAG, "Error create mqtt task"); - return ESP_FAIL; - } + ESP_LOGD(TAG, "Core selection disabled"); + if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) { + ESP_LOGE(TAG, "Error create mqtt task"); + return ESP_FAIL; + } #endif return ESP_OK; } @@ -1109,7 +1106,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Error sending disconnect message"); } MQTT_API_UNLOCK_FROM_OTHER_TASK(client); - + client->run = false; xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY); client->state = MQTT_STATE_UNKNOWN; @@ -1247,7 +1244,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, current_data += data_sent; if (remaining_len > 0) { - mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection; + mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection; ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); if (connection->message.fragmented_msg_data_offset) { // asked to enqueue oversized message (first time only) @@ -1255,7 +1252,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, connection->message.fragmented_msg_total_length = 0; if (qos > 0) { // internally enqueue all big messages, as they dont fit 'pending msg' structure - mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len); + mqtt_enqueue_oversized(client, (uint8_t *)current_data, remaining_len); } }