Merge branch 'bugfix/outbox_tick_type' into 'master'

Various fixes in mqtt library

See merge request espressif/esp-mqtt!55
This commit is contained in:
David Čermák
2020-01-31 21:11:27 +08:00
21 changed files with 276 additions and 600 deletions

View File

@ -1,19 +0,0 @@
cmake_minimum_required(VERSION 3.5)
get_filename_component(DEV_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE)
set(PROJECT_ROOT "${DEV_ROOT}/")
set(SUBMODULE_ROOT "${DEV_ROOT}/../../../")
set(PROJECT_NAME "mqtt_ssl")
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
set(MAIN_SRCS ${PROJECT_ROOT}/main/app_main.c)
set(EXTRA_COMPONENT_DIRS "${EXTRA_COMPONENT_DIRS} ${SUBMODULE_ROOT}")
set(BUILD_COMPONENTS "${BUILD_COMPONENTS} espmqtt")
project(${PROJECT_NAME})

View File

@ -1,13 +0,0 @@
#
# This is a project Makefile. It is assumed the directory this Makefile resides in is a
# project subdirectory.
#
#
# This is a project Makefile. It is assumed the directory this Makefile resides in is a
# project subdirectory.
#
PROJECT_NAME := emitter_client
EXTRA_COMPONENT_DIRS += $(PROJECT_PATH)/../../../
include $(IDF_PATH)/make/project.mk

View File

@ -1,10 +0,0 @@
# ESPMQTT Emitter client
## Before you run this Example
- Register an account from https://emitter.io/
- Login and create channel key, grant access for the channel `/topic/` as the images bellow
- `make menuconfig` provide Wi-Fi information and CHANNEL_KEY to `MQTT Application example`
- `make flash monitor`
![](generate-key-0.png)
![](generate-key-1.png)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 234 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 61 KiB

View File

@ -1,22 +0,0 @@
menu "MQTT Application example"
config WIFI_SSID
string "WiFi SSID"
default "myssid"
help
SSID (network name) for the example to connect to.
config WIFI_PASSWORD
string "WiFi Password"
default "mypassword"
help
WiFi password (WPA or WPA2) for the example to use.
config EMITTER_CHANNEL_KEY
string "Emitter channel key"
default ""
help
The Emitter channel key using to pub/sub
endmenu

View File

@ -1,156 +0,0 @@
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include "esp_wifi.h"
#include "esp_system.h"
#include "nvs_flash.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "freertos/queue.h"
#include "freertos/event_groups.h"
#include "lwip/sockets.h"
#include "lwip/dns.h"
#include "lwip/netdb.h"
#include "esp_log.h"
#include "mqtt_client.h"
static const char *TAG = "MQTTS_SAMPLE";
static EventGroupHandle_t wifi_event_group;
const static int CONNECTED_BIT = BIT0;
static void wifi_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
switch (event_id) {
case WIFI_EVENT_STA_START:
esp_wifi_connect();
break;
case WIFI_EVENT_STA_DISCONNECTED:
esp_wifi_connect();
xEventGroupClearBits(wifi_event_group, CONNECTED_BIT);
break;
default:
break;
}
return;
}
static void ip_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
switch (event_id) {
case IP_EVENT_STA_GOT_IP:
xEventGroupSetBits(wifi_event_group, CONNECTED_BIT);
break;
default:
break;
}
return;
}
static void wifi_init(void)
{
tcpip_adapter_init();
wifi_event_group = xEventGroupCreate();
ESP_ERROR_CHECK(esp_event_loop_create_default());
ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, &wifi_event_handler, NULL));
ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &ip_event_handler, NULL));
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_wifi_init(&cfg));
ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM));
wifi_config_t wifi_config = {
.sta = {
.ssid = CONFIG_WIFI_SSID,
.password = CONFIG_WIFI_PASSWORD,
},
};
ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config));
ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******");
ESP_ERROR_CHECK(esp_wifi_start());
ESP_LOGI(TAG, "Waiting for wifi");
xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY);
}
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
{
esp_mqtt_client_handle_t client = event->client;
int msg_id;
// your_context_t *context = event->context;
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_subscribe(client, CONFIG_EMITTER_CHANNEL_KEY"/topic/", 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, CONFIG_EMITTER_CHANNEL_KEY"/topic/", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
break;
default:
break;
}
return ESP_OK;
}
static void mqtt_app_start(void)
{
const esp_mqtt_client_config_t mqtt_cfg = {
.uri = "mqtts://api.emitter.io:443", // for mqtt over ssl
// .uri = "mqtt://api.emitter.io:8080", //for mqtt over tcp
// .uri = "ws://api.emitter.io:8080", //for mqtt over websocket
// .uri = "wss://api.emitter.io:443", //for mqtt over websocket secure
.event_handle = mqtt_event_handler,
};
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_start(client);
}
void app_main()
{
ESP_LOGI(TAG, "[APP] Startup..");
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version());
esp_log_level_set("*", ESP_LOG_INFO);
esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE);
esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE);
nvs_flash_init();
wifi_init();
mqtt_app_start();
}

View File

@ -1,19 +0,0 @@
cmake_minimum_required(VERSION 3.5)
get_filename_component(DEV_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE)
set(PROJECT_ROOT "${DEV_ROOT}/")
set(SUBMODULE_ROOT "${DEV_ROOT}/../../../")
set(PROJECT_NAME "mqtt_ssl_mutual_auth")
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
set(MAIN_SRCS ${PROJECT_ROOT}/main/app_main.c)
set(EXTRA_COMPONENT_DIRS "${EXTRA_COMPONENT_DIRS} ${SUBMODULE_ROOT}")
set(BUILD_COMPONENTS "${BUILD_COMPONENTS} espmqtt")
project(${PROJECT_NAME})

View File

@ -1,13 +0,0 @@
#
# This is a project Makefile. It is assumed the directory this Makefile resides in is a
# project subdirectory.
#
#
# This is a project Makefile. It is assumed the directory this Makefile resides in is a
# project subdirectory.
#
PROJECT_NAME := mqtt_ssl_mutual_auth
EXTRA_COMPONENT_DIRS += $(PROJECT_PATH)/../../../
include $(IDF_PATH)/make/project.mk

View File

@ -1,16 +0,0 @@
# ESPMQTT SSL Sample application
Navigate to the main directory
```
cd main
```
Generate a client key and a CSR. When you are generating the CSR, do not use the default values. At a minimum, the CSR must include the Country, Organisation and Common Name fields.
```
openssl genrsa -out client.key
openssl req -out client.csr -key client.key -new
```
Paste the generated CSR in the [Mosquitto test certificate signer](https://test.mosquitto.org/ssl/index.php), click Submit and copy the downloaded `client.crt` in the `main` directory.

View File

@ -1,15 +0,0 @@
menu "MQTT Application sample"
config WIFI_SSID
string "WiFi SSID"
default "myssid"
help
SSID (network name) for the example to connect to.
config WIFI_PASSWORD
string "WiFi Password"
default "mypassword"
help
WiFi password (WPA or WPA2) for the example to use.
endmenu

View File

@ -1,167 +0,0 @@
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include "esp_wifi.h"
#include "esp_system.h"
#include "nvs_flash.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "freertos/queue.h"
#include "freertos/event_groups.h"
#include "lwip/sockets.h"
#include "lwip/dns.h"
#include "lwip/netdb.h"
#include "esp_log.h"
#include "esp_event.h"
#include "tcpip_adapter.h"
#include "mqtt_client.h"
static const char *TAG = "MQTTS_SAMPLE";
static EventGroupHandle_t wifi_event_group;
const static int CONNECTED_BIT = BIT0;
static void wifi_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
switch (event_id) {
case WIFI_EVENT_STA_START:
esp_wifi_connect();
break;
case WIFI_EVENT_STA_DISCONNECTED:
esp_wifi_connect();
xEventGroupClearBits(wifi_event_group, CONNECTED_BIT);
break;
default:
break;
}
return;
}
static void ip_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
switch (event_id) {
case IP_EVENT_STA_GOT_IP:
xEventGroupSetBits(wifi_event_group, CONNECTED_BIT);
break;
default:
break;
}
return;
}
static void wifi_init(void)
{
tcpip_adapter_init();
wifi_event_group = xEventGroupCreate();
ESP_ERROR_CHECK(esp_event_loop_create_default());
ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, &wifi_event_handler, NULL));
ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &ip_event_handler, NULL));
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_wifi_init(&cfg));
ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM));
wifi_config_t wifi_config = {
.sta = {
.ssid = CONFIG_WIFI_SSID,
.password = CONFIG_WIFI_PASSWORD,
},
};
ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config));
ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******");
ESP_ERROR_CHECK(esp_wifi_start());
ESP_LOGI(TAG, "Waiting for wifi");
xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY);
}
extern const uint8_t client_cert_pem_start[] asm("_binary_client_crt_start");
extern const uint8_t client_cert_pem_end[] asm("_binary_client_crt_end");
extern const uint8_t client_key_pem_start[] asm("_binary_client_key_start");
extern const uint8_t client_key_pem_end[] asm("_binary_client_key_end");
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
{
esp_mqtt_client_handle_t client = event->client;
int msg_id;
// your_context_t *context = event->context;
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
break;
default:
break;
}
return ESP_OK;
}
static void mqtt_app_start(void)
{
const esp_mqtt_client_config_t mqtt_cfg = {
.uri = "mqtts://test.mosquitto.org:8884",
.event_handle = mqtt_event_handler,
.client_cert_pem = (const char *)client_cert_pem_start,
.client_key_pem = (const char *)client_key_pem_start,
};
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_start(client);
}
void app_main()
{
ESP_LOGI(TAG, "[APP] Startup..");
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version());
esp_log_level_set("*", ESP_LOG_INFO);
esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE);
esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE);
nvs_flash_init();
wifi_init();
mqtt_app_start();
}

View File

@ -1 +0,0 @@
COMPONENT_EMBED_TXTFILES := client.crt client.key

View File

@ -223,6 +223,15 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client);
*/
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client);
/**
* @brief This api is typically used to force disconnection from the broker
*
* @param client mqtt client handle
*
* @return ESP_OK on success
*/
esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
/**
* @brief Stops mqtt client tasks
*

View File

@ -93,19 +93,19 @@ typedef struct mqtt_connect_info {
} mqtt_connect_info_t;
static inline int mqtt_get_type(uint8_t *buffer)
static inline int mqtt_get_type(const uint8_t *buffer)
{
return (buffer[0] & 0xf0) >> 4;
}
static inline int mqtt_get_connect_session_present(uint8_t *buffer)
static inline int mqtt_get_connect_session_present(const uint8_t *buffer)
{
return buffer[2] & 0x01;
}
static inline int mqtt_get_connect_return_code(uint8_t *buffer)
static inline int mqtt_get_connect_return_code(const uint8_t *buffer)
{
return buffer[3];
}
static inline int mqtt_get_dup(uint8_t *buffer)
static inline int mqtt_get_dup(const uint8_t *buffer)
{
return (buffer[0] & 0x08) >> 3;
}
@ -113,18 +113,18 @@ static inline void mqtt_set_dup(uint8_t *buffer)
{
buffer[0] |= 0x08;
}
static inline int mqtt_get_qos(uint8_t *buffer)
static inline int mqtt_get_qos(const uint8_t *buffer)
{
return (buffer[0] & 0x06) >> 1;
}
static inline int mqtt_get_retain(uint8_t *buffer)
static inline int mqtt_get_retain(const uint8_t *buffer)
{
return (buffer[0] & 0x01);
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint32_t buffer_length);
bool mqtt_header_complete(uint8_t *buffer, uint32_t buffer_length);
uint32_t mqtt_get_total_length(uint8_t *buffer, uint32_t length, int *fixed_size_len);
uint32_t mqtt_get_total_length(const uint8_t *buffer, uint32_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, uint32_t *length);
char *mqtt_get_publish_data(uint8_t *buffer, uint32_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, uint32_t length);

View File

@ -16,6 +16,7 @@ struct outbox_item;
typedef struct outbox_list_t *outbox_handle_t;
typedef struct outbox_item *outbox_item_handle_t;
typedef struct outbox_message *outbox_message_handle_t;
typedef long long outbox_tick_t;
typedef struct outbox_message {
uint8_t *data;
@ -35,17 +36,17 @@ typedef enum pending_state {
} pending_state_t;
outbox_handle_t outbox_init(void);
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick);
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick);
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
int outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout);
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, int tick);
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
int outbox_get_size(outbox_handle_t outbox);
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size);
void outbox_destroy(outbox_handle_t outbox);

View File

@ -32,4 +32,10 @@ void ms_to_timeval(int timeout_ms, struct timeval *tv);
ESP_LOGE(TAG,"%s:%d (%s): %s", __FILE__, __LINE__, __FUNCTION__, "Memory exhausted"); \
action; \
}
#define ESP_OK_CHECK(TAG, a, action) if ((a) != ESP_OK) { \
ESP_LOGE(TAG,"%s:%d (%s): %s", __FILE__, __LINE__, __FUNCTION__, "Failed with non ESP_OK err code"); \
action; \
}
#endif

View File

@ -140,7 +140,7 @@ void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint32_t buff
connection->buffer_length = buffer_length;
}
uint32_t mqtt_get_total_length(uint8_t *buffer, uint32_t length, int *fixed_size_len)
uint32_t mqtt_get_total_length(const uint8_t *buffer, uint32_t length, int *fixed_size_len)
{
int i;
uint32_t totlen = 0;

View File

@ -15,7 +15,7 @@ typedef struct outbox_item {
int msg_id;
int msg_type;
int msg_qos;
int tick;
outbox_tick_t tick;
int retry_count;
pending_state_t pending;
STAILQ_ENTRY(outbox_item) next;
@ -32,7 +32,7 @@ outbox_handle_t outbox_init(void)
return outbox;
}
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick)
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
{
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
ESP_MEM_CHECK(TAG, item, return NULL);
@ -67,7 +67,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
return NULL;
}
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
@ -131,7 +131,7 @@ esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t
return ESP_FAIL;
}
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, int tick)
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
if (item) {
@ -155,7 +155,7 @@ esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
return ESP_OK;
}
int outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout)
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
int deleted_items = 0;
outbox_item_handle_t item, tmp;
@ -199,7 +199,12 @@ esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size)
void outbox_destroy(outbox_handle_t outbox)
{
outbox_cleanup(outbox, 0);
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
free(outbox);
}

View File

@ -7,7 +7,6 @@
#include "esp_transport_tcp.h"
#include "esp_transport_ssl.h"
#include "esp_transport_ws.h"
#include "platform.h"
#include "mqtt_outbox.h"
#include "mqtt_supported_features.h"
@ -17,22 +16,12 @@
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
# define MQTT_API_LOCK_FROM_OTHER_TASK(c)
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c)
#else
# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY)
# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock)
# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } }
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } }
# define MQTT_API_LOCK(c) xSemaphoreTakeRecursive(c->api_lock, portMAX_DELAY)
# define MQTT_API_UNLOCK(c) xSemaphoreGiveRecursive(c->api_lock)
#endif /* MQTT_USE_API_LOCKS */
#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
# define MQTT_TRANSPORT_SET_CERT_OR_KEY(setfn, key, len) \
{ if (key) { if (len) { setfn##_der(ssl, key, len); } else { setfn(ssl, key, strlen(key)); } } }
#else
# define MQTT_TRANSPORT_SET_CERT_OR_KEY(setfn, key, len) \
{ if (key) { setfn(ssl, key, strlen(key)); } }
#endif
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
static const char *TAG = "MQTT_CLIENT";
@ -63,9 +52,7 @@ typedef struct mqtt_state
typedef struct {
mqtt_event_callback_t event_handle;
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_handle_t event_loop_handle;
#endif
int task_stack;
int task_prio;
char *uri;
@ -82,6 +69,14 @@ typedef struct {
int num_alpn_protos;
char *clientkey_password;
int clientkey_password_len;
bool use_global_ca_store;
const char *cacert_buf;
size_t cacert_bytes;
const char *clientcert_buf;
size_t clientcert_bytes;
const char *clientkey_buf;
size_t clientkey_bytes;
const struct psk_key_hint *psk_hint_key;
} mqtt_config_storage_t;
typedef enum {
@ -121,6 +116,7 @@ struct esp_mqtt_client {
const static int STOPPED_BIT = BIT0;
const static int RECONNECT_BIT = BIT1;
const static int DISCONNECT_BIT = BIT2;
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
@ -131,9 +127,126 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len);
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
#if MQTT_ENABLE_SSL
enum esp_mqtt_ssl_cert_key_api {
MQTT_SSL_DATA_API_CA_CERT,
MQTT_SSL_DATA_API_CLIENT_CERT,
MQTT_SSL_DATA_API_CLIENT_KEY,
MQTT_SSL_DATA_API_MAX,
};
static esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, enum esp_mqtt_ssl_cert_key_api what, const char *cert_key_data, int cert_key_len)
{
char *data = (char *)cert_key_data;
int ssl_transport_api_id = what;
int len = cert_key_len;
if (!data) {
return ESP_OK;
}
if (len == 0) {
// if length not specified, expect 0-terminated PEM string
// and the original transport_api_id (by convention after the last api_id in the enum)
ssl_transport_api_id += MQTT_SSL_DATA_API_MAX;
len = strlen(data);
}
#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
else {
ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER);
return ESP_ERR_NOT_SUPPORTED;
}
#endif
// option to force the cert/key config to null (i.e. skip validation) when existing config updates
if (0 == strcmp(data, "NULL")) {
data = NULL;
len = 0;
}
switch (ssl_transport_api_id) {
#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
case MQTT_SSL_DATA_API_CA_CERT:
esp_transport_ssl_set_cert_data_der(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_CERT:
esp_transport_ssl_set_client_cert_data_der(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_KEY:
esp_transport_ssl_set_client_key_data_der(ssl, data, len);
break;
#endif
case MQTT_SSL_DATA_API_CA_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_cert_data(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_cert_data(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_KEY + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_key_data(ssl, data, len);
break;
default:
return ESP_ERR_INVALID_ARG;
}
return ESP_OK;
}
static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle_t transport_list, mqtt_config_storage_t *cfg)
{
esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, "mqtts");
if (cfg->use_global_ca_store == true) {
esp_transport_ssl_enable_global_ca_store(ssl);
} else {
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes),
goto esp_mqtt_set_transport_failed);
}
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_CERT, cfg->clientcert_buf, cfg->clientcert_bytes),
goto esp_mqtt_set_transport_failed);
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_KEY, cfg->clientkey_buf, cfg->clientkey_bytes),
goto esp_mqtt_set_transport_failed);
if (cfg->clientkey_password && cfg->clientkey_password_len) {
#if defined(MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD) && MQTT_ENABLE_SSL
esp_transport_ssl_set_client_key_password(ssl,
cfg->clientkey_password,
cfg->clientkey_password_len);
#else
ESP_LOGE(TAG, "Password protected keys are not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->psk_hint_key) {
#if defined(MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION) && MQTT_ENABLE_SSL
esp_transport_ssl_set_psk_key_hint(ssl, cfg->psk_hint_key);
#else
ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER);
goto _mqtt_set_config_failed;
#endif
}
if (cfg->alpn_protos) {
#if defined(MQTT_SUPPORTED_FEATURE_ALPN) && MQTT_ENABLE_SSL
ESP_LOGE(TAG, "%p", cfg->alpn_protos);
esp_transport_ssl_set_alpn_protocol(ssl, (const char **)cfg->alpn_protos);
#else
ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER);
goto _mqtt_set_config_failed;
#endif
}
return ESP_OK;
esp_mqtt_set_transport_failed:
return ESP_FAIL;
}
#endif // MQTT_ENABLE_SSL
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
//Copy user configurations to client context
esp_err_t err = ESP_OK;
mqtt_config_storage_t *cfg;
@ -142,7 +255,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
} else {
cfg = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, cfg, {
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
});
client->config = cfg;
@ -270,7 +383,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
cfg->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
}
if(config->alpn_protos) {
if (config->alpn_protos) {
for (int i = 0; i < cfg->num_alpn_protos; i++) {
free(cfg->alpn_protos[i]);
}
@ -292,23 +405,62 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
}
}
// configure ssl related parameters
cfg->use_global_ca_store = config->use_global_ca_store;
cfg->cacert_buf = config->cert_pem;
cfg->cacert_bytes = config->cert_len;
cfg->clientcert_buf = config->client_cert_pem;
cfg->clientcert_bytes = config->client_cert_len;
cfg->clientkey_buf = config->client_key_pem;
cfg->clientkey_bytes = config->client_key_len;
cfg->psk_hint_key = config->psk_hint_key;
if (config->clientkey_password && config->clientkey_password_len) {
cfg->clientkey_password_len = config->clientkey_password_len;
cfg->clientkey_password = malloc(cfg->clientkey_password_len);
memcpy(cfg->clientkey_password, config->clientkey_password, cfg->clientkey_password_len);
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
if (config->transport) {
free(client->config->scheme);
if (config->transport == MQTT_TRANSPORT_OVER_WS) {
cfg->scheme = create_string("ws", 2);
ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed);
} else if (config->transport == MQTT_TRANSPORT_OVER_TCP) {
cfg->scheme = create_string("mqtt", 4);
ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed);
} else if (config->transport == MQTT_TRANSPORT_OVER_SSL) {
cfg->scheme = create_string("mqtts", 5);
ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed);
} else if (config->transport == MQTT_TRANSPORT_OVER_WSS) {
cfg->scheme = create_string("wss", 3);
ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed);
}
}
// Set uri at the end of config to override separately configured uri elements
if (config->uri) {
if (esp_mqtt_client_set_uri(client, cfg->uri) != ESP_OK) {
err = ESP_FAIL;
goto _mqtt_set_config_failed;
}
}
MQTT_API_UNLOCK(client);
return ESP_OK;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return err;
}
static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
{
mqtt_config_storage_t *cfg = client->config;
if (cfg == NULL) {
return ESP_ERR_INVALID_STATE;
}
free(cfg->host);
free(cfg->uri);
free(cfg->path);
@ -331,6 +483,7 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
#endif
memset(cfg, 0, sizeof(mqtt_config_storage_t));
free(client->config);
client->config = NULL;
return ESP_OK;
}
@ -436,20 +589,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
free(client);
return NULL;
}
client->api_lock = xSemaphoreCreateMutex();
client->api_lock = xSemaphoreCreateRecursiveMutex();
if (!client->api_lock) {
free(client->event.error_handle);
free(client);
return NULL;
}
esp_mqtt_set_config(client, config);
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = 1,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#endif
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
@ -457,10 +603,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
ESP_MEM_CHECK(TAG, tcp, goto _mqtt_init_failed);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, tcp, "mqtt");
if (config->transport == MQTT_TRANSPORT_OVER_TCP) {
client->config->scheme = create_string("mqtt", 4);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
}
#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
@ -470,10 +612,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
esp_transport_ws_set_subprotocol(ws, "mqtt");
#endif
esp_transport_list_add(client->transport_list, ws, "ws");
if (config->transport == MQTT_TRANSPORT_OVER_WS) {
client->config->scheme = create_string("ws", 2);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
}
#endif
#if MQTT_ENABLE_SSL
@ -481,51 +619,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
ESP_MEM_CHECK(TAG, ssl, goto _mqtt_init_failed);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
if (config->cert_len || config->client_cert_len || config->client_key_len) {
ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER);
goto _mqtt_init_failed;
}
#endif
if (config->use_global_ca_store == true) {
esp_transport_ssl_enable_global_ca_store(ssl);
} else if (config->cert_pem) {
MQTT_TRANSPORT_SET_CERT_OR_KEY(esp_transport_ssl_set_cert_data, config->cert_pem, config->cert_len);
}
MQTT_TRANSPORT_SET_CERT_OR_KEY(esp_transport_ssl_set_client_cert_data, config->client_cert_pem, config->client_cert_len);
MQTT_TRANSPORT_SET_CERT_OR_KEY(esp_transport_ssl_set_client_key_data, config->client_key_pem, config->client_key_len);
#ifdef MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD
if (client->config->clientkey_password && client->config->clientkey_password_len) {
esp_transport_ssl_set_client_key_password(ssl,
client->config->clientkey_password,
client->config->clientkey_password_len);
}
#endif
if (config->psk_hint_key) {
#ifdef MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION
esp_transport_ssl_set_psk_key_hint(ssl, config->psk_hint_key);
#else
ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER);
goto _mqtt_init_failed;
#endif
}
if (client->config->alpn_protos) {
#ifdef MQTT_SUPPORTED_FEATURE_ALPN
esp_transport_ssl_set_alpn_protocol(ssl, (const char **)client->config->alpn_protos);
#else
ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER);
goto _mqtt_init_failed;
#endif
}
esp_transport_list_add(client->transport_list, ssl, "mqtts");
if (config->transport == MQTT_TRANSPORT_OVER_SSL) {
client->config->scheme = create_string("mqtts", 5);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
}
#endif
#if MQTT_ENABLE_WSS
@ -536,16 +630,19 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
#endif
esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, wss, "wss");
if (config->transport == MQTT_TRANSPORT_OVER_WSS) {
client->config->scheme = create_string("wss", 3);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
}
#endif
if (client->config->uri) {
if (esp_mqtt_client_set_uri(client, client->config->uri) != ESP_OK) {
goto _mqtt_init_failed;
}
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
if (esp_mqtt_set_config(client, config) != ESP_OK) {
goto _mqtt_init_failed;
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = 1,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#endif
client->keepalive_tick = platform_tick_get_ms();
client->reconnect_tick = platform_tick_get_ms();
@ -625,7 +722,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
}
// This API could be also executed when client is active (need to protect config fields)
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
// set uri overrides actual scheme, host, path if configured previously
free(client->config->scheme);
free(client->config->host);
@ -663,7 +760,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
free(user_info);
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return ESP_OK;
}
@ -757,15 +854,13 @@ post_data_event:
esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) {
// if total data is longer then actual -> read payload only
size_t buf_len = client->mqtt_state.in_buffer_length;
esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport);
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len;
msg_data_len = esp_transport_read(transport, (char *)client->mqtt_state.in_buffer,
msg_data_len = esp_transport_read(client-> transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (msg_data_len <= 0) {
@ -871,8 +966,6 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
buf++;
client->mqtt_state.in_buffer_read_len++;
}
/* any further reading only the underlying payload */
t = esp_transport_get_payload_transport_handle(t);
if ((client->mqtt_state.in_buffer_read_len == 1) ||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
do {
@ -1100,8 +1193,8 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint32_t last_retransmit = 0;
int32_t msg_tick = 0;
uint64_t last_retransmit = 0;
outbox_tick_t msg_tick = 0;
client->run = true;
//get transport by scheme
@ -1122,7 +1215,7 @@ static void esp_mqtt_task(void *pv)
MQTT_API_LOCK(client);
switch ((int)client->state) {
case MQTT_STATE_INIT:
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
xEventGroupClearBits(client->status_bits, RECONNECT_BIT | DISCONNECT_BIT);
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event_with_msgid(client);
@ -1130,6 +1223,9 @@ static void esp_mqtt_task(void *pv)
ESP_LOGE(TAG, "There is no transport");
client->run = false;
}
#if MQTT_ENABLE_SSL
esp_mqtt_set_ssl_transport_properties(client->transport_list, client->config);
#endif
if (esp_transport_connect(client->transport,
client->config->host,
@ -1162,6 +1258,11 @@ static void esp_mqtt_task(void *pv)
break;
case MQTT_STATE_CONNECTED:
// check for disconnection request
if (xEventGroupWaitBits(client->status_bits, DISCONNECT_BIT, true, true, 0) & DISCONNECT_BIT) {
esp_mqtt_abort_connection(client);
break;
}
// receive and process data
if (mqtt_process_receive(client) == ESP_FAIL) {
esp_mqtt_abort_connection(client);
@ -1257,10 +1358,10 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
if (client->state >= MQTT_STATE_INIT) {
ESP_LOGE(TAG, "Client has started");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
esp_err_t err = ESP_OK;
@ -1277,13 +1378,21 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
err = ESP_FAIL;
}
#endif
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return err;
}
esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client)
{
ESP_LOGI(TAG, "Client asked to disconnect");
xEventGroupSetBits(client->status_bits, DISCONNECT_BIT);
return ESP_OK;
}
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
{
ESP_LOGI(TAG, "Client force reconnect requested");
if (client->state != MQTT_STATE_WAIT_TIMEOUT) {
ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request");
return ESP_FAIL;
@ -1295,7 +1404,7 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
if (client->run) {
// Only send the disconnect message if the client is connected
if(client->state == MQTT_STATE_CONNECTED) {
@ -1303,6 +1412,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Disconnect message cannot be created");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
if (mqtt_write_data(client) != ESP_OK) {
@ -1312,12 +1422,12 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
client->run = false;
client->state = MQTT_STATE_UNKNOWN;
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
return ESP_OK;
} else {
ESP_LOGW(TAG, "Client asked to stop, but was not started");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
}
@ -1340,10 +1450,10 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return -1;
}
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
@ -1361,20 +1471,20 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return -1;
}
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_CONNECTED) {
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
@ -1394,12 +1504,12 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return -1;
}
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}
@ -1417,7 +1527,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
len = strlen(data);
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
MQTT_API_LOCK(client);
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
@ -1425,12 +1535,12 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
if (publish_msg->length == 0) {
ESP_LOGE(TAG, "Publish message cannot be created");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return -1;
}
/* We have to set as pending all the qos>0 messages */
client->mqtt_state.outbound_message = publish_msg;
if (qos > 0) {
client->mqtt_state.outbound_message = publish_msg;
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
@ -1438,9 +1548,10 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
mqtt_enqueue(client);
} else {
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
}
} else {
client->mqtt_state.outbound_message = publish_msg;
}
/* Skip sending if not connected (rely on resending) */
@ -1463,22 +1574,14 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0;
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
if (connection->message.fragmented_msg_data_offset) {
// asked to enqueue oversized message (first time only)
connection->message.fragmented_msg_data_offset = 0;
connection->message.fragmented_msg_total_length = 0;
if (qos > 0) {
// internally enqueue all big messages, as they dont fit 'pending msg' structure
mqtt_enqueue_oversized(client, (uint8_t *)current_data, remaining_len);
}
}
if (remaining_len > connection->buffer_length) {
// Continue with sending
memcpy(connection->buffer, current_data, connection->buffer_length);
@ -1502,14 +1605,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return pending_msg_id;
cannot_publish:
// clear out possible fragmented publish if failed or skipped
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
MQTT_API_UNLOCK(client);
return ret;
}