diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 923ceb0..bbcafa4 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -88,6 +88,15 @@ typedef enum { MQTT_TRANSPORT_OVER_WSS /*!< MQTT over Websocket Secure, using scheme: ``wss`` */ } esp_mqtt_transport_t; +/** + * MQTT protocol version used for connection + */ +typedef enum { + MQTT_PROTOCOL_UNDEFINED = 0, + MQTT_PROTOCOL_V_3_1, + MQTT_PROTOCOL_V_3_1_1 +} esp_mqtt_protocol_ver_t; + /** * @brief MQTT error code structure to be passed as a contextual information into ERROR event * @@ -170,6 +179,7 @@ typedef struct { const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */ const char *clientkey_password; /*!< Client key decryption password string */ int clientkey_password_len; /*!< String length of the password pointed to by clientkey_password */ + esp_mqtt_protocol_ver_t protocol_ver; /*!< MQTT protocol version used for connection, defaults to value from menuconfig*/ } esp_mqtt_client_config_t; /** diff --git a/include/mqtt_config.h b/include/mqtt_config.h index 2a5d0a9..3b17bb4 100644 --- a/include/mqtt_config.h +++ b/include/mqtt_config.h @@ -8,7 +8,10 @@ #include "sdkconfig.h" -#define MQTT_PROTOCOL_311 CONFIG_MQTT_PROTOCOL_311 +#ifdef CONFIG_MQTT_PROTOCOL_311 +#define MQTT_PROTOCOL_311 +#endif + #define MQTT_RECON_DEFAULT_MS (10*1000) #define MQTT_POLL_READ_TIMEOUT_MS (1000) diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index aa6be8c..a0d2acc 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -4,6 +4,7 @@ #include #include "mqtt_config.h" +#include "mqtt_client.h" #ifdef __cplusplus extern "C" { #endif @@ -87,6 +88,7 @@ typedef struct mqtt_connect_info { int will_qos; int will_retain; int clean_session; + esp_mqtt_protocol_ver_t protocol_ver; } mqtt_connect_info_t; diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index d1ba224..37b2bd7 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -34,6 +34,8 @@ #include "platform.h" #define MQTT_MAX_FIXED_HEADER_SIZE 5 +#define MQTT_3_1_VARIABLE_HEADER_SIZE 12 +#define MQTT_3_1_1_VARIABLE_HEADER_SIZE 10 enum mqtt_connect_flag { MQTT_CONNECT_FLAG_USERNAME = 1 << 7, @@ -43,20 +45,6 @@ enum mqtt_connect_flag { MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 }; -struct __attribute((__packed__)) mqtt_connect_variable_header { - uint8_t lengthMsb; - uint8_t lengthLsb; -#if defined(MQTT_PROTOCOL_311) - uint8_t magic[4]; -#else - uint8_t magic[6]; -#endif - uint8_t version; - uint8_t flags; - uint8_t keepaliveMsb; - uint8_t keepaliveLsb; -}; - static int append_string(mqtt_connection_t *connection, const char *string, int len) { if (connection->message.length + len + 2 > connection->buffer_length) { @@ -344,33 +332,45 @@ uint16_t mqtt_get_id(uint8_t *buffer, uint32_t length) mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info) { - struct mqtt_connect_variable_header *variable_header; init_message(connection); - if (connection->message.length + sizeof(*variable_header) > connection->buffer_length) { + int header_len; + if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) { + header_len = MQTT_3_1_VARIABLE_HEADER_SIZE; + } else { + header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE; + } + + if (connection->message.length + header_len > connection->buffer_length) { return fail_message(connection); } - variable_header = (void *)(connection->buffer + connection->message.length); - connection->message.length += sizeof(*variable_header); + char* variable_header = (void *)(connection->buffer + connection->message.length); + connection->message.length += header_len; - variable_header->lengthMsb = 0; -#if defined(CONFIG_MQTT_PROTOCOL_311) - variable_header->lengthLsb = 4; - memcpy(variable_header->magic, "MQTT", 4); - variable_header->version = 4; -#else - variable_header->lengthLsb = 6; - memcpy(variable_header->magic, "MQIsdp", 6); - variable_header->version = 3; -#endif + int header_idx = 0; + variable_header[header_idx++] = 0; // Variable header length MSB - variable_header->flags = 0; - variable_header->keepaliveMsb = info->keepalive >> 8; - variable_header->keepaliveLsb = info->keepalive & 0xff; + if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) { + variable_header[header_idx++] = 6; // Variable header length LSB + memcpy(&variable_header[header_idx], "MQIsdp", 6); // Protocol name + header_idx = header_idx + 6; + variable_header[header_idx++] = 3; // Protocol version + } else { + /* Defaults to protocol version 3.1.1 values */ + variable_header[header_idx++] = 4; // Variable header length LSB + memcpy(&variable_header[header_idx], "MQTT", 4); // Protocol name + header_idx = header_idx + 4; + variable_header[header_idx++] = 4; // Protocol version + } + + int flags_offset = header_idx; + variable_header[header_idx++] = 0; // Flags + variable_header[header_idx++] = info->keepalive >> 8; // Keep-alive MSB + variable_header[header_idx] = info->keepalive & 0xff; // Keep-alive LSB if (info->clean_session) { - variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; + variable_header[flags_offset] |= MQTT_CONNECT_FLAG_CLEAN_SESSION; } if (info->client_id != NULL && info->client_id[0] != '\0') { @@ -390,11 +390,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf return fail_message(connection); } - variable_header->flags |= MQTT_CONNECT_FLAG_WILL; + variable_header[flags_offset] |= MQTT_CONNECT_FLAG_WILL; if (info->will_retain) { - variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; + variable_header[flags_offset] |= MQTT_CONNECT_FLAG_WILL_RETAIN; } - variable_header->flags |= (info->will_qos & 3) << 3; + variable_header[flags_offset] |= (info->will_qos & 3) << 3; } if (info->username != NULL && info->username[0] != '\0') { @@ -402,7 +402,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf return fail_message(connection); } - variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; + variable_header[flags_offset] |= MQTT_CONNECT_FLAG_USERNAME; } if (info->password != NULL && info->password[0] != '\0') { @@ -410,7 +410,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf return fail_message(connection); } - variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; + variable_header[flags_offset] |= MQTT_CONNECT_FLAG_PASSWORD; } return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); diff --git a/mqtt_client.c b/mqtt_client.c index e13d77e..d5ab1c3 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -190,6 +190,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else if (client->connect_info.client_id == NULL) { client->connect_info.client_id = platform_create_id_string(); } + ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id); @@ -233,6 +234,18 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl if (client->connect_info.keepalive == 0) { client->connect_info.keepalive = MQTT_KEEPALIVE_TICK; } + + if (config->protocol_ver) { + client->connect_info.protocol_ver = config->protocol_ver; + } + if (client->connect_info.protocol_ver== MQTT_PROTOCOL_UNDEFINED) { +#ifdef MQTT_PROTOCOL_311 + client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1; +#else + client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1; +#endif + } + cfg->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS; if (config->user_context) { cfg->user_context = config->user_context;