MQTT: Add runtime selection of mqtt protocol version

Add config option for selecting protocol version at runtime.

This also fixed MQTT protocol version 3.1 which wasnt working with the original implementation

Closes https://github.com/espressif/esp-idf/issues/4448
Closes IDFGH-2311
Closes IDF-1320
This commit is contained in:
Marius Vikhammer
2020-01-09 11:50:40 +08:00
parent 86fc8b7584
commit 7ac0a42831
5 changed files with 66 additions and 38 deletions

View File

@ -88,6 +88,15 @@ typedef enum {
MQTT_TRANSPORT_OVER_WSS /*!< MQTT over Websocket Secure, using scheme: ``wss`` */
} esp_mqtt_transport_t;
/**
* MQTT protocol version used for connection
*/
typedef enum {
MQTT_PROTOCOL_UNDEFINED = 0,
MQTT_PROTOCOL_V_3_1,
MQTT_PROTOCOL_V_3_1_1
} esp_mqtt_protocol_ver_t;
/**
* @brief MQTT error code structure to be passed as a contextual information into ERROR event
*
@ -170,6 +179,7 @@ typedef struct {
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
const char *clientkey_password; /*!< Client key decryption password string */
int clientkey_password_len; /*!< String length of the password pointed to by clientkey_password */
esp_mqtt_protocol_ver_t protocol_ver; /*!< MQTT protocol version used for connection, defaults to value from menuconfig*/
} esp_mqtt_client_config_t;
/**

View File

@ -8,7 +8,10 @@
#include "sdkconfig.h"
#define MQTT_PROTOCOL_311 CONFIG_MQTT_PROTOCOL_311
#ifdef CONFIG_MQTT_PROTOCOL_311
#define MQTT_PROTOCOL_311
#endif
#define MQTT_RECON_DEFAULT_MS (10*1000)
#define MQTT_POLL_READ_TIMEOUT_MS (1000)

View File

@ -1,6 +1,7 @@
#ifndef MQTT_MSG_H
#define MQTT_MSG_H
#include "mqtt_config.h"
#include "mqtt_client.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -84,6 +85,7 @@ typedef struct mqtt_connect_info {
int will_qos;
int will_retain;
int clean_session;
esp_mqtt_protocol_ver_t protocol_ver;
} mqtt_connect_info_t;

View File

@ -36,6 +36,8 @@
#include "platform.h"
#define MQTT_MAX_FIXED_HEADER_SIZE 5
#define MQTT_3_1_VARIABLE_HEADER_SIZE 12
#define MQTT_3_1_1_VARIABLE_HEADER_SIZE 10
enum mqtt_connect_flag {
MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
@ -45,20 +47,6 @@ enum mqtt_connect_flag {
MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
struct __attribute((__packed__)) mqtt_connect_variable_header {
uint8_t lengthMsb;
uint8_t lengthLsb;
#if defined(MQTT_PROTOCOL_311)
uint8_t magic[4];
#else
uint8_t magic[6];
#endif
uint8_t version;
uint8_t flags;
uint8_t keepaliveMsb;
uint8_t keepaliveLsb;
};
static int append_string(mqtt_connection_t *connection, const char *string, int len)
{
if (connection->message.length + len + 2 > connection->buffer_length) {
@ -346,33 +334,45 @@ uint16_t mqtt_get_id(uint8_t *buffer, uint16_t length)
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
{
struct mqtt_connect_variable_header *variable_header;
init_message(connection);
if (connection->message.length + sizeof(*variable_header) > connection->buffer_length) {
int header_len;
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
header_len = MQTT_3_1_VARIABLE_HEADER_SIZE;
} else {
header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE;
}
if (connection->message.length + header_len > connection->buffer_length) {
return fail_message(connection);
}
variable_header = (void *)(connection->buffer + connection->message.length);
connection->message.length += sizeof(*variable_header);
char* variable_header = (void *)(connection->buffer + connection->message.length);
connection->message.length += header_len;
variable_header->lengthMsb = 0;
#if defined(CONFIG_MQTT_PROTOCOL_311)
variable_header->lengthLsb = 4;
memcpy(variable_header->magic, "MQTT", 4);
variable_header->version = 4;
#else
variable_header->lengthLsb = 6;
memcpy(variable_header->magic, "MQIsdp", 6);
variable_header->version = 3;
#endif
int header_idx = 0;
variable_header[header_idx++] = 0; // Variable header length MSB
variable_header->flags = 0;
variable_header->keepaliveMsb = info->keepalive >> 8;
variable_header->keepaliveLsb = info->keepalive & 0xff;
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
variable_header[header_idx++] = 6; // Variable header length LSB
memcpy(&variable_header[header_idx], "MQIsdp", 6); // Protocol name
header_idx = header_idx + 6;
variable_header[header_idx++] = 3; // Protocol version
} else {
/* Defaults to protocol version 3.1.1 values */
variable_header[header_idx++] = 4; // Variable header length LSB
memcpy(&variable_header[header_idx], "MQTT", 4); // Protocol name
header_idx = header_idx + 4;
variable_header[header_idx++] = 4; // Protocol version
}
int flags_offset = header_idx;
variable_header[header_idx++] = 0; // Flags
variable_header[header_idx++] = info->keepalive >> 8; // Keep-alive MSB
variable_header[header_idx] = info->keepalive & 0xff; // Keep-alive LSB
if (info->clean_session) {
variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
}
if (info->client_id != NULL && info->client_id[0] != '\0') {
@ -392,11 +392,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
return fail_message(connection);
}
variable_header->flags |= MQTT_CONNECT_FLAG_WILL;
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_WILL;
if (info->will_retain) {
variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_WILL_RETAIN;
}
variable_header->flags |= (info->will_qos & 3) << 3;
variable_header[flags_offset] |= (info->will_qos & 3) << 3;
}
if (info->username != NULL && info->username[0] != '\0') {
@ -404,7 +404,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
return fail_message(connection);
}
variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_USERNAME;
}
if (info->password != NULL && info->password[0] != '\0') {
@ -412,7 +412,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
return fail_message(connection);
}
variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_PASSWORD;
}
return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);

View File

@ -190,6 +190,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
} else if (client->connect_info.client_id == NULL) {
client->connect_info.client_id = platform_create_id_string();
}
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
@ -233,6 +234,18 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
if (client->connect_info.keepalive == 0) {
client->connect_info.keepalive = MQTT_KEEPALIVE_TICK;
}
if (config->protocol_ver) {
client->connect_info.protocol_ver = config->protocol_ver;
}
if (client->connect_info.protocol_ver== MQTT_PROTOCOL_UNDEFINED) {
#ifdef MQTT_PROTOCOL_311
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
#else
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1;
#endif
}
cfg->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS;
if (config->user_context) {
cfg->user_context = config->user_context;