feat(mosq): Add example with two brokers synced on P2P

Broker-less two chip example which virtual private IoT
networks on MQTT protocol.
This commit is contained in:
David Cermak
2024-12-11 19:21:46 +01:00
parent 269351f41c
commit d57b8c5b29
15 changed files with 759 additions and 4 deletions

View File

@ -17,7 +17,8 @@ jobs:
runs-on: ubuntu-22.04
container: espressif/idf:${{ matrix.idf_ver }}
env:
TEST_DIR: components/mosquitto/examples/broker
TEST_DIR: components/mosquitto/examples
TARGET_TEST: broker
TARGET_TEST_DIR: build_esp32_default
steps:
- name: Checkout esp-protocols
@ -29,14 +30,15 @@ jobs:
run: |
. ${IDF_PATH}/export.sh
pip install idf-component-manager idf-build-apps --upgrade
python ci/build_apps.py ${TEST_DIR}
cd ${TEST_DIR}
python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml
# upload only the target test artifacts
cd ${TEST_DIR}/${TARGET_TEST}
${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR}
zip -qur artifacts.zip ${TARGET_TEST_DIR}
- uses: actions/upload-artifact@v4
with:
name: mosq_target_esp32_${{ matrix.idf_ver }}
path: ${{ env.TEST_DIR }}/artifacts.zip
path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip
if-no-files-found: error
test_mosq:

View File

@ -0,0 +1 @@
components/mosquitto/examples/serverless_mqtt/components/libjuice/port/juice_random.c

View File

@ -0,0 +1,3 @@
components/mosquitto/examples/serverless_mqtt:
disable:
- if: IDF_TARGET not in ["esp32", "esp32s3", "esp32c3"]

View File

@ -0,0 +1,6 @@
# The following five lines of boilerplate have to be in your project's
# CMakeLists in this exact order for cmake to work correctly
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
project(serverless_mqtt)

View File

@ -0,0 +1,53 @@
# Brokerless MQTT Example
MQTT served by (two) mosquitto's running on two ESP chips.
* Leverages MQTT connectivity between two private networks without cloud premisses.
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE protocol, by [libjuice](https://github.com/paullouisageneau/libjuice)).
## How it works
This example needs two ESP32 chipsets, that will create two separate Wi-Fi networks (IoT networks) used for IoT devices.
Each IoT network is served by an MQTT server (using mosquitto component).
This example will also synchronize these two MQTT brokers, as if there was only one IoT network with one broker.
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) to traverse NATs, which enabling direct connection between two private networks behind NATs.
* Diagram
![demo](serverless.png)
Here's a step-by-step procedure of establishing this remote connection:
1) Initialize and start Wi-Fi AP (for IoT networks) and Wi-Fi station (for internet connection)
2) Start mosquitto broker on IoT network
3) Start libjuice to gather connection candidates
4) Synchronize using a public MQTT broker and exchange ICE descriptors
5) Establish ICE UDP connection between the two ESP32 chipsets
6) Start forwarding mqtt messages
- Each remote datagram (received from ICE-UDP channel) is re-published to the local MQTT server
- Each local MQTT message (received from mosquitto on_message callback) is sent in ICE-UDP datagram
## How to use this example
You need two ESP32 devices that support Wi-Fi station and Wi-Fi software access point.
* Configure Wi-Fi credentials for both devices on both interfaces
* These devices would be deployed in distinct Wi-Fi environments, so the Wi-Fi station credentials would likely be different.
* They also create their own IoT network (on the soft-AP interface) Wi-Fi, so the AP credentials would likely be the same, suggesting the IoT networks will be keep synchronized (even though these are two distict Wi-Fi networks).
* Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
* Optionally: You can use `idf.py` `-D` and `-B` flag to keep separate build directories and sdkconfigs for these two roles
```
idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
```
* Flash and run the two devices and wait for them to connect and synchronize.
* Now you can test MQTT connectivity, for example:
* Join PEER1 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics.
* Join PEER2 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics.
* Whenever you publish to a topic, all subscribed clients should receive the message, no matter which Wi-Fi network they're connected to.
## Warning
This example uses libjuice as a dependency:
* libjuice (UDP Interactive Connectivity Establishment): https://github.com/paullouisageneau/libjuice
which is distributed under Mozilla Public License v2.0.

View File

@ -0,0 +1,44 @@
set(LIBJUICE_VERSION "73785387eafe15c02b6a210edb10f722474e8e14")
set(LIBJUICE_URL "https://github.com/paullouisageneau/libjuice/archive/${LIBJUICE_VERSION}.zip")
set(libjuice_dir ${CMAKE_BINARY_DIR}/libjuice/libjuice-${LIBJUICE_VERSION})
# Fetch the library
if(NOT EXISTS ${libjuice_dir})
message(STATUS "Downloading libjuice ${LIBJUICE_VERSION}...")
file(DOWNLOAD ${LIBJUICE_URL} ${CMAKE_BINARY_DIR}/libjuice.zip SHOW_PROGRESS)
execute_process(COMMAND unzip -o ${CMAKE_BINARY_DIR}/libjuice.zip -d ${CMAKE_BINARY_DIR}/libjuice
WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
endif()
set(JUICE_SOURCES ${libjuice_dir}/src/addr.c
${libjuice_dir}/src/agent.c
${libjuice_dir}/src/base64.c
${libjuice_dir}/src/conn.c
${libjuice_dir}/src/conn_mux.c
${libjuice_dir}/src/conn_poll.c
${libjuice_dir}/src/conn_thread.c
${libjuice_dir}/src/const_time.c
${libjuice_dir}/src/crc32.c
${libjuice_dir}/src/hash.c
${libjuice_dir}/src/ice.c
${libjuice_dir}/src/juice.c
${libjuice_dir}/src/log.c
${libjuice_dir}/src/server.c
${libjuice_dir}/src/stun.c
${libjuice_dir}/src/timestamp.c
${libjuice_dir}/src/turn.c
${libjuice_dir}/src/udp.c
# Use hmac from mbedtls and random numbers from esp_random:
# ${libjuice_dir}/src/hmac.c
# ${libjuice_dir}/src/random.c
)
idf_component_register(SRCS port/juice_random.c
${JUICE_SOURCES}
INCLUDE_DIRS "include" "${libjuice_dir}/include" "${libjuice_dir}/include/juice"
REQUIRES esp_netif
PRIV_REQUIRES sock_utils)
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")
set_source_files_properties(${libjuice_dir}/src/udp.c PROPERTIES COMPILE_FLAGS -Wno-unused-variable)

View File

@ -0,0 +1,13 @@
/*
* SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#pragma once
// Purpose of this header is to replace udp_sendto() to avoid name conflict with lwip
// added here since ifaddrs.h is included from juice_udp sources
#define udp_sendto juice_udp_sendto
// other than that, let's just include the ifaddrs (from sock_utils)
#include_next "ifaddrs.h"

View File

@ -0,0 +1,40 @@
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include "esp_random.h"
void juice_random(void *buf, size_t size)
{
esp_fill_random(buf, size);
}
void juice_random_str64(char *buf, size_t size)
{
static const char chars64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
size_t i = 0;
for (i = 0; i + 1 < size; ++i) {
uint8_t byte = 0;
juice_random(&byte, 1);
buf[i] = chars64[byte & 0x3F];
}
buf[i] = '\0';
}
uint32_t juice_rand32(void)
{
uint32_t r = 0;
juice_random(&r, sizeof(r));
return r;
}
uint64_t juice_rand64(void)
{
uint64_t r = 0;
juice_random(&r, sizeof(r));
return r;
}

View File

@ -0,0 +1,4 @@
idf_component_register(SRCS "serverless_mqtt.c"
"wifi_connect.c"
INCLUDE_DIRS "."
REQUIRES libjuice nvs_flash mqtt json esp_wifi)

View File

@ -0,0 +1,85 @@
menu "Example Configuration"
menu "AP Configuration"
comment "AP Configuration"
config EXAMPLE_AP_SSID
string "Wi-Fi SSID"
default "myssid"
help
Set the SSID of Wi-Fi ap interface.
config EXAMPLE_AP_PASSWORD
string "Wi-Fi Password"
default "12345678"
help
Set the password of Wi-Fi ap interface.
endmenu
menu "STA Configuration"
comment "STA Configuration"
config EXAMPLE_STA_SSID
string "WiFi Station SSID"
default "mystationssid"
help
SSID for the example's sta to connect to.
config EXAMPLE_STA_PASSWORD
string "WiFi Station Password"
default "mystationpassword"
help
WiFi station password for the example to use.
endmenu
config EXAMPLE_MQTT_BROKER_URI
string "MQTT Broker URL"
default "mqtt://mqtt.eclipseprojects.io"
help
URL of the mqtt broker use for synchronisation and exchanging
ICE connect info (description and candidates).
config EXAMPLE_MQTT_SYNC_TOPIC
string "MQTT topic for synchronisation"
default "/topic/serverless_mqtt"
help
MQTT topic used fo synchronisation.
config EXAMPLE_STUN_SERVER
string "Hostname of STUN server"
default "stun.l.google.com"
help
STUN server hostname.
config EXAMPLE_MQTT_CLIENT_STACK_SIZE
int "Stack size for mqtt client"
default 16384
help
Set stack size for the mqtt client.
Need more stack, since calling juice API from the handler.
config EXAMPLE_MQTT_BROKER_PORT
int "port for the mosquitto to listen to"
default 1883
help
This is a port which the local mosquitto uses.
choice EXAMPLE_SERVERLESS_ROLE
prompt "Choose your role"
default EXAMPLE_SERVERLESS_ROLE_PEER1
help
Choose either peer1 or peer2.
It's not very important which device is peer1
(peer-1 sends sync messages, peer2 listens for them)
It is important that we have two peers,
one with peer1 config, another one with peer2 config
config EXAMPLE_SERVERLESS_ROLE_PEER1
bool "peer1"
config EXAMPLE_SERVERLESS_ROLE_PEER2
bool "peer2"
endchoice
endmenu

View File

@ -0,0 +1,5 @@
## IDF Component Manager Manifest File
dependencies:
espressif/mosquitto:
override_path: ../../..
espressif/sock_utils: "*"

View File

@ -0,0 +1,374 @@
/*
* SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <stdio.h>
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "mqtt_client.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_random.h"
#include "esp_check.h"
#include "esp_sleep.h"
#include "mosq_broker.h"
#include "juice/juice.h"
#include "cJSON.h"
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
#define OUR_PEER "1"
#define THEIR_PEER "2"
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
#define OUR_PEER "2"
#define THEIR_PEER "1"
#endif
#define PEER_SYNC0 BIT(0)
#define PEER_SYNC1 BIT(1)
#define PEER_SYNC2 BIT(2)
#define PEER_FAIL BIT(3)
#define PEER_GATHER_DONE BIT(4)
#define PEER_DESC_PUBLISHED BIT(5)
#define PEER_CONNECTED BIT(6)
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
typedef struct message_wrap {
uint16_t topic_len;
uint16_t data_len;
char data[];
} __attribute__((packed)) message_wrap_t;
static const char *TAG = "serverless_mqtt" OUR_PEER;
static char s_buffer[MAX_BUFFER_SIZE];
static EventGroupHandle_t s_state = NULL;
static juice_agent_t *s_agent = NULL;
static cJSON *s_peer_desc_json = NULL;
static char *s_peer_desc = NULL;
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
char *wifi_get_ipv4(wifi_interface_t interface);
esp_err_t wifi_connect(void);
static esp_err_t sync_peers(void);
static esp_err_t create_candidates(void);
static esp_err_t create_local_client(void);
static esp_err_t create_local_broker(void);
void app_main(void)
{
__attribute__((__unused__)) esp_err_t ret;
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
ESP_LOGI(TAG, "Everything is ready, exiting main task");
return;
}
err:
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
esp_deep_sleep(1000000LL * (esp_random() % 20));
}
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
}
xEventGroupSetBits(s_state, PEER_SYNC0);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
break;
}
EventBits_t bits = xEventGroupGetBits(s_state);
if (event->data_len > 1 && s_agent) {
cJSON *root = cJSON_Parse(event->data);
if (root == NULL) {
break;
}
cJSON *desc = cJSON_GetObjectItem(root, "desc");
if (desc == NULL) {
cJSON_Delete(root);
break;
}
printf("desc->valuestring:%s\n", desc->valuestring);
juice_set_remote_description(s_agent, desc->valuestring);
char cand_name[] = "cand0";
while (true) {
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
if (cand == NULL) {
break;
}
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
juice_add_remote_candidate(s_agent, cand->valuestring);
cand_name[4]++;
}
cJSON_Delete(root);
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
// and destroy the mqtt client
}
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
}
#else
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC1);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
}
#endif
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static esp_err_t sync_peers(void)
{
esp_err_t ret = ESP_OK;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
err, TAG, "Failed to register mqtt event handler");
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
ESP_LOGI(TAG, "Waiting for the other peer...");
const int max_sync_retry = 60;
int retry = 0;
while (true) {
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_SYNC2) {
break;
}
if (bits & PEER_SYNC1) {
continue;
}
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish mqtt message");
#endif
}
ESP_LOGI(TAG, "Sync done");
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish peer's description");
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
err:
free(s_peer_desc);
esp_mqtt_client_destroy(client);
return ret;
}
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
{
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
if (state == JUICE_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
esp_restart();
}
}
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
{
static uint8_t cand_nr = 0;
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
char cand_name[] = "cand0";
cand_name[4] += cand_nr++;
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
}
}
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
{
ESP_LOGI(TAG, "Gathering done");
if (s_state) {
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
}
}
#define ALIGN(size) (((size) + 3U) & ~(3U))
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
{
if (s_local_mqtt) {
message_wrap_t *message = (message_wrap_t *)data;
int topic_len = message->topic_len;
int payload_len = message->data_len;
int topic_len_aligned = ALIGN(topic_len);
char *topic = message->data;
char *payload = message->data + topic_len_aligned;
if (topic_len + topic_len_aligned + 4 > size) {
ESP_LOGE(TAG, "Received invalid message");
return;
}
ESP_LOGI(TAG, "forwarding remote message: topic:%s", topic);
ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload);
esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0);
}
}
static esp_err_t create_candidates(void)
{
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
s_peer_desc_json = cJSON_CreateObject();
esp_err_t ret = ESP_OK;
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
.stun_server_port = 19302,
.cb_state_changed = juice_state,
.cb_candidate = juice_candidate,
.cb_gathering_done = juice_gathering_done,
.cb_recv = juice_recv,
};
s_agent = juice_create(&config);
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to get local description");
ESP_LOGI(TAG, "desc: %s", s_buffer);
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
s_peer_desc = cJSON_Print(s_peer_desc_json);
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
cJSON_Delete(s_peer_desc_json);
return ESP_OK;
err:
juice_destroy(s_agent);
s_agent = NULL;
cJSON_Delete(s_peer_desc_json);
s_peer_desc_json = NULL;
return ret;
}
static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data)
{
switch (id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "local client connected");
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "local client disconnected");
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "local client error");
break;
default:
ESP_LOGI(TAG, "local client event id:%d", (int)id);
break;
}
}
static esp_err_t create_local_client(void)
{
esp_err_t ret = ESP_OK;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.transport = MQTT_TRANSPORT_OVER_TCP,
.broker.address.hostname = wifi_get_ipv4(WIFI_IF_AP),
.broker.address.port = CONFIG_EXAMPLE_MQTT_BROKER_PORT,
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
.credentials.client_id = "local_mqtt"
};
s_local_mqtt = esp_mqtt_client_init(&mqtt_cfg);
ESP_GOTO_ON_FALSE(s_local_mqtt, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(s_local_mqtt, ESP_EVENT_ANY_ID, local_handler, NULL),
err, TAG, "Failed to register mqtt event handler");
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(s_local_mqtt), err, TAG, "Failed to start mqtt client");
return ESP_OK;
err:
esp_mqtt_client_destroy(s_local_mqtt);
s_local_mqtt = NULL;
return ret;
}
static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain)
{
if (client && strcmp(client, "local_mqtt") == 0 ) {
// This is our little local client -- do not forward
return;
}
ESP_LOGI(TAG, "handle_message topic:%s", topic);
ESP_LOGI(TAG, "handle_message data:%.*s", len, payload);
ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain);
if (s_local_mqtt && s_agent) {
int topic_len = strlen(topic) + 1; // null term
int topic_len_aligned = ALIGN(topic_len);
int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len;
if (total_msg_len > MAX_BUFFER_SIZE) {
ESP_LOGE(TAG, "Fail to forward, message too long");
return;
}
message_wrap_t *message = (message_wrap_t *)s_buffer;
message->topic_len = topic_len;
message->data_len = len;
memcpy(s_buffer + 4, topic, topic_len);
memcpy(s_buffer + 4 + topic_len_aligned, payload, len);
juice_send(s_agent, s_buffer, total_msg_len);
}
}
static void broker_task(void *ctx)
{
struct mosq_broker_config config = { .host = wifi_get_ipv4(WIFI_IF_AP), .port = CONFIG_EXAMPLE_MQTT_BROKER_PORT, .handle_message_cb = handle_message };
mosq_broker_run(&config);
vTaskDelete(NULL);
}
static esp_err_t create_local_broker(void)
{
return xTaskCreate(broker_task, "mqtt_broker_task", 1024 * 32, NULL, 5, NULL) == pdTRUE ?
ESP_OK : ESP_FAIL;
}

View File

@ -0,0 +1,122 @@
/*
* SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include "nvs_flash.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "esp_check.h"
#include "esp_wifi.h"
#include "esp_mac.h"
#define WIFI_CONNECTED_BIT BIT0
#define WIFI_FAIL_BIT BIT1
static const char *TAG = "serverless_wifi";
static EventGroupHandle_t s_wifi_events;
static int s_retry_num = 0;
static const int s_max_retry = 30;
static void wifi_event_handler(void *arg, esp_event_base_t event_base,
int32_t event_id, void *event_data)
{
if (event_id == WIFI_EVENT_AP_STACONNECTED) {
wifi_event_ap_staconnected_t *event = (wifi_event_ap_staconnected_t *) event_data;
ESP_LOGI(TAG, "station "MACSTR" join, AID=%d",
MAC2STR(event->mac), event->aid);
} else if (event_id == WIFI_EVENT_AP_STADISCONNECTED) {
wifi_event_ap_stadisconnected_t *event = (wifi_event_ap_stadisconnected_t *) event_data;
ESP_LOGI(TAG, "station "MACSTR" leave, AID=%d",
MAC2STR(event->mac), event->aid);
} else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) {
esp_wifi_connect();
} else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) {
if (s_retry_num < s_max_retry) {
esp_wifi_connect();
s_retry_num++;
ESP_LOGI(TAG, "retry to connect to the AP");
} else {
xEventGroupSetBits(s_wifi_events, WIFI_FAIL_BIT);
}
ESP_LOGI(TAG, "Connect to the AP fail");
} else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
ip_event_got_ip_t *event = (ip_event_got_ip_t *) event_data;
ESP_LOGI(TAG, "Got ip:" IPSTR, IP2STR(&event->ip_info.ip));
s_retry_num = 0;
xEventGroupSetBits(s_wifi_events, WIFI_CONNECTED_BIT);
}
}
esp_err_t wifi_connect(void)
{
esp_err_t ret = ESP_OK;
ESP_GOTO_ON_FALSE(s_wifi_events = xEventGroupCreate(), ESP_ERR_NO_MEM, err, TAG, "Failed to create wifi_events");
ESP_GOTO_ON_ERROR(nvs_flash_init(), err, TAG, "Failed to init nvs flash");
ESP_GOTO_ON_ERROR(esp_netif_init(), err, TAG, "Failed to init esp_netif");
ESP_GOTO_ON_ERROR(esp_event_loop_create_default(), err, TAG, "Failed to create default event loop");
ESP_GOTO_ON_ERROR(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, wifi_event_handler, NULL),
err, TAG, "Failed to register WiFi event handler");
ESP_GOTO_ON_ERROR(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, wifi_event_handler, NULL),
err, TAG, "Failed to register IP event handler");
// Initialize WiFi
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
ESP_GOTO_ON_ERROR(esp_wifi_init(&cfg), err, TAG, "Failed to initialize WiFi");
ESP_GOTO_ON_ERROR(esp_wifi_set_mode(WIFI_MODE_APSTA), err, TAG, "Failed to set STA+AP mode");
// Initialize AP
esp_netif_t *ap = esp_netif_create_default_wifi_ap();
ESP_GOTO_ON_FALSE(ap, ESP_FAIL, err, TAG, "Failed to create AP network interface");
wifi_config_t wifi_ap_config = {
.ap = {
.ssid = CONFIG_EXAMPLE_AP_SSID,
.password = CONFIG_EXAMPLE_AP_PASSWORD,
.authmode = WIFI_AUTH_WPA2_PSK,
.max_connection = 4,
},
};
ESP_GOTO_ON_ERROR(esp_wifi_set_config(WIFI_IF_AP, &wifi_ap_config), err, TAG, "Failed to set AP config");
// Initialize STA
esp_netif_t *sta = esp_netif_create_default_wifi_sta();
ESP_GOTO_ON_FALSE(sta, ESP_FAIL, err, TAG, "Failed to create WiFi station network interface");
wifi_config_t wifi_sta_config = {
.sta = {
.ssid = CONFIG_EXAMPLE_STA_SSID,
.password = CONFIG_EXAMPLE_STA_PASSWORD,
},
};
ESP_GOTO_ON_ERROR(esp_wifi_set_config(WIFI_IF_STA, &wifi_sta_config), err, TAG, "Failed to set STA config");
// Start WiFi
ESP_GOTO_ON_ERROR(esp_wifi_start(), err, TAG, "Failed to start WiFi");
// Wait for connection
EventBits_t bits = xEventGroupWaitBits(s_wifi_events, WIFI_CONNECTED_BIT | WIFI_FAIL_BIT,
pdFALSE, pdFALSE, pdMS_TO_TICKS(30000));
ESP_GOTO_ON_FALSE((bits & WIFI_CONNECTED_BIT) == WIFI_CONNECTED_BIT, ESP_FAIL, err,
TAG, "Failed to obtain IP address from WiFi station");
return ESP_OK;
err:
esp_wifi_stop();
esp_wifi_deinit();
nvs_flash_deinit();
esp_netif_deinit();
esp_event_loop_delete_default();
return ret;
}
_Thread_local char s_ipv4_addr[4 * 4]; // 4 octets + '.'/term
char *wifi_get_ipv4(wifi_interface_t interface)
{
esp_netif_t *netif = esp_netif_get_handle_from_ifkey(interface == WIFI_IF_AP ? "WIFI_AP_DEF" : "WIFI_STA_DEF");
ESP_RETURN_ON_FALSE(netif, NULL, TAG, "Failed to find default Wi-Fi netif");
esp_netif_ip_info_t ip_info;
ESP_RETURN_ON_FALSE(esp_netif_get_ip_info(netif, &ip_info) == ESP_OK, NULL, TAG, "Failed to get IP from netif");
ESP_RETURN_ON_FALSE(esp_ip4addr_ntoa(&ip_info.ip, s_ipv4_addr, sizeof(s_ipv4_addr)) != NULL, NULL, TAG, "Failed to convert IP");
return s_ipv4_addr;
}

View File

@ -0,0 +1,3 @@
CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y
CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384
CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB