support for custom implementation of msg outbox

This commit is contained in:
David Cermak
2018-08-15 17:24:46 +02:00
parent fa7ade77b1
commit 0c25441fdd
7 changed files with 56 additions and 40 deletions

View File

@@ -93,4 +93,10 @@ choice
bool "Core 1" bool "Core 1"
endchoice endchoice
config MQTT_CUSTOM_OUTBOX
bool "Enable custom outbox implementation"
default n
help
Set to true if a specific implementation of message outbox is needed (e.g. persistant outbox in NVM or similar).
endmenu endmenu

View File

@@ -18,4 +18,8 @@ config BROKER_URL
help help
URL of the broker to connect to URL of the broker to connect to
config BROKER_URL_FROM_STDIN
bool
default y if BROKER_URL = "FROM_STDIN"
endmenu endmenu

View File

@@ -1,5 +1,4 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <stdint.h> #include <stdint.h>
#include <stddef.h> #include <stddef.h>
#include <string.h> #include <string.h>
@@ -38,14 +37,14 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
// ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
// ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
// ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_DISCONNECTED: case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
@@ -53,8 +52,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
case MQTT_EVENT_SUBSCRIBED: case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
// msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
// ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_UNSUBSCRIBED: case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
@@ -118,13 +117,15 @@ static void wifi_init(void)
static void mqtt_app_start(void) static void mqtt_app_start(void)
{ {
char line[128];
esp_mqtt_client_config_t mqtt_cfg = { esp_mqtt_client_config_t mqtt_cfg = {
.uri = CONFIG_BROKER_URL, .uri = CONFIG_BROKER_URL,
.event_handle = mqtt_event_handler, .event_handle = mqtt_event_handler,
// .user_context = (void *)your_context // .user_context = (void *)your_context
}; };
#if CONFIG_BROKER_URL_FROM_STDIN
char line[128];
if (strcmp(mqtt_cfg.uri, "FROM_STDIN") == 0) { if (strcmp(mqtt_cfg.uri, "FROM_STDIN") == 0) {
int count = 0; int count = 0;
printf("Please enter url of mqtt broker\n"); printf("Please enter url of mqtt broker\n");
@@ -141,7 +142,11 @@ static void mqtt_app_start(void)
} }
mqtt_cfg.uri = line; mqtt_cfg.uri = line;
printf("Broker url: %s\n", line); printf("Broker url: %s\n", line);
} else {
ESP_LOGE(TAG, "Configuration mismatch: wrong broker url");
abort();
} }
#endif /* CONFIG_BROKER_URL_FROM_STDIN */
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_start(client); esp_mqtt_client_start(client);

View File

@@ -40,8 +40,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
// ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_DISCONNECTED: case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
@@ -49,10 +49,7 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
case MQTT_EVENT_SUBSCRIBED: case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_1", 0, 1, 0); msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
printf("sending msgid=%d\n", msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data_0", 0, 0, 0);
printf("sending msgid=%d\n", msg_id);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_UNSUBSCRIBED: case MQTT_EVENT_UNSUBSCRIBED:

View File

@@ -11,21 +11,10 @@
extern "C" { extern "C" {
#endif #endif
typedef struct outbox_item { struct outbox_item;
char *buffer;
int len;
int msg_id;
int msg_type;
int tick;
int retry_count;
bool pending;
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
STAILQ_HEAD(outbox_list_t, outbox_item);
typedef struct outbox_list_t * outbox_handle_t; typedef struct outbox_list_t * outbox_handle_t;
typedef outbox_item_t *outbox_item_handle_t; typedef struct outbox_item * outbox_item_handle_t;
outbox_handle_t outbox_init(); outbox_handle_t outbox_init();
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick);

View File

@@ -4,8 +4,25 @@
#include "rom/queue.h" #include "rom/queue.h"
#include "esp_log.h" #include "esp_log.h"
#ifndef CONFIG_MQTT_CUSTOM_OUTBOX
static const char *TAG = "OUTBOX"; static const char *TAG = "OUTBOX";
typedef struct outbox_item {
char *buffer;
int len;
int msg_id;
int msg_type;
int tick;
int retry_count;
bool pending;
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
STAILQ_HEAD(outbox_list_t, outbox_item);
outbox_handle_t outbox_init() outbox_handle_t outbox_init()
{ {
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t)); outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
@@ -149,3 +166,5 @@ void outbox_destroy(outbox_handle_t outbox)
outbox_cleanup(outbox, 0); outbox_cleanup(outbox, 0);
free(outbox); free(outbox);
} }
#endif /* CONFIG_MQTT_CUSTOM_OUTBOX */

View File

@@ -844,10 +844,6 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
if (len <= 0) { if (len <= 0) {
len = strlen(data); len = strlen(data);
} }
// Note: Need to enqueue the qos>0 msg
// if (qos > 0) {
// mqtt_enqueue(client);
// }
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len, topic, data, len,
@@ -857,7 +853,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_msg_count ++; client->mqtt_state.pending_msg_count ++;
// Note: Need to enqueue the qos>0 msg to pass https://code.google.com/archive/p/mqtt4erl/wikis/QualityOfServiceUseCases.wiki /* Have to enqueue all the qos>0 messages) */
mqtt_enqueue(client); mqtt_enqueue(client);
} }