feat(mosq): Update brokerless example to work with esp-peer

* Relax CI criteria to build on v5.2+ (for the brokerless due to
  esp-peer dependency)
This commit is contained in:
David Cermak
2025-07-09 12:55:46 +02:00
parent 462561b8d9
commit 76e45f7254
15 changed files with 882 additions and 259 deletions

View File

@ -14,10 +14,15 @@ jobs:
strategy:
matrix:
idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"]
example: ["broker", "serverless_mqtt"]
exclude:
- idf_ver: "release-v5.1"
example: "serverless_mqtt" # serverless_mqtt is not supported due to esp-peer
runs-on: ubuntu-22.04
container: espressif/idf:${{ matrix.idf_ver }}
env:
TEST_DIR: components/mosquitto/examples
TEST_DIR: components/mosquitto/examples/${{ matrix.example }}
TARGET_TEST: broker
TARGET_TEST_DIR: build_esp32_default
steps:
@ -31,14 +36,17 @@ jobs:
. ${IDF_PATH}/export.sh
pip install idf-component-manager idf-build-apps --upgrade
python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml
# upload only the target test artifacts
cd ${TEST_DIR}/${TARGET_TEST}
${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR}
zip -qur artifacts.zip ${TARGET_TEST_DIR}
if [ "${{ matrix.example }}" == "${TARGET_TEST}" ]; then
# upload only the target test artifacts
cd ${TEST_DIR}
${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR}
zip -qur artifacts.zip ${TARGET_TEST_DIR}
fi
- uses: actions/upload-artifact@v4
if: ${{ matrix.example == 'broker' }}
with:
name: mosq_target_esp32_${{ matrix.idf_ver }}
path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip
path: ${{ env.TEST_DIR }}/artifacts.zip
if-no-files-found: error
test_mosq:

View File

@ -3,4 +3,21 @@
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
# Setup ESP-PEER from GitHub repo (but it's supported only on certain targets)
set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32")
if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS)
execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup/install.sh
${CMAKE_BINARY_DIR}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
RESULT_VARIABLE script_result)
if(script_result)
message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}")
endif()
list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/")
else()
message(STATUS "ESP-PEER is not compatible with this target")
endif()
project(serverless_mqtt)

View File

@ -3,14 +3,19 @@
MQTT served by (two) mosquitto's running on two ESP chips.
* Leverages MQTT connectivity between two private networks without cloud premisses.
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE protocol, by [libjuice](https://github.com/paullouisageneau/libjuice)).
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE/WebRTC protocol)
## Peer to peer connection
Could be established either by [libjuice](https://github.com/paullouisageneau/libjuice) or [esp-webRTC](https://github.com/espressif/esp-webrtc-solution). While `juice` is just a low level implementation of ICE-UDP, we need to provide some signalling and synchronization, the `WebRTC` is full-fledged solution to establish a peer connection using standardized signalling, security and transfer protocols.
## How it works
This example needs two ESP32 chipsets, that will create two separate Wi-Fi networks (IoT networks) used for IoT devices.
Each IoT network is served by an MQTT server (using mosquitto component).
This example will also synchronize these two MQTT brokers, as if there was only one IoT network with one broker.
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) to traverse NATs, which enabling direct connection between two private networks behind NATs.
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) or esp-webRTC (which implements WebRTC) to traverse NATs, which enabling direct connection between two private networks behind NATs.
* Diagram
@ -19,12 +24,16 @@ This example creates a peer to peer connection between two chipsets to keep them
Here's a step-by-step procedure of establishing this remote connection:
1) Initialize and start Wi-Fi AP (for IoT networks) and Wi-Fi station (for internet connection)
2) Start mosquitto broker on IoT network
3) Start libjuice to gather connection candidates
4) Synchronize using a public MQTT broker and exchange ICE descriptors
5) Establish ICE UDP connection between the two ESP32 chipsets
3) Start peer to peer connection
- In case of `libjuice`
- gather connection candidates
- synchronize using a public MQTT broker and exchange ICE descriptors
- establish ICE UDP connection between the two ESP32 chipsets
- In case of `webRTC` simply start the connection.
6) Start forwarding mqtt messages
- Each remote datagram (received from ICE-UDP channel) is re-published to the local MQTT server
- Each local MQTT message (received from mosquitto on_message callback) is sent in ICE-UDP datagram
- Each local MQTT message (received from mosquitto on_message callback) is sent in a peer message
## How to use this example
@ -33,7 +42,9 @@ You need two ESP32 devices that support Wi-Fi station and Wi-Fi software access
* Configure Wi-Fi credentials for both devices on both interfaces
* These devices would be deployed in distinct Wi-Fi environments, so the Wi-Fi station credentials would likely be different.
* They also create their own IoT network (on the soft-AP interface) Wi-Fi, so the AP credentials would likely be the same, suggesting the IoT networks will be keep synchronized (even though these are two distict Wi-Fi networks).
* Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
* Choose the peer library
* Only for `libjuice`:
- Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
* Optionally: You can use `idf.py` `-D` and `-B` flag to keep separate build directories and sdkconfigs for these two roles
```
idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
@ -44,9 +55,129 @@ idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
* Join PEER2 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics.
* Whenever you publish to a topic, all subscribed clients should receive the message, no matter which Wi-Fi network they're connected to.
## Example output
## With libjuice
```
I (4746) esp_netif_handlers: sta ip: 192.168.0.40, mask: 255.255.255.0, gw: 192.168.0.1
4: mosquitto version v2.0.20~3 starting
4: Using default config.
4: Opening ipv4 listen socket on port 1883.
4: mosquitto version v2.0.20~3 running
I (4756) serverless_mqtt1: desc: a=ice-ufrag:sGdl
a=ice-pwd:R4IPGsFctITbT1dCZbfQTL
a=ice-options:ice2,trickle
00:00:04 INFO agent.c:1100: Changing state to gathering
I (4776) serverless_mqtt1: JUICE state change: gathering
00:00:04 INFO agent.c:1100: Changing state to connecting
I (4786) serverless_mqtt1: JUICE state change: connecting
00:00:04 INFO agent.c:422: Using STUN server stun.l.google.com:19302
00:00:04 INFO agent.c:1378: STUN server binding successful
00:00:04 INFO agent.c:1397: Got STUN mapped address 185.194.44.31:62703 from server
00:00:04 INFO agent.c:2428: Candidate gathering done
I (5066) serverless_mqtt1: Gathering done
I (5066) serverless_mqtt1: desc: {
"desc": "a=ice-ufrag:sGdl\r\na=ice-pwd:R4IPGsFctITbT1dCZbfQTL\r\na=ice-options:ice2,trickle\r\n",
"cand0": "a=candidate:1 1 UDP 2122317823 192.168.0.40 62703 typ host",
"cand1": "a=candidate:2 1 UDP 1686109951 185.194.44.31 62703 typ srflx raddr 0.0.0.0 rport 0"
}
I (5096) serverless_mqtt1: Other event id:7
```
### With esp-peer
```
I (5992) esp_netif_handlers: sta ip: 192.168.0.42, mask: 255.255.255.0, gw: 192.168.0.1
4: mosquitto version v2.0.20~3 starting
4: Using default config.
4: Opening ipv4 listen socket on port 1883.
4: mosquitto version v2.0.20~3 running
I (6702) esp-x509-crt-bundle: Certificate validated
I (7982) APPRTC_SIG: result SUCCESS
Initials set to 1
I (7982) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 911
I (8652) esp-x509-crt-bundle: Certificate validated
Got url:stun:webrtc.espressif.com:3478 user_name: 1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw=
I (10022) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 173
I (10032) serverless_mqtt_webrtc: Signaling ice info handler 0x0
I (10042) DTLS: Init SRTP OK
I (11512) DTLS_SRTP: dtls_srtp init done
I (11522) APPRTC_SIG: Registering signaling channel.
I (11522) APPRTC_SIG: Connecting to wss://webrtc.espressif.com:8089/ws...
I (11532) websocket_client: Started
I (11532) serverless_mqtt_webrtc: Waiting for peer to connect
I (12122) esp-x509-crt-bundle: Certificate validated
I (13502) APPRTC_SIG: WEBSOCKET_EVENT_CONNECTED
I (13502) APPRTC_SIG: send to remote : {"cmd":"register","roomid":"111116","clientid":"93827452"}
I (13502) serverless_mqtt_webrtc: Peer state: 2
I (13522) AGENT: Start agent as Controlling
I (13522) PEER_DEF: Start DTLS role as 1
I (13522) AGENT: Send STUN binding request
I (13522) AGENT: Send allocate now
Got error code 401
I (13802) AGENT: Send allocate now
I (14112) AGENT: 0 Get candidate success user:1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw=
I (14112) APPRTC_SIG: Begin to send offer to https://webrtc.espressif.com/message/111116/93827452
I (14782) esp-x509-crt-bundle: Certificate validated
I (16472) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 21
I (21602) PEER_DEF: A SRC: 4
I (21602) PEER_DEF: Get peer role 0
I (21602) PEER_DEF: Get peer role 0
I (21602) AGENT: 0 Add remote type:2 185.194.44.31:60872
I (21612) AGENT: 0 Add remote type:4 172.31.6.33:59012
I (21612) AGENT: 0 Add remote type:1 192.168.0.41:60872
0 Sorted pair 0 type: 1 local:192.168.0.42:63459 Remote:192.168.0.41:60872
0 Sorted pair 1 type: 2 local:185.194.44.31:63459 Remote:185.194.44.31:60872
0 Sorted pair 2 type: 4 local:172.31.6.33:59013 Remote:172.31.6.33:59012
I (21642) serverless_mqtt_webrtc: Peer state: 3
I (22002) AGENT: 0 Send binding request (cand:0) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:76e5004e797d87a62756c714
I (22002) AGENT: 0 Send binding request (cand:0) local:185.194.44.31:63459 remote:185.194.44.31:60872 id:4c598d93643bd7252b449456
I (22012) AGENT: 0 Send binding request (cand:0) local:172.31.6.33:59013 remote:172.31.6.33:59012 id:6db505597d157d8d71dfed43
I (22022) AGENT: 0 send indication bind request
I (22202) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22202) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22212) AGENT: 0 Select pair192.168.0.41:60872
I (22212) AGENT: 0 Send binding request (cand:1) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:45cb977b6ea3bbbe3a984b90
I (22222) AGENT: 0 PeerIndication recv local:172.31.6.33:59013 remote:172.31.6.33:59012
I (22402) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22402) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22412) AGENT: 0 Candidate responsed
I (22412) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22422) AGENT: 0 Connection OK 192.168.0.41:60872
I (22422) serverless_mqtt_webrtc: Peer state: 5
I (22452) DTLS: Start to do server handshake
Works as 1
I (23842) DTLS: SRTP connected OK
I (23852) DTLS: Server handshake success
I (23852) PEER_DEF: DTLS handshake success
I (23852) serverless_mqtt_webrtc: Peer state: 6
I (23852) serverless_mqtt_webrtc: Peer is connected!
I (23862) serverless_mqtt: local client event id:7
22: New connection from 192.168.4.1:63904 on port 1883.
22: New client connected from 192.168.4.1:63904 as local_mqtt (p2, c1, k120).
I (23892) serverless_mqtt: local client connected
I (23872) serverless_mqtt: Everything is ready, exiting main task
I (23892) main_task: Returned from app_main()
I (24552) SCTP: 0 Receive chunk 1 SCTP_INIT
I (24552) SCTP: 0 state 2
I (24552) SCTP: Send INIT_ACK chunk
I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO
I (24762) SCTP: Send ECHO_ACK chunk
I (24762) SCTP: 0 state 5
I (24762) serverless_mqtt_webrtc: Peer state: 8
I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO
I (24772) SCTP: Send ECHO_ACK chunk
I (24962) SCTP: Get DCEP esp_channel event:3 type:0 si:2
I (24972) serverless_mqtt_webrtc: Peer state: 9
```
## Warning
This example uses libjuice as a dependency:
This example uses `libjuice` as a dependency:
* libjuice (UDP Interactive Connectivity Establishment): https://github.com/paullouisageneau/libjuice

View File

@ -0,0 +1,28 @@
From cdc43a56f5ea1ab1935f55f47f8644f5dd30825e Mon Sep 17 00:00:00 2001
From: David Cermak <cermak@espressif.com>
Date: Thu, 10 Jul 2025 11:09:57 +0200
Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header
---
components/media_lib_sal/port/media_lib_os_freertos.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c
index d248d59..aea0527 100644
--- a/components/media_lib_sal/port/media_lib_os_freertos.c
+++ b/components/media_lib_sal/port/media_lib_os_freertos.c
@@ -40,8 +40,12 @@
#include "esp_idf_version.h"
#if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT
+#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 2, 0))
+#include "esp_private/freertos_debug.h"
+#else
#include "freertos/task_snapshot.h"
#endif
+#endif
#ifdef __XTENSA__
#include "esp_debug_helpers.h"
--
2.43.0

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
set -e
echo "bin_dir: $1"
bin_dir="$1"
THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
ESP_PEER_VERSION="ccff3bd65cea750bf6c0abcf9d95b931ba9329f0"
ESP_PEER_URL="https://github.com/espressif/esp-webrtc-solution/archive/${ESP_PEER_VERSION}.zip"
ESP_PEER_DIR="${bin_dir}/esp-peer"
ZIP_PATH="${bin_dir}/esp-peer.zip"
EXTRACTED_DIR="${ESP_PEER_DIR}/esp-webrtc-solution-${ESP_PEER_VERSION}"
COMPONENTS_SRC="${EXTRACTED_DIR}/components"
COMPONENTS_DST="${ESP_PEER_DIR}/components"
PATCH_FILE_1="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch"
PATCH_FILE_2="${THIS_DIR}/libpeer-Add-direct-dependency-to-libsrtp.patch"
# Download if not exists
if [ ! -d "$EXTRACTED_DIR" ]; then
echo "Downloading esp-peer ${ESP_PEER_VERSION}..."
wget -O "$ZIP_PATH" "$ESP_PEER_URL"
unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR"
patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1"
patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2"
mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR}
mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components
fi

View File

@ -0,0 +1,23 @@
From 695e057000698f4897b6c5802851499842e2fe31 Mon Sep 17 00:00:00 2001
From: David Cermak <cermak@espressif.com>
Date: Fri, 11 Jul 2025 16:59:21 +0200
Subject: [PATCH] fix(libpeer): Add direct dependency to libsrtp
---
components/esp_webrtc/impl/peer_default/CMakeLists.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/components/esp_webrtc/impl/peer_default/CMakeLists.txt b/components/esp_webrtc/impl/peer_default/CMakeLists.txt
index 2af35cf..3fb4615 100644
--- a/components/esp_webrtc/impl/peer_default/CMakeLists.txt
+++ b/components/esp_webrtc/impl/peer_default/CMakeLists.txt
@@ -2,6 +2,6 @@ idf_component_register(INCLUDE_DIRS ./include)
get_filename_component(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME)
add_prebuilt_library(${BASE_DIR} "${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}/libpeer_default.a"
- PRIV_REQUIRES ${BASE_DIR} esp_timer)
+ PRIV_REQUIRES ${BASE_DIR} esp_timer espressif__esp_libsrtp)
target_link_libraries(${COMPONENT_LIB} INTERFACE "-L ${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}")
target_link_libraries(${COMPONENT_LIB} INTERFACE peer_default)
--
2.43.0

View File

@ -1,4 +1,14 @@
if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
set(PEER_BACKEND_SRC "peer_impl_webrtc.c")
else()
set(PEER_BACKEND_SRC "peer_impl_juice.c")
endif()
idf_component_register(SRCS "serverless_mqtt.c"
"wifi_connect.c"
"${PEER_BACKEND_SRC}"
INCLUDE_DIRS "."
REQUIRES libjuice nvs_flash mqtt json esp_wifi)
if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
idf_component_optional_requires(PUBLIC media_lib_sal esp_webrtc peer_default)
endif()

View File

@ -33,8 +33,40 @@ menu "Example Configuration"
WiFi station password for the example to use.
endmenu
choice EXAMPLE_PEER_LIB
prompt "Choose peer library"
default EXAMPLE_PEER_LIB_LIBJUICE
help
Choose the peer library to use for WebRTC communication.
libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling)
esp_peer: Use ESP-IDF specific peer library
config EXAMPLE_PEER_LIB_ESP_PEER
bool "esp_peer"
config EXAMPLE_PEER_LIB_LIBJUICE
bool "libjuice"
endchoice
config EXAMPLE_WEBRTC_URL
string "WebRTC server URL"
depends on EXAMPLE_PEER_LIB_ESP_PEER
default "https://webrtc.espressif.com/join/"
help
URL of WebRTC remote endpoint.
config EXAMPLE_WEBRTC_ROOM_ID
string "WebRTC room ID"
depends on EXAMPLE_PEER_LIB_ESP_PEER
default "12345"
help
Room ID for WebRTC synchronisation.
Could be a random number, but the same for both peers.
config EXAMPLE_MQTT_BROKER_URI
string "MQTT Broker URL"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default "mqtt://mqtt.eclipseprojects.io"
help
URL of the mqtt broker use for synchronisation and exchanging
@ -42,12 +74,14 @@ menu "Example Configuration"
config EXAMPLE_MQTT_SYNC_TOPIC
string "MQTT topic for synchronisation"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default "/topic/serverless_mqtt"
help
MQTT topic used fo synchronisation.
config EXAMPLE_STUN_SERVER
string "Hostname of STUN server"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default "stun.l.google.com"
help
STUN server hostname.
@ -67,6 +101,7 @@ menu "Example Configuration"
choice EXAMPLE_SERVERLESS_ROLE
prompt "Choose your role"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default EXAMPLE_SERVERLESS_ROLE_PEER1
help
Choose either peer1 or peer2.

View File

@ -0,0 +1,18 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <stdio.h>
#include "esp_random.h"
#include "esp_sleep.h"
#include "mosq_broker.h"
typedef void (*on_peer_recv_t)(const char *data, size_t size);
esp_err_t peer_init(on_peer_recv_t cb);
void peer_get_buffer(char ** buffer, size_t *buffer_len);
void peer_send(char* data, size_t size);

View File

@ -0,0 +1,283 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <stdio.h>
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "mqtt_client.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_check.h"
#include "juice/juice.h"
#include "cJSON.h"
#include "peer_impl.h"
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
#define OUR_PEER "1"
#define THEIR_PEER "2"
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
#define OUR_PEER "2"
#define THEIR_PEER "1"
#endif
#define PEER_SYNC0 BIT(0)
#define PEER_SYNC1 BIT(1)
#define PEER_SYNC2 BIT(2)
#define PEER_FAIL BIT(3)
#define PEER_GATHER_DONE BIT(4)
#define PEER_DESC_PUBLISHED BIT(5)
#define PEER_CONNECTED BIT(6)
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
static const char *TAG = "serverless_mqtt" OUR_PEER;
static char s_buffer[MAX_BUFFER_SIZE];
static EventGroupHandle_t s_state = NULL;
static juice_agent_t *s_agent = NULL;
static cJSON *s_peer_desc_json = NULL;
static char *s_peer_desc = NULL;
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
static on_peer_recv_t s_on_recv = NULL;
char *wifi_get_ipv4(wifi_interface_t interface);
static esp_err_t sync_peers(void);
static esp_err_t create_candidates(void);
void peer_get_buffer(char ** buffer, size_t *buffer_len)
{
if (buffer && buffer_len) {
*buffer = s_buffer;
*buffer_len = MAX_BUFFER_SIZE;
}
}
void peer_send(char* data, size_t size)
{
juice_send(s_agent, data, size);
}
esp_err_t peer_init(on_peer_recv_t cb)
{
esp_err_t ret = ESP_FAIL;
ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback");
s_on_recv = cb;
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
return ESP_OK;
}
err:
ESP_LOGE(TAG, "Failed to init peer");
return ret;
}
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
}
xEventGroupSetBits(s_state, PEER_SYNC0);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
break;
}
EventBits_t bits = xEventGroupGetBits(s_state);
if (event->data_len > 1 && s_agent) {
cJSON *root = cJSON_Parse(event->data);
if (root == NULL) {
break;
}
cJSON *desc = cJSON_GetObjectItem(root, "desc");
if (desc == NULL) {
cJSON_Delete(root);
break;
}
printf("desc->valuestring:%s\n", desc->valuestring);
juice_set_remote_description(s_agent, desc->valuestring);
char cand_name[] = "cand0";
while (true) {
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
if (cand == NULL) {
break;
}
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
juice_add_remote_candidate(s_agent, cand->valuestring);
cand_name[4]++;
}
cJSON_Delete(root);
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
// and destroy the mqtt client
}
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
}
#else
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC1);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
}
#endif
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static esp_err_t sync_peers(void)
{
esp_err_t ret = ESP_OK;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
err, TAG, "Failed to register mqtt event handler");
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
ESP_LOGI(TAG, "Waiting for the other peer...");
const int max_sync_retry = 60;
int retry = 0;
while (true) {
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_SYNC2) {
break;
}
if (bits & PEER_SYNC1) {
continue;
}
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish mqtt message");
#endif
}
ESP_LOGI(TAG, "Sync done");
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish peer's description");
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
err:
free(s_peer_desc);
esp_mqtt_client_destroy(client);
return ret;
}
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
{
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
if (state == JUICE_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
esp_restart();
}
}
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
{
static uint8_t cand_nr = 0;
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
char cand_name[] = "cand0";
cand_name[4] += cand_nr++;
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
}
}
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
{
ESP_LOGI(TAG, "Gathering done");
if (s_state) {
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
}
}
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
{
if (s_local_mqtt) {
s_on_recv(data, size);
} else {
ESP_LOGI(TAG, "No local mqtt client, dropping data");
}
}
static esp_err_t create_candidates(void)
{
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
s_peer_desc_json = cJSON_CreateObject();
esp_err_t ret = ESP_OK;
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
.stun_server_port = 19302,
.cb_state_changed = juice_state,
.cb_candidate = juice_candidate,
.cb_gathering_done = juice_gathering_done,
.cb_recv = juice_recv,
};
s_agent = juice_create(&config);
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to get local description");
ESP_LOGI(TAG, "desc: %s", s_buffer);
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
s_peer_desc = cJSON_Print(s_peer_desc_json);
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
cJSON_Delete(s_peer_desc_json);
return ESP_OK;
err:
juice_destroy(s_agent);
s_agent = NULL;
cJSON_Delete(s_peer_desc_json);
s_peer_desc_json = NULL;
return ret;
}

View File

@ -0,0 +1,248 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include "media_lib_os.h"
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "media_lib_adapter.h"
#include "media_lib_os.h"
#include "esp_log.h"
#include "esp_webrtc_defaults.h"
#include "esp_peer_default.h"
#include "common.h"
#include "esp_check.h"
#include "peer_impl.h"
#include "sdkconfig.h"
#define WEBRTC_URL (CONFIG_EXAMPLE_WEBRTC_URL CONFIG_EXAMPLE_WEBRTC_ROOM_ID)
#define PEER_CONNECTED BIT(0)
#define PEER_DISCONNECTED BIT(1)
#define MAX_BUFFER_SIZE (4*1024)
static EventGroupHandle_t s_state = NULL;
static const char *TAG = "serverless_mqtt_webrtc";
void peer_get_buffer(char ** buffer, size_t *buffer_len)
{
static char s_buffer[MAX_BUFFER_SIZE];
if (buffer && buffer_len) {
*buffer = s_buffer;
*buffer_len = MAX_BUFFER_SIZE;
}
}
static int start_webrtc(char *url);
static int stop_webrtc(void);
static on_peer_recv_t s_on_recv = NULL;
static esp_peer_signaling_handle_t signaling = NULL;
static esp_peer_handle_t peer = NULL;
static bool peer_running = false;
static void thread_scheduler(const char *thread_name, media_lib_thread_cfg_t *thread_cfg)
{
if (strcmp(thread_name, "pc_task") == 0) {
thread_cfg->stack_size = 25 * 1024;
thread_cfg->priority = 18;
thread_cfg->core_id = 1;
}
}
esp_err_t peer_init(on_peer_recv_t cb)
{
esp_err_t ret = ESP_OK;
s_on_recv = cb;
s_state = xEventGroupCreate();
media_lib_add_default_adapter();
media_lib_thread_set_schedule_cb(thread_scheduler);
ESP_RETURN_ON_FALSE(s_state, ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
ESP_GOTO_ON_FALSE(start_webrtc(WEBRTC_URL) == ESP_PEER_ERR_NONE, ESP_FAIL, err, TAG, "Failed to start webRTC");
ESP_LOGI(TAG, "Waiting for peer to connect");
int i = 0;
while (1) {
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
return ret;
}
ESP_GOTO_ON_FALSE(i++ < 100, ESP_ERR_TIMEOUT, err, TAG, "Peer connection timeout");
if (peer) {
esp_peer_query(peer);
}
}
err:
vEventGroupDelete(s_state);
return ret;
}
static int peer_state_handler(esp_peer_state_t state, void* ctx)
{
ESP_LOGI(TAG, "Peer state: %d", state);
if (state == ESP_PEER_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == ESP_PEER_STATE_DISCONNECTED) {
xEventGroupSetBits(s_state, PEER_DISCONNECTED);
}
return 0;
}
static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx)
{
if (msg->type == ESP_PEER_MSG_TYPE_SDP) {
// Send local SDP to signaling server
esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg);
}
return 0;
}
static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx)
{
return 0;
}
static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx)
{
return 0;
}
static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx)
{
ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]);
return 0;
}
static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx)
{
return 0;
}
static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx)
{
if (frame && frame->size > 0) {
s_on_recv((char*)frame->data, frame->size);
}
return 0;
}
static void pc_task(void *arg)
{
while (peer_running) {
esp_peer_main_loop(peer);
media_lib_thread_sleep(20);
}
media_lib_thread_destroy(NULL);
}
static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx)
{
if (peer == NULL) {
esp_peer_default_cfg_t peer_cfg = {
.agent_recv_timeout = 500,
};
esp_peer_cfg_t cfg = {
.server_lists = &info->server_info,
.server_num = 1,
.audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV,
.audio_info = {
.codec = ESP_PEER_AUDIO_CODEC_G711A,
},
.enable_data_channel = true,
.role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED,
.on_state = peer_state_handler,
.on_msg = peer_msg_handler,
.on_video_info = peer_video_info_handler,
.on_audio_info = peer_audio_info_handler,
.on_video_data = peer_video_data_handler,
.on_audio_data = peer_audio_data_handler,
.on_data = peer_data_handler,
.ctx = ctx,
.extra_cfg = &peer_cfg,
.extra_size = sizeof(esp_peer_default_cfg_t),
};
int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer);
if (ret != ESP_PEER_ERR_NONE) {
return ret;
}
media_lib_thread_handle_t thread = NULL;
peer_running = true;
media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL);
if (thread == NULL) {
peer_running = false;
}
}
return 0;
}
static int signaling_connected_handler(void* ctx)
{
if (peer) {
return esp_peer_new_connection(peer);
}
return 0;
}
static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx)
{
if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) {
esp_peer_close(peer);
peer = NULL;
} else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) {
// Receive remote SDP
if (peer) {
esp_peer_send_msg(peer, (esp_peer_msg_t*)msg);
}
}
return 0;
}
static int signaling_close_handler(void *ctx)
{
return 0;
}
static int start_signaling(char* url)
{
esp_peer_signaling_cfg_t cfg = {
.signal_url = url,
.on_ice_info = signaling_ice_info_handler,
.on_connected = signaling_connected_handler,
.on_msg = signaling_msg_handler,
.on_close = signaling_close_handler,
};
// Use APPRTC signaling
return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling);
}
static int start_webrtc(char *url)
{
stop_webrtc();
return start_signaling(url);
}
static int stop_webrtc(void)
{
peer_running = false;
if (peer) {
esp_peer_close(peer);
peer = NULL;
}
if (signaling) {
esp_peer_signaling_stop(signaling);
signaling = NULL;
}
return 0;
}
void peer_send(char* data, size_t size)
{
esp_peer_data_frame_t data_frame = {
.type = ESP_PEER_DATA_CHANNEL_DATA,
.data = (uint8_t*)data,
.size = size,
};
esp_peer_send_data(peer, &data_frame);
}

View File

@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
* SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
@ -13,30 +13,9 @@
#include "esp_check.h"
#include "esp_sleep.h"
#include "mosq_broker.h"
#include "juice/juice.h"
#include "cJSON.h"
#include "peer_impl.h"
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
#define OUR_PEER "1"
#define THEIR_PEER "2"
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
#define OUR_PEER "2"
#define THEIR_PEER "1"
#endif
#define PEER_SYNC0 BIT(0)
#define PEER_SYNC1 BIT(1)
#define PEER_SYNC2 BIT(2)
#define PEER_FAIL BIT(3)
#define PEER_GATHER_DONE BIT(4)
#define PEER_DESC_PUBLISHED BIT(5)
#define PEER_CONNECTED BIT(6)
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
#define ALIGN(size) (((size) + 3U) & ~(3U))
typedef struct message_wrap {
uint16_t topic_len;
@ -44,196 +23,18 @@ typedef struct message_wrap {
char data[];
} __attribute__((packed)) message_wrap_t;
static const char *TAG = "serverless_mqtt" OUR_PEER;
static char s_buffer[MAX_BUFFER_SIZE];
static EventGroupHandle_t s_state = NULL;
static juice_agent_t *s_agent = NULL;
static cJSON *s_peer_desc_json = NULL;
static char *s_peer_desc = NULL;
static const char *TAG = "serverless_mqtt";
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
char *wifi_get_ipv4(wifi_interface_t interface);
esp_err_t wifi_connect(void);
static esp_err_t sync_peers(void);
static esp_err_t create_candidates(void);
static esp_err_t create_local_client(void);
static esp_err_t create_local_broker(void);
void app_main(void)
{
__attribute__((__unused__)) esp_err_t ret;
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
ESP_LOGI(TAG, "Everything is ready, exiting main task");
return;
}
err:
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
esp_deep_sleep(1000000LL * (esp_random() % 20));
}
esp_err_t peer_init(on_peer_recv_t cb);
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
}
xEventGroupSetBits(s_state, PEER_SYNC0);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
break;
}
EventBits_t bits = xEventGroupGetBits(s_state);
if (event->data_len > 1 && s_agent) {
cJSON *root = cJSON_Parse(event->data);
if (root == NULL) {
break;
}
cJSON *desc = cJSON_GetObjectItem(root, "desc");
if (desc == NULL) {
cJSON_Delete(root);
break;
}
printf("desc->valuestring:%s\n", desc->valuestring);
juice_set_remote_description(s_agent, desc->valuestring);
char cand_name[] = "cand0";
while (true) {
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
if (cand == NULL) {
break;
}
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
juice_add_remote_candidate(s_agent, cand->valuestring);
cand_name[4]++;
}
cJSON_Delete(root);
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
// and destroy the mqtt client
}
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
}
#else
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC1);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
}
#endif
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static esp_err_t sync_peers(void)
{
esp_err_t ret = ESP_OK;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
err, TAG, "Failed to register mqtt event handler");
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
ESP_LOGI(TAG, "Waiting for the other peer...");
const int max_sync_retry = 60;
int retry = 0;
while (true) {
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_SYNC2) {
break;
}
if (bits & PEER_SYNC1) {
continue;
}
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish mqtt message");
#endif
}
ESP_LOGI(TAG, "Sync done");
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish peer's description");
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
err:
free(s_peer_desc);
esp_mqtt_client_destroy(client);
return ret;
}
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
{
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
if (state == JUICE_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
esp_restart();
}
}
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
{
static uint8_t cand_nr = 0;
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
char cand_name[] = "cand0";
cand_name[4] += cand_nr++;
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
}
}
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
{
ESP_LOGI(TAG, "Gathering done");
if (s_state) {
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
}
}
#define ALIGN(size) (((size) + 3U) & ~(3U))
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
static void peer_recv(const char *data, size_t size)
{
if (s_local_mqtt) {
message_wrap_t *message = (message_wrap_t *)data;
@ -250,45 +51,21 @@ static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void
ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload);
esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0);
}
}
static esp_err_t create_candidates(void)
void app_main(void)
{
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
s_peer_desc_json = cJSON_CreateObject();
esp_err_t ret = ESP_OK;
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
.stun_server_port = 19302,
.cb_state_changed = juice_state,
.cb_candidate = juice_candidate,
.cb_gathering_done = juice_gathering_done,
.cb_recv = juice_recv,
};
s_agent = juice_create(&config);
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to get local description");
ESP_LOGI(TAG, "desc: %s", s_buffer);
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
s_peer_desc = cJSON_Print(s_peer_desc_json);
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
cJSON_Delete(s_peer_desc_json);
return ESP_OK;
__attribute__((__unused__)) esp_err_t ret;
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
ESP_GOTO_ON_ERROR(peer_init(peer_recv), err, TAG, "Failed to init peer library");
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
ESP_LOGI(TAG, "Everything is ready, exiting main task");
return;
err:
juice_destroy(s_agent);
s_agent = NULL;
cJSON_Delete(s_peer_desc_json);
s_peer_desc_json = NULL;
return ret;
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
esp_deep_sleep(1000000LL * (esp_random() % 20));
}
static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data)
@ -335,18 +112,26 @@ err:
static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain)
{
if (client && strcmp(client, "local_mqtt") == 0 ) {
if (client && strcmp(client, "local_mqtt") == 0) {
// This is our little local client -- do not forward
return;
}
ESP_LOGI(TAG, "handle_message topic:%s", topic);
ESP_LOGI(TAG, "handle_message data:%.*s", len, payload);
ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain);
if (s_local_mqtt && s_agent) {
if (s_local_mqtt) {
static char *s_buffer = NULL;
static size_t s_buffer_len = 0;
if (s_buffer == NULL || s_buffer_len == 0) {
peer_get_buffer(&s_buffer, &s_buffer_len);
if (s_buffer == NULL || s_buffer_len == 0) {
return;
}
}
int topic_len = strlen(topic) + 1; // null term
int topic_len_aligned = ALIGN(topic_len);
int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len;
if (total_msg_len > MAX_BUFFER_SIZE) {
if (total_msg_len > s_buffer_len) {
ESP_LOGE(TAG, "Fail to forward, message too long");
return;
}
@ -356,7 +141,7 @@ static void handle_message(char *client, char *topic, char *payload, int len, in
memcpy(s_buffer + 4, topic, topic_len);
memcpy(s_buffer + 4 + topic_len_aligned, payload, len);
juice_send(s_agent, s_buffer, total_msg_len);
peer_send(s_buffer, total_msg_len);
}
}

View File

@ -0,0 +1,2 @@
CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=n
CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=y

View File

@ -0,0 +1,5 @@
CONFIG_IDF_TARGET="esp32"
CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y
CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n
CONFIG_SPIRAM=y
CONFIG_MBEDTLS_EXTERNAL_MEM_ALLOC=y

View File

@ -1,3 +1,6 @@
CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y
CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384
CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768
CONFIG_LWIP_SNTP_MAX_SERVERS=2
CONFIG_MBEDTLS_SSL_PROTO_DTLS=y
CONFIG_MBEDTLS_SSL_DTLS_SRTP=y