mirror of
https://github.com/espressif/esp-protocols.git
synced 2025-07-29 10:17:30 +02:00
Merge pull request #840 from david-cermak/feat/mosq_esp_peer
[mosq]: Add support for esp-peer in brokerless example
This commit is contained in:
14
.github/workflows/mosq__build.yml
vendored
14
.github/workflows/mosq__build.yml
vendored
@ -14,10 +14,15 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"]
|
||||
example: ["broker", "serverless_mqtt"]
|
||||
exclude:
|
||||
- idf_ver: "release-v5.1"
|
||||
example: "serverless_mqtt" # serverless_mqtt is not supported due to esp-peer
|
||||
|
||||
runs-on: ubuntu-22.04
|
||||
container: espressif/idf:${{ matrix.idf_ver }}
|
||||
env:
|
||||
TEST_DIR: components/mosquitto/examples
|
||||
TEST_DIR: components/mosquitto/examples/${{ matrix.example }}
|
||||
TARGET_TEST: broker
|
||||
TARGET_TEST_DIR: build_esp32_default
|
||||
steps:
|
||||
@ -31,14 +36,17 @@ jobs:
|
||||
. ${IDF_PATH}/export.sh
|
||||
pip install idf-component-manager idf-build-apps --upgrade
|
||||
python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml
|
||||
if [ "${{ matrix.example }}" == "${TARGET_TEST}" ]; then
|
||||
# upload only the target test artifacts
|
||||
cd ${TEST_DIR}/${TARGET_TEST}
|
||||
cd ${TEST_DIR}
|
||||
${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR}
|
||||
zip -qur artifacts.zip ${TARGET_TEST_DIR}
|
||||
fi
|
||||
- uses: actions/upload-artifact@v4
|
||||
if: ${{ matrix.example == 'broker' }}
|
||||
with:
|
||||
name: mosq_target_esp32_${{ matrix.idf_ver }}
|
||||
path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip
|
||||
path: ${{ env.TEST_DIR }}/artifacts.zip
|
||||
if-no-files-found: error
|
||||
|
||||
test_mosq:
|
||||
|
@ -3,4 +3,21 @@
|
||||
cmake_minimum_required(VERSION 3.16)
|
||||
|
||||
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
|
||||
|
||||
# Setup ESP-PEER from GitHub repo (but it's supported only on certain targets)
|
||||
set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32")
|
||||
if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS)
|
||||
execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup/install.sh
|
||||
${CMAKE_BINARY_DIR}
|
||||
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
|
||||
RESULT_VARIABLE script_result)
|
||||
|
||||
if(script_result)
|
||||
message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}")
|
||||
endif()
|
||||
list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/")
|
||||
else()
|
||||
message(STATUS "ESP-PEER is not compatible with this target")
|
||||
endif()
|
||||
|
||||
project(serverless_mqtt)
|
||||
|
@ -3,14 +3,19 @@
|
||||
MQTT served by (two) mosquitto's running on two ESP chips.
|
||||
|
||||
* Leverages MQTT connectivity between two private networks without cloud premisses.
|
||||
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE protocol, by [libjuice](https://github.com/paullouisageneau/libjuice)).
|
||||
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE/WebRTC protocol)
|
||||
|
||||
## Peer to peer connection
|
||||
|
||||
Could be established either by [libjuice](https://github.com/paullouisageneau/libjuice) or [esp-webRTC](https://github.com/espressif/esp-webrtc-solution). While `juice` is just a low level implementation of ICE-UDP, we need to provide some signalling and synchronization, the `WebRTC` is full-fledged solution to establish a peer connection using standardized signalling, security and transfer protocols.
|
||||
|
||||
|
||||
## How it works
|
||||
|
||||
This example needs two ESP32 chipsets, that will create two separate Wi-Fi networks (IoT networks) used for IoT devices.
|
||||
Each IoT network is served by an MQTT server (using mosquitto component).
|
||||
This example will also synchronize these two MQTT brokers, as if there was only one IoT network with one broker.
|
||||
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) to traverse NATs, which enabling direct connection between two private networks behind NATs.
|
||||
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) or esp-webRTC (which implements WebRTC) to traverse NATs, which enabling direct connection between two private networks behind NATs.
|
||||
|
||||
* Diagram
|
||||
|
||||
@ -19,12 +24,16 @@ This example creates a peer to peer connection between two chipsets to keep them
|
||||
Here's a step-by-step procedure of establishing this remote connection:
|
||||
1) Initialize and start Wi-Fi AP (for IoT networks) and Wi-Fi station (for internet connection)
|
||||
2) Start mosquitto broker on IoT network
|
||||
3) Start libjuice to gather connection candidates
|
||||
4) Synchronize using a public MQTT broker and exchange ICE descriptors
|
||||
5) Establish ICE UDP connection between the two ESP32 chipsets
|
||||
3) Start peer to peer connection
|
||||
- In case of `libjuice`
|
||||
- gather connection candidates
|
||||
- synchronize using a public MQTT broker and exchange ICE descriptors
|
||||
- establish ICE UDP connection between the two ESP32 chipsets
|
||||
- In case of `webRTC` simply start the connection.
|
||||
6) Start forwarding mqtt messages
|
||||
- Each remote datagram (received from ICE-UDP channel) is re-published to the local MQTT server
|
||||
- Each local MQTT message (received from mosquitto on_message callback) is sent in ICE-UDP datagram
|
||||
- Each local MQTT message (received from mosquitto on_message callback) is sent in a peer message
|
||||
|
||||
|
||||
## How to use this example
|
||||
|
||||
@ -33,7 +42,9 @@ You need two ESP32 devices that support Wi-Fi station and Wi-Fi software access
|
||||
* Configure Wi-Fi credentials for both devices on both interfaces
|
||||
* These devices would be deployed in distinct Wi-Fi environments, so the Wi-Fi station credentials would likely be different.
|
||||
* They also create their own IoT network (on the soft-AP interface) Wi-Fi, so the AP credentials would likely be the same, suggesting the IoT networks will be keep synchronized (even though these are two distict Wi-Fi networks).
|
||||
* Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
|
||||
* Choose the peer library
|
||||
* Only for `libjuice`:
|
||||
- Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
|
||||
* Optionally: You can use `idf.py` `-D` and `-B` flag to keep separate build directories and sdkconfigs for these two roles
|
||||
```
|
||||
idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
|
||||
@ -44,9 +55,129 @@ idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
|
||||
* Join PEER2 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics.
|
||||
* Whenever you publish to a topic, all subscribed clients should receive the message, no matter which Wi-Fi network they're connected to.
|
||||
|
||||
## Example output
|
||||
|
||||
## With libjuice
|
||||
|
||||
```
|
||||
I (4746) esp_netif_handlers: sta ip: 192.168.0.40, mask: 255.255.255.0, gw: 192.168.0.1
|
||||
4: mosquitto version v2.0.20~3 starting
|
||||
4: Using default config.
|
||||
4: Opening ipv4 listen socket on port 1883.
|
||||
4: mosquitto version v2.0.20~3 running
|
||||
I (4756) serverless_mqtt1: desc: a=ice-ufrag:sGdl
|
||||
a=ice-pwd:R4IPGsFctITbT1dCZbfQTL
|
||||
a=ice-options:ice2,trickle
|
||||
|
||||
00:00:04 INFO agent.c:1100: Changing state to gathering
|
||||
I (4776) serverless_mqtt1: JUICE state change: gathering
|
||||
00:00:04 INFO agent.c:1100: Changing state to connecting
|
||||
I (4786) serverless_mqtt1: JUICE state change: connecting
|
||||
00:00:04 INFO agent.c:422: Using STUN server stun.l.google.com:19302
|
||||
00:00:04 INFO agent.c:1378: STUN server binding successful
|
||||
00:00:04 INFO agent.c:1397: Got STUN mapped address 185.194.44.31:62703 from server
|
||||
00:00:04 INFO agent.c:2428: Candidate gathering done
|
||||
I (5066) serverless_mqtt1: Gathering done
|
||||
I (5066) serverless_mqtt1: desc: {
|
||||
"desc": "a=ice-ufrag:sGdl\r\na=ice-pwd:R4IPGsFctITbT1dCZbfQTL\r\na=ice-options:ice2,trickle\r\n",
|
||||
"cand0": "a=candidate:1 1 UDP 2122317823 192.168.0.40 62703 typ host",
|
||||
"cand1": "a=candidate:2 1 UDP 1686109951 185.194.44.31 62703 typ srflx raddr 0.0.0.0 rport 0"
|
||||
}
|
||||
I (5096) serverless_mqtt1: Other event id:7
|
||||
```
|
||||
|
||||
### With esp-peer
|
||||
|
||||
```
|
||||
I (5992) esp_netif_handlers: sta ip: 192.168.0.42, mask: 255.255.255.0, gw: 192.168.0.1
|
||||
4: mosquitto version v2.0.20~3 starting
|
||||
4: Using default config.
|
||||
4: Opening ipv4 listen socket on port 1883.
|
||||
4: mosquitto version v2.0.20~3 running
|
||||
I (6702) esp-x509-crt-bundle: Certificate validated
|
||||
I (7982) APPRTC_SIG: result SUCCESS
|
||||
Initials set to 1
|
||||
I (7982) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 911
|
||||
I (8652) esp-x509-crt-bundle: Certificate validated
|
||||
Got url:stun:webrtc.espressif.com:3478 user_name: 1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw=
|
||||
I (10022) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 173
|
||||
I (10032) serverless_mqtt_webrtc: Signaling ice info handler 0x0
|
||||
I (10042) DTLS: Init SRTP OK
|
||||
I (11512) DTLS_SRTP: dtls_srtp init done
|
||||
I (11522) APPRTC_SIG: Registering signaling channel.
|
||||
I (11522) APPRTC_SIG: Connecting to wss://webrtc.espressif.com:8089/ws...
|
||||
I (11532) websocket_client: Started
|
||||
I (11532) serverless_mqtt_webrtc: Waiting for peer to connect
|
||||
I (12122) esp-x509-crt-bundle: Certificate validated
|
||||
I (13502) APPRTC_SIG: WEBSOCKET_EVENT_CONNECTED
|
||||
I (13502) APPRTC_SIG: send to remote : {"cmd":"register","roomid":"111116","clientid":"93827452"}
|
||||
I (13502) serverless_mqtt_webrtc: Peer state: 2
|
||||
I (13522) AGENT: Start agent as Controlling
|
||||
I (13522) PEER_DEF: Start DTLS role as 1
|
||||
I (13522) AGENT: Send STUN binding request
|
||||
I (13522) AGENT: Send allocate now
|
||||
Got error code 401
|
||||
I (13802) AGENT: Send allocate now
|
||||
I (14112) AGENT: 0 Get candidate success user:1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw=
|
||||
I (14112) APPRTC_SIG: Begin to send offer to https://webrtc.espressif.com/message/111116/93827452
|
||||
I (14782) esp-x509-crt-bundle: Certificate validated
|
||||
I (16472) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 21
|
||||
I (21602) PEER_DEF: A SRC: 4
|
||||
I (21602) PEER_DEF: Get peer role 0
|
||||
I (21602) PEER_DEF: Get peer role 0
|
||||
I (21602) AGENT: 0 Add remote type:2 185.194.44.31:60872
|
||||
I (21612) AGENT: 0 Add remote type:4 172.31.6.33:59012
|
||||
I (21612) AGENT: 0 Add remote type:1 192.168.0.41:60872
|
||||
0 Sorted pair 0 type: 1 local:192.168.0.42:63459 Remote:192.168.0.41:60872
|
||||
0 Sorted pair 1 type: 2 local:185.194.44.31:63459 Remote:185.194.44.31:60872
|
||||
0 Sorted pair 2 type: 4 local:172.31.6.33:59013 Remote:172.31.6.33:59012
|
||||
I (21642) serverless_mqtt_webrtc: Peer state: 3
|
||||
I (22002) AGENT: 0 Send binding request (cand:0) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:76e5004e797d87a62756c714
|
||||
I (22002) AGENT: 0 Send binding request (cand:0) local:185.194.44.31:63459 remote:185.194.44.31:60872 id:4c598d93643bd7252b449456
|
||||
I (22012) AGENT: 0 Send binding request (cand:0) local:172.31.6.33:59013 remote:172.31.6.33:59012 id:6db505597d157d8d71dfed43
|
||||
I (22022) AGENT: 0 send indication bind request
|
||||
|
||||
I (22202) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
|
||||
I (22202) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872
|
||||
I (22212) AGENT: 0 Select pair192.168.0.41:60872
|
||||
I (22212) AGENT: 0 Send binding request (cand:1) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:45cb977b6ea3bbbe3a984b90
|
||||
I (22222) AGENT: 0 PeerIndication recv local:172.31.6.33:59013 remote:172.31.6.33:59012
|
||||
I (22402) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
|
||||
I (22402) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872
|
||||
I (22412) AGENT: 0 Candidate responsed
|
||||
I (22412) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
|
||||
I (22422) AGENT: 0 Connection OK 192.168.0.41:60872
|
||||
I (22422) serverless_mqtt_webrtc: Peer state: 5
|
||||
I (22452) DTLS: Start to do server handshake
|
||||
|
||||
Works as 1
|
||||
I (23842) DTLS: SRTP connected OK
|
||||
I (23852) DTLS: Server handshake success
|
||||
I (23852) PEER_DEF: DTLS handshake success
|
||||
I (23852) serverless_mqtt_webrtc: Peer state: 6
|
||||
I (23852) serverless_mqtt_webrtc: Peer is connected!
|
||||
I (23862) serverless_mqtt: local client event id:7
|
||||
22: New connection from 192.168.4.1:63904 on port 1883.
|
||||
22: New client connected from 192.168.4.1:63904 as local_mqtt (p2, c1, k120).
|
||||
I (23892) serverless_mqtt: local client connected
|
||||
I (23872) serverless_mqtt: Everything is ready, exiting main task
|
||||
I (23892) main_task: Returned from app_main()
|
||||
I (24552) SCTP: 0 Receive chunk 1 SCTP_INIT
|
||||
I (24552) SCTP: 0 state 2
|
||||
I (24552) SCTP: Send INIT_ACK chunk
|
||||
I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO
|
||||
I (24762) SCTP: Send ECHO_ACK chunk
|
||||
I (24762) SCTP: 0 state 5
|
||||
I (24762) serverless_mqtt_webrtc: Peer state: 8
|
||||
I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO
|
||||
I (24772) SCTP: Send ECHO_ACK chunk
|
||||
I (24962) SCTP: Get DCEP esp_channel event:3 type:0 si:2
|
||||
I (24972) serverless_mqtt_webrtc: Peer state: 9
|
||||
```
|
||||
|
||||
## Warning
|
||||
|
||||
This example uses libjuice as a dependency:
|
||||
This example uses `libjuice` as a dependency:
|
||||
|
||||
* libjuice (UDP Interactive Connectivity Establishment): https://github.com/paullouisageneau/libjuice
|
||||
|
||||
|
@ -0,0 +1,28 @@
|
||||
From cdc43a56f5ea1ab1935f55f47f8644f5dd30825e Mon Sep 17 00:00:00 2001
|
||||
From: David Cermak <cermak@espressif.com>
|
||||
Date: Thu, 10 Jul 2025 11:09:57 +0200
|
||||
Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header
|
||||
|
||||
---
|
||||
components/media_lib_sal/port/media_lib_os_freertos.c | 4 ++++
|
||||
1 file changed, 4 insertions(+)
|
||||
|
||||
diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c
|
||||
index d248d59..aea0527 100644
|
||||
--- a/components/media_lib_sal/port/media_lib_os_freertos.c
|
||||
+++ b/components/media_lib_sal/port/media_lib_os_freertos.c
|
||||
@@ -40,8 +40,12 @@
|
||||
#include "esp_idf_version.h"
|
||||
|
||||
#if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT
|
||||
+#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 2, 0))
|
||||
+#include "esp_private/freertos_debug.h"
|
||||
+#else
|
||||
#include "freertos/task_snapshot.h"
|
||||
#endif
|
||||
+#endif
|
||||
|
||||
#ifdef __XTENSA__
|
||||
#include "esp_debug_helpers.h"
|
||||
--
|
||||
2.43.0
|
27
components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh
Executable file
27
components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh
Executable file
@ -0,0 +1,27 @@
|
||||
#!/usr/bin/env bash
|
||||
set -e
|
||||
echo "bin_dir: $1"
|
||||
|
||||
bin_dir="$1"
|
||||
THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
ESP_PEER_VERSION="ccff3bd65cea750bf6c0abcf9d95b931ba9329f0"
|
||||
|
||||
ESP_PEER_URL="https://github.com/espressif/esp-webrtc-solution/archive/${ESP_PEER_VERSION}.zip"
|
||||
ESP_PEER_DIR="${bin_dir}/esp-peer"
|
||||
ZIP_PATH="${bin_dir}/esp-peer.zip"
|
||||
EXTRACTED_DIR="${ESP_PEER_DIR}/esp-webrtc-solution-${ESP_PEER_VERSION}"
|
||||
COMPONENTS_SRC="${EXTRACTED_DIR}/components"
|
||||
COMPONENTS_DST="${ESP_PEER_DIR}/components"
|
||||
PATCH_FILE_1="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch"
|
||||
PATCH_FILE_2="${THIS_DIR}/libpeer-Add-direct-dependency-to-libsrtp.patch"
|
||||
|
||||
# Download if not exists
|
||||
if [ ! -d "$EXTRACTED_DIR" ]; then
|
||||
echo "Downloading esp-peer ${ESP_PEER_VERSION}..."
|
||||
wget -O "$ZIP_PATH" "$ESP_PEER_URL"
|
||||
unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR"
|
||||
patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1"
|
||||
patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2"
|
||||
mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR}
|
||||
mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components
|
||||
fi
|
@ -0,0 +1,23 @@
|
||||
From 695e057000698f4897b6c5802851499842e2fe31 Mon Sep 17 00:00:00 2001
|
||||
From: David Cermak <cermak@espressif.com>
|
||||
Date: Fri, 11 Jul 2025 16:59:21 +0200
|
||||
Subject: [PATCH] fix(libpeer): Add direct dependency to libsrtp
|
||||
|
||||
---
|
||||
components/esp_webrtc/impl/peer_default/CMakeLists.txt | 2 +-
|
||||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||
|
||||
diff --git a/components/esp_webrtc/impl/peer_default/CMakeLists.txt b/components/esp_webrtc/impl/peer_default/CMakeLists.txt
|
||||
index 2af35cf..3fb4615 100644
|
||||
--- a/components/esp_webrtc/impl/peer_default/CMakeLists.txt
|
||||
+++ b/components/esp_webrtc/impl/peer_default/CMakeLists.txt
|
||||
@@ -2,6 +2,6 @@ idf_component_register(INCLUDE_DIRS ./include)
|
||||
|
||||
get_filename_component(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME)
|
||||
add_prebuilt_library(${BASE_DIR} "${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}/libpeer_default.a"
|
||||
- PRIV_REQUIRES ${BASE_DIR} esp_timer)
|
||||
+ PRIV_REQUIRES ${BASE_DIR} esp_timer espressif__esp_libsrtp)
|
||||
target_link_libraries(${COMPONENT_LIB} INTERFACE "-L ${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}")
|
||||
target_link_libraries(${COMPONENT_LIB} INTERFACE peer_default)
|
||||
--
|
||||
2.43.0
|
@ -1,4 +1,14 @@
|
||||
if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
|
||||
set(PEER_BACKEND_SRC "peer_impl_webrtc.c")
|
||||
else()
|
||||
set(PEER_BACKEND_SRC "peer_impl_juice.c")
|
||||
endif()
|
||||
|
||||
idf_component_register(SRCS "serverless_mqtt.c"
|
||||
"wifi_connect.c"
|
||||
"${PEER_BACKEND_SRC}"
|
||||
INCLUDE_DIRS "."
|
||||
REQUIRES libjuice nvs_flash mqtt json esp_wifi)
|
||||
if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
|
||||
idf_component_optional_requires(PUBLIC media_lib_sal esp_webrtc peer_default)
|
||||
endif()
|
||||
|
@ -33,8 +33,40 @@ menu "Example Configuration"
|
||||
WiFi station password for the example to use.
|
||||
endmenu
|
||||
|
||||
choice EXAMPLE_PEER_LIB
|
||||
prompt "Choose peer library"
|
||||
default EXAMPLE_PEER_LIB_LIBJUICE
|
||||
help
|
||||
Choose the peer library to use for WebRTC communication.
|
||||
libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling)
|
||||
esp_peer: Use ESP-IDF specific peer library
|
||||
|
||||
config EXAMPLE_PEER_LIB_ESP_PEER
|
||||
bool "esp_peer"
|
||||
|
||||
config EXAMPLE_PEER_LIB_LIBJUICE
|
||||
bool "libjuice"
|
||||
endchoice
|
||||
|
||||
config EXAMPLE_WEBRTC_URL
|
||||
string "WebRTC server URL"
|
||||
depends on EXAMPLE_PEER_LIB_ESP_PEER
|
||||
default "https://webrtc.espressif.com/join/"
|
||||
help
|
||||
URL of WebRTC remote endpoint.
|
||||
|
||||
config EXAMPLE_WEBRTC_ROOM_ID
|
||||
string "WebRTC room ID"
|
||||
depends on EXAMPLE_PEER_LIB_ESP_PEER
|
||||
default "12345"
|
||||
help
|
||||
Room ID for WebRTC synchronisation.
|
||||
Could be a random number, but the same for both peers.
|
||||
|
||||
|
||||
config EXAMPLE_MQTT_BROKER_URI
|
||||
string "MQTT Broker URL"
|
||||
depends on EXAMPLE_PEER_LIB_LIBJUICE
|
||||
default "mqtt://mqtt.eclipseprojects.io"
|
||||
help
|
||||
URL of the mqtt broker use for synchronisation and exchanging
|
||||
@ -42,12 +74,14 @@ menu "Example Configuration"
|
||||
|
||||
config EXAMPLE_MQTT_SYNC_TOPIC
|
||||
string "MQTT topic for synchronisation"
|
||||
depends on EXAMPLE_PEER_LIB_LIBJUICE
|
||||
default "/topic/serverless_mqtt"
|
||||
help
|
||||
MQTT topic used fo synchronisation.
|
||||
|
||||
config EXAMPLE_STUN_SERVER
|
||||
string "Hostname of STUN server"
|
||||
depends on EXAMPLE_PEER_LIB_LIBJUICE
|
||||
default "stun.l.google.com"
|
||||
help
|
||||
STUN server hostname.
|
||||
@ -67,6 +101,7 @@ menu "Example Configuration"
|
||||
|
||||
choice EXAMPLE_SERVERLESS_ROLE
|
||||
prompt "Choose your role"
|
||||
depends on EXAMPLE_PEER_LIB_LIBJUICE
|
||||
default EXAMPLE_SERVERLESS_ROLE_PEER1
|
||||
help
|
||||
Choose either peer1 or peer2.
|
||||
|
@ -0,0 +1,18 @@
|
||||
/*
|
||||
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
|
||||
*
|
||||
* SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include "esp_random.h"
|
||||
#include "esp_sleep.h"
|
||||
#include "mosq_broker.h"
|
||||
|
||||
typedef void (*on_peer_recv_t)(const char *data, size_t size);
|
||||
|
||||
esp_err_t peer_init(on_peer_recv_t cb);
|
||||
|
||||
void peer_get_buffer(char ** buffer, size_t *buffer_len);
|
||||
|
||||
void peer_send(char* data, size_t size);
|
@ -0,0 +1,283 @@
|
||||
/*
|
||||
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
|
||||
*
|
||||
* SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
*/
|
||||
#include <stdio.h>
|
||||
#include "freertos/FreeRTOS.h"
|
||||
#include "freertos/event_groups.h"
|
||||
#include "mqtt_client.h"
|
||||
#include "esp_wifi.h"
|
||||
#include "esp_log.h"
|
||||
#include "esp_check.h"
|
||||
#include "juice/juice.h"
|
||||
#include "cJSON.h"
|
||||
#include "peer_impl.h"
|
||||
|
||||
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
|
||||
#define OUR_PEER "1"
|
||||
#define THEIR_PEER "2"
|
||||
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
|
||||
#define OUR_PEER "2"
|
||||
#define THEIR_PEER "1"
|
||||
#endif
|
||||
|
||||
#define PEER_SYNC0 BIT(0)
|
||||
#define PEER_SYNC1 BIT(1)
|
||||
#define PEER_SYNC2 BIT(2)
|
||||
#define PEER_FAIL BIT(3)
|
||||
#define PEER_GATHER_DONE BIT(4)
|
||||
#define PEER_DESC_PUBLISHED BIT(5)
|
||||
#define PEER_CONNECTED BIT(6)
|
||||
|
||||
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
|
||||
|
||||
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
|
||||
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
|
||||
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
|
||||
|
||||
static const char *TAG = "serverless_mqtt" OUR_PEER;
|
||||
static char s_buffer[MAX_BUFFER_SIZE];
|
||||
static EventGroupHandle_t s_state = NULL;
|
||||
static juice_agent_t *s_agent = NULL;
|
||||
static cJSON *s_peer_desc_json = NULL;
|
||||
static char *s_peer_desc = NULL;
|
||||
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
|
||||
static on_peer_recv_t s_on_recv = NULL;
|
||||
|
||||
char *wifi_get_ipv4(wifi_interface_t interface);
|
||||
static esp_err_t sync_peers(void);
|
||||
static esp_err_t create_candidates(void);
|
||||
|
||||
void peer_get_buffer(char ** buffer, size_t *buffer_len)
|
||||
{
|
||||
if (buffer && buffer_len) {
|
||||
*buffer = s_buffer;
|
||||
*buffer_len = MAX_BUFFER_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
void peer_send(char* data, size_t size)
|
||||
{
|
||||
juice_send(s_agent, data, size);
|
||||
}
|
||||
|
||||
esp_err_t peer_init(on_peer_recv_t cb)
|
||||
{
|
||||
esp_err_t ret = ESP_FAIL;
|
||||
ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback");
|
||||
s_on_recv = cb;
|
||||
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
|
||||
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
|
||||
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
|
||||
if (bits & PEER_CONNECTED) {
|
||||
ESP_LOGI(TAG, "Peer is connected!");
|
||||
return ESP_OK;
|
||||
}
|
||||
err:
|
||||
ESP_LOGE(TAG, "Failed to init peer");
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
|
||||
{
|
||||
esp_mqtt_event_handle_t event = event_data;
|
||||
esp_mqtt_client_handle_t client = event->client;
|
||||
switch ((esp_mqtt_event_id_t)event_id) {
|
||||
case MQTT_EVENT_CONNECTED:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
|
||||
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
|
||||
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
|
||||
}
|
||||
xEventGroupSetBits(s_state, PEER_SYNC0);
|
||||
break;
|
||||
case MQTT_EVENT_DISCONNECTED:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_DATA:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
|
||||
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
|
||||
printf("DATA=%.*s\r\n", event->data_len, event->data);
|
||||
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
|
||||
break;
|
||||
}
|
||||
EventBits_t bits = xEventGroupGetBits(s_state);
|
||||
if (event->data_len > 1 && s_agent) {
|
||||
cJSON *root = cJSON_Parse(event->data);
|
||||
if (root == NULL) {
|
||||
break;
|
||||
}
|
||||
cJSON *desc = cJSON_GetObjectItem(root, "desc");
|
||||
if (desc == NULL) {
|
||||
cJSON_Delete(root);
|
||||
break;
|
||||
}
|
||||
printf("desc->valuestring:%s\n", desc->valuestring);
|
||||
juice_set_remote_description(s_agent, desc->valuestring);
|
||||
char cand_name[] = "cand0";
|
||||
while (true) {
|
||||
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
|
||||
if (cand == NULL) {
|
||||
break;
|
||||
}
|
||||
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
|
||||
juice_add_remote_candidate(s_agent, cand->valuestring);
|
||||
cand_name[4]++;
|
||||
}
|
||||
cJSON_Delete(root);
|
||||
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
|
||||
// and destroy the mqtt client
|
||||
}
|
||||
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
|
||||
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
|
||||
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
|
||||
xEventGroupSetBits(s_state, PEER_SYNC2);
|
||||
} else {
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
}
|
||||
}
|
||||
#else
|
||||
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
|
||||
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
|
||||
xEventGroupSetBits(s_state, PEER_SYNC1);
|
||||
} else {
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
}
|
||||
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
|
||||
xEventGroupSetBits(s_state, PEER_SYNC2);
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
case MQTT_EVENT_ERROR:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
break;
|
||||
default:
|
||||
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static esp_err_t sync_peers(void)
|
||||
{
|
||||
esp_err_t ret = ESP_OK;
|
||||
esp_mqtt_client_config_t mqtt_cfg = {
|
||||
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
|
||||
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
|
||||
};
|
||||
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
|
||||
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
|
||||
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
|
||||
err, TAG, "Failed to register mqtt event handler");
|
||||
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
|
||||
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
|
||||
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
|
||||
ESP_LOGI(TAG, "Waiting for the other peer...");
|
||||
const int max_sync_retry = 60;
|
||||
int retry = 0;
|
||||
while (true) {
|
||||
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
|
||||
if (bits & PEER_SYNC2) {
|
||||
break;
|
||||
}
|
||||
if (bits & PEER_SYNC1) {
|
||||
continue;
|
||||
}
|
||||
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
|
||||
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
|
||||
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
|
||||
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
|
||||
ESP_FAIL, TAG, "Failed to publish mqtt message");
|
||||
#endif
|
||||
}
|
||||
ESP_LOGI(TAG, "Sync done");
|
||||
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
|
||||
ESP_FAIL, TAG, "Failed to publish peer's description");
|
||||
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
|
||||
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
|
||||
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
|
||||
err:
|
||||
free(s_peer_desc);
|
||||
esp_mqtt_client_destroy(client);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
|
||||
{
|
||||
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
|
||||
if (state == JUICE_STATE_CONNECTED) {
|
||||
xEventGroupSetBits(s_state, PEER_CONNECTED);
|
||||
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
|
||||
esp_restart();
|
||||
}
|
||||
}
|
||||
|
||||
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
|
||||
{
|
||||
static uint8_t cand_nr = 0;
|
||||
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
|
||||
char cand_name[] = "cand0";
|
||||
cand_name[4] += cand_nr++;
|
||||
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
|
||||
}
|
||||
}
|
||||
|
||||
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
|
||||
{
|
||||
ESP_LOGI(TAG, "Gathering done");
|
||||
if (s_state) {
|
||||
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
|
||||
{
|
||||
if (s_local_mqtt) {
|
||||
s_on_recv(data, size);
|
||||
} else {
|
||||
ESP_LOGI(TAG, "No local mqtt client, dropping data");
|
||||
}
|
||||
}
|
||||
|
||||
static esp_err_t create_candidates(void)
|
||||
{
|
||||
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
|
||||
s_peer_desc_json = cJSON_CreateObject();
|
||||
esp_err_t ret = ESP_OK;
|
||||
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
|
||||
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
|
||||
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
|
||||
.stun_server_port = 19302,
|
||||
.cb_state_changed = juice_state,
|
||||
.cb_candidate = juice_candidate,
|
||||
.cb_gathering_done = juice_gathering_done,
|
||||
.cb_recv = juice_recv,
|
||||
};
|
||||
|
||||
s_agent = juice_create(&config);
|
||||
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
|
||||
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
|
||||
ESP_FAIL, err, TAG, "Failed to get local description");
|
||||
ESP_LOGI(TAG, "desc: %s", s_buffer);
|
||||
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
|
||||
|
||||
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
|
||||
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
|
||||
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
|
||||
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
|
||||
s_peer_desc = cJSON_Print(s_peer_desc_json);
|
||||
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
|
||||
cJSON_Delete(s_peer_desc_json);
|
||||
return ESP_OK;
|
||||
|
||||
err:
|
||||
juice_destroy(s_agent);
|
||||
s_agent = NULL;
|
||||
cJSON_Delete(s_peer_desc_json);
|
||||
s_peer_desc_json = NULL;
|
||||
return ret;
|
||||
}
|
@ -0,0 +1,248 @@
|
||||
/*
|
||||
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
|
||||
*
|
||||
* SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
*/
|
||||
#include "media_lib_os.h"
|
||||
#include "freertos/FreeRTOS.h"
|
||||
#include "freertos/event_groups.h"
|
||||
#include "media_lib_adapter.h"
|
||||
#include "media_lib_os.h"
|
||||
#include "esp_log.h"
|
||||
#include "esp_webrtc_defaults.h"
|
||||
#include "esp_peer_default.h"
|
||||
#include "common.h"
|
||||
#include "esp_check.h"
|
||||
#include "peer_impl.h"
|
||||
#include "sdkconfig.h"
|
||||
|
||||
#define WEBRTC_URL (CONFIG_EXAMPLE_WEBRTC_URL CONFIG_EXAMPLE_WEBRTC_ROOM_ID)
|
||||
#define PEER_CONNECTED BIT(0)
|
||||
#define PEER_DISCONNECTED BIT(1)
|
||||
#define MAX_BUFFER_SIZE (4*1024)
|
||||
|
||||
static EventGroupHandle_t s_state = NULL;
|
||||
static const char *TAG = "serverless_mqtt_webrtc";
|
||||
|
||||
void peer_get_buffer(char ** buffer, size_t *buffer_len)
|
||||
{
|
||||
static char s_buffer[MAX_BUFFER_SIZE];
|
||||
if (buffer && buffer_len) {
|
||||
*buffer = s_buffer;
|
||||
*buffer_len = MAX_BUFFER_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
static int start_webrtc(char *url);
|
||||
static int stop_webrtc(void);
|
||||
|
||||
static on_peer_recv_t s_on_recv = NULL;
|
||||
static esp_peer_signaling_handle_t signaling = NULL;
|
||||
static esp_peer_handle_t peer = NULL;
|
||||
static bool peer_running = false;
|
||||
|
||||
static void thread_scheduler(const char *thread_name, media_lib_thread_cfg_t *thread_cfg)
|
||||
{
|
||||
if (strcmp(thread_name, "pc_task") == 0) {
|
||||
thread_cfg->stack_size = 25 * 1024;
|
||||
thread_cfg->priority = 18;
|
||||
thread_cfg->core_id = 1;
|
||||
}
|
||||
}
|
||||
|
||||
esp_err_t peer_init(on_peer_recv_t cb)
|
||||
{
|
||||
esp_err_t ret = ESP_OK;
|
||||
s_on_recv = cb;
|
||||
s_state = xEventGroupCreate();
|
||||
media_lib_add_default_adapter();
|
||||
media_lib_thread_set_schedule_cb(thread_scheduler);
|
||||
ESP_RETURN_ON_FALSE(s_state, ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
|
||||
ESP_GOTO_ON_FALSE(start_webrtc(WEBRTC_URL) == ESP_PEER_ERR_NONE, ESP_FAIL, err, TAG, "Failed to start webRTC");
|
||||
ESP_LOGI(TAG, "Waiting for peer to connect");
|
||||
int i = 0;
|
||||
while (1) {
|
||||
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(1000));
|
||||
if (bits & PEER_CONNECTED) {
|
||||
ESP_LOGI(TAG, "Peer is connected!");
|
||||
return ret;
|
||||
}
|
||||
ESP_GOTO_ON_FALSE(i++ < 100, ESP_ERR_TIMEOUT, err, TAG, "Peer connection timeout");
|
||||
if (peer) {
|
||||
esp_peer_query(peer);
|
||||
}
|
||||
}
|
||||
|
||||
err:
|
||||
vEventGroupDelete(s_state);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static int peer_state_handler(esp_peer_state_t state, void* ctx)
|
||||
{
|
||||
ESP_LOGI(TAG, "Peer state: %d", state);
|
||||
if (state == ESP_PEER_STATE_CONNECTED) {
|
||||
xEventGroupSetBits(s_state, PEER_CONNECTED);
|
||||
} else if (state == ESP_PEER_STATE_DISCONNECTED) {
|
||||
xEventGroupSetBits(s_state, PEER_DISCONNECTED);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx)
|
||||
{
|
||||
if (msg->type == ESP_PEER_MSG_TYPE_SDP) {
|
||||
// Send local SDP to signaling server
|
||||
esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx)
|
||||
{
|
||||
ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx)
|
||||
{
|
||||
if (frame && frame->size > 0) {
|
||||
s_on_recv((char*)frame->data, frame->size);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void pc_task(void *arg)
|
||||
{
|
||||
while (peer_running) {
|
||||
esp_peer_main_loop(peer);
|
||||
media_lib_thread_sleep(20);
|
||||
}
|
||||
media_lib_thread_destroy(NULL);
|
||||
}
|
||||
|
||||
static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx)
|
||||
{
|
||||
if (peer == NULL) {
|
||||
esp_peer_default_cfg_t peer_cfg = {
|
||||
.agent_recv_timeout = 500,
|
||||
};
|
||||
esp_peer_cfg_t cfg = {
|
||||
.server_lists = &info->server_info,
|
||||
.server_num = 1,
|
||||
.audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV,
|
||||
.audio_info = {
|
||||
.codec = ESP_PEER_AUDIO_CODEC_G711A,
|
||||
},
|
||||
.enable_data_channel = true,
|
||||
.role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED,
|
||||
.on_state = peer_state_handler,
|
||||
.on_msg = peer_msg_handler,
|
||||
.on_video_info = peer_video_info_handler,
|
||||
.on_audio_info = peer_audio_info_handler,
|
||||
.on_video_data = peer_video_data_handler,
|
||||
.on_audio_data = peer_audio_data_handler,
|
||||
.on_data = peer_data_handler,
|
||||
.ctx = ctx,
|
||||
.extra_cfg = &peer_cfg,
|
||||
.extra_size = sizeof(esp_peer_default_cfg_t),
|
||||
};
|
||||
int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer);
|
||||
if (ret != ESP_PEER_ERR_NONE) {
|
||||
return ret;
|
||||
}
|
||||
media_lib_thread_handle_t thread = NULL;
|
||||
peer_running = true;
|
||||
media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL);
|
||||
if (thread == NULL) {
|
||||
peer_running = false;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int signaling_connected_handler(void* ctx)
|
||||
{
|
||||
if (peer) {
|
||||
return esp_peer_new_connection(peer);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx)
|
||||
{
|
||||
if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) {
|
||||
esp_peer_close(peer);
|
||||
peer = NULL;
|
||||
} else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) {
|
||||
// Receive remote SDP
|
||||
if (peer) {
|
||||
esp_peer_send_msg(peer, (esp_peer_msg_t*)msg);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int signaling_close_handler(void *ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int start_signaling(char* url)
|
||||
{
|
||||
esp_peer_signaling_cfg_t cfg = {
|
||||
.signal_url = url,
|
||||
.on_ice_info = signaling_ice_info_handler,
|
||||
.on_connected = signaling_connected_handler,
|
||||
.on_msg = signaling_msg_handler,
|
||||
.on_close = signaling_close_handler,
|
||||
};
|
||||
// Use APPRTC signaling
|
||||
return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling);
|
||||
}
|
||||
|
||||
static int start_webrtc(char *url)
|
||||
{
|
||||
stop_webrtc();
|
||||
return start_signaling(url);
|
||||
}
|
||||
|
||||
static int stop_webrtc(void)
|
||||
{
|
||||
peer_running = false;
|
||||
if (peer) {
|
||||
esp_peer_close(peer);
|
||||
peer = NULL;
|
||||
}
|
||||
if (signaling) {
|
||||
esp_peer_signaling_stop(signaling);
|
||||
signaling = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void peer_send(char* data, size_t size)
|
||||
{
|
||||
esp_peer_data_frame_t data_frame = {
|
||||
.type = ESP_PEER_DATA_CHANNEL_DATA,
|
||||
.data = (uint8_t*)data,
|
||||
.size = size,
|
||||
};
|
||||
esp_peer_send_data(peer, &data_frame);
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
|
||||
* SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
|
||||
*
|
||||
* SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
*/
|
||||
@ -13,30 +13,9 @@
|
||||
#include "esp_check.h"
|
||||
#include "esp_sleep.h"
|
||||
#include "mosq_broker.h"
|
||||
#include "juice/juice.h"
|
||||
#include "cJSON.h"
|
||||
#include "peer_impl.h"
|
||||
|
||||
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
|
||||
#define OUR_PEER "1"
|
||||
#define THEIR_PEER "2"
|
||||
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
|
||||
#define OUR_PEER "2"
|
||||
#define THEIR_PEER "1"
|
||||
#endif
|
||||
|
||||
#define PEER_SYNC0 BIT(0)
|
||||
#define PEER_SYNC1 BIT(1)
|
||||
#define PEER_SYNC2 BIT(2)
|
||||
#define PEER_FAIL BIT(3)
|
||||
#define PEER_GATHER_DONE BIT(4)
|
||||
#define PEER_DESC_PUBLISHED BIT(5)
|
||||
#define PEER_CONNECTED BIT(6)
|
||||
|
||||
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
|
||||
|
||||
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
|
||||
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
|
||||
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
|
||||
#define ALIGN(size) (((size) + 3U) & ~(3U))
|
||||
|
||||
typedef struct message_wrap {
|
||||
uint16_t topic_len;
|
||||
@ -44,196 +23,18 @@ typedef struct message_wrap {
|
||||
char data[];
|
||||
} __attribute__((packed)) message_wrap_t;
|
||||
|
||||
static const char *TAG = "serverless_mqtt" OUR_PEER;
|
||||
static char s_buffer[MAX_BUFFER_SIZE];
|
||||
static EventGroupHandle_t s_state = NULL;
|
||||
static juice_agent_t *s_agent = NULL;
|
||||
static cJSON *s_peer_desc_json = NULL;
|
||||
static char *s_peer_desc = NULL;
|
||||
static const char *TAG = "serverless_mqtt";
|
||||
|
||||
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
|
||||
|
||||
char *wifi_get_ipv4(wifi_interface_t interface);
|
||||
esp_err_t wifi_connect(void);
|
||||
static esp_err_t sync_peers(void);
|
||||
static esp_err_t create_candidates(void);
|
||||
static esp_err_t create_local_client(void);
|
||||
static esp_err_t create_local_broker(void);
|
||||
|
||||
void app_main(void)
|
||||
{
|
||||
__attribute__((__unused__)) esp_err_t ret;
|
||||
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
|
||||
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
|
||||
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
|
||||
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
|
||||
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
|
||||
if (bits & PEER_CONNECTED) {
|
||||
ESP_LOGI(TAG, "Peer is connected!");
|
||||
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
|
||||
ESP_LOGI(TAG, "Everything is ready, exiting main task");
|
||||
return;
|
||||
}
|
||||
err:
|
||||
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
|
||||
esp_deep_sleep(1000000LL * (esp_random() % 20));
|
||||
}
|
||||
esp_err_t peer_init(on_peer_recv_t cb);
|
||||
|
||||
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
|
||||
{
|
||||
esp_mqtt_event_handle_t event = event_data;
|
||||
esp_mqtt_client_handle_t client = event->client;
|
||||
switch ((esp_mqtt_event_id_t)event_id) {
|
||||
case MQTT_EVENT_CONNECTED:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
|
||||
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
|
||||
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
|
||||
}
|
||||
xEventGroupSetBits(s_state, PEER_SYNC0);
|
||||
break;
|
||||
case MQTT_EVENT_DISCONNECTED:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_DATA:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
|
||||
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
|
||||
printf("DATA=%.*s\r\n", event->data_len, event->data);
|
||||
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
|
||||
break;
|
||||
}
|
||||
EventBits_t bits = xEventGroupGetBits(s_state);
|
||||
if (event->data_len > 1 && s_agent) {
|
||||
cJSON *root = cJSON_Parse(event->data);
|
||||
if (root == NULL) {
|
||||
break;
|
||||
}
|
||||
cJSON *desc = cJSON_GetObjectItem(root, "desc");
|
||||
if (desc == NULL) {
|
||||
cJSON_Delete(root);
|
||||
break;
|
||||
}
|
||||
printf("desc->valuestring:%s\n", desc->valuestring);
|
||||
juice_set_remote_description(s_agent, desc->valuestring);
|
||||
char cand_name[] = "cand0";
|
||||
while (true) {
|
||||
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
|
||||
if (cand == NULL) {
|
||||
break;
|
||||
}
|
||||
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
|
||||
juice_add_remote_candidate(s_agent, cand->valuestring);
|
||||
cand_name[4]++;
|
||||
}
|
||||
cJSON_Delete(root);
|
||||
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
|
||||
// and destroy the mqtt client
|
||||
}
|
||||
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
|
||||
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
|
||||
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
|
||||
xEventGroupSetBits(s_state, PEER_SYNC2);
|
||||
} else {
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
}
|
||||
}
|
||||
#else
|
||||
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
|
||||
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
|
||||
xEventGroupSetBits(s_state, PEER_SYNC1);
|
||||
} else {
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
}
|
||||
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
|
||||
xEventGroupSetBits(s_state, PEER_SYNC2);
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
case MQTT_EVENT_ERROR:
|
||||
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
|
||||
xEventGroupSetBits(s_state, PEER_FAIL);
|
||||
break;
|
||||
default:
|
||||
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static esp_err_t sync_peers(void)
|
||||
{
|
||||
esp_err_t ret = ESP_OK;
|
||||
esp_mqtt_client_config_t mqtt_cfg = {
|
||||
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
|
||||
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
|
||||
};
|
||||
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
|
||||
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
|
||||
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
|
||||
err, TAG, "Failed to register mqtt event handler");
|
||||
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
|
||||
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
|
||||
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
|
||||
ESP_LOGI(TAG, "Waiting for the other peer...");
|
||||
const int max_sync_retry = 60;
|
||||
int retry = 0;
|
||||
while (true) {
|
||||
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
|
||||
if (bits & PEER_SYNC2) {
|
||||
break;
|
||||
}
|
||||
if (bits & PEER_SYNC1) {
|
||||
continue;
|
||||
}
|
||||
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
|
||||
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
|
||||
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
|
||||
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
|
||||
ESP_FAIL, TAG, "Failed to publish mqtt message");
|
||||
#endif
|
||||
}
|
||||
ESP_LOGI(TAG, "Sync done");
|
||||
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
|
||||
ESP_FAIL, TAG, "Failed to publish peer's description");
|
||||
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
|
||||
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
|
||||
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
|
||||
err:
|
||||
free(s_peer_desc);
|
||||
esp_mqtt_client_destroy(client);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
|
||||
{
|
||||
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
|
||||
if (state == JUICE_STATE_CONNECTED) {
|
||||
xEventGroupSetBits(s_state, PEER_CONNECTED);
|
||||
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
|
||||
esp_restart();
|
||||
}
|
||||
}
|
||||
|
||||
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
|
||||
{
|
||||
static uint8_t cand_nr = 0;
|
||||
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
|
||||
char cand_name[] = "cand0";
|
||||
cand_name[4] += cand_nr++;
|
||||
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
|
||||
}
|
||||
}
|
||||
|
||||
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
|
||||
{
|
||||
ESP_LOGI(TAG, "Gathering done");
|
||||
if (s_state) {
|
||||
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
|
||||
}
|
||||
}
|
||||
|
||||
#define ALIGN(size) (((size) + 3U) & ~(3U))
|
||||
|
||||
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
|
||||
static void peer_recv(const char *data, size_t size)
|
||||
{
|
||||
if (s_local_mqtt) {
|
||||
message_wrap_t *message = (message_wrap_t *)data;
|
||||
@ -250,45 +51,21 @@ static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void
|
||||
ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload);
|
||||
esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static esp_err_t create_candidates(void)
|
||||
void app_main(void)
|
||||
{
|
||||
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
|
||||
s_peer_desc_json = cJSON_CreateObject();
|
||||
esp_err_t ret = ESP_OK;
|
||||
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
|
||||
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
|
||||
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
|
||||
.stun_server_port = 19302,
|
||||
.cb_state_changed = juice_state,
|
||||
.cb_candidate = juice_candidate,
|
||||
.cb_gathering_done = juice_gathering_done,
|
||||
.cb_recv = juice_recv,
|
||||
};
|
||||
|
||||
s_agent = juice_create(&config);
|
||||
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
|
||||
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
|
||||
ESP_FAIL, err, TAG, "Failed to get local description");
|
||||
ESP_LOGI(TAG, "desc: %s", s_buffer);
|
||||
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
|
||||
|
||||
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
|
||||
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
|
||||
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
|
||||
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
|
||||
s_peer_desc = cJSON_Print(s_peer_desc_json);
|
||||
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
|
||||
cJSON_Delete(s_peer_desc_json);
|
||||
return ESP_OK;
|
||||
|
||||
__attribute__((__unused__)) esp_err_t ret;
|
||||
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
|
||||
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
|
||||
ESP_GOTO_ON_ERROR(peer_init(peer_recv), err, TAG, "Failed to init peer library");
|
||||
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
|
||||
ESP_LOGI(TAG, "Everything is ready, exiting main task");
|
||||
return;
|
||||
err:
|
||||
juice_destroy(s_agent);
|
||||
s_agent = NULL;
|
||||
cJSON_Delete(s_peer_desc_json);
|
||||
s_peer_desc_json = NULL;
|
||||
return ret;
|
||||
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
|
||||
esp_deep_sleep(1000000LL * (esp_random() % 20));
|
||||
}
|
||||
|
||||
static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data)
|
||||
@ -342,11 +119,19 @@ static void handle_message(char *client, char *topic, char *payload, int len, in
|
||||
ESP_LOGI(TAG, "handle_message topic:%s", topic);
|
||||
ESP_LOGI(TAG, "handle_message data:%.*s", len, payload);
|
||||
ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain);
|
||||
if (s_local_mqtt && s_agent) {
|
||||
if (s_local_mqtt) {
|
||||
static char *s_buffer = NULL;
|
||||
static size_t s_buffer_len = 0;
|
||||
if (s_buffer == NULL || s_buffer_len == 0) {
|
||||
peer_get_buffer(&s_buffer, &s_buffer_len);
|
||||
if (s_buffer == NULL || s_buffer_len == 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
int topic_len = strlen(topic) + 1; // null term
|
||||
int topic_len_aligned = ALIGN(topic_len);
|
||||
int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len;
|
||||
if (total_msg_len > MAX_BUFFER_SIZE) {
|
||||
if (total_msg_len > s_buffer_len) {
|
||||
ESP_LOGE(TAG, "Fail to forward, message too long");
|
||||
return;
|
||||
}
|
||||
@ -356,7 +141,7 @@ static void handle_message(char *client, char *topic, char *payload, int len, in
|
||||
|
||||
memcpy(s_buffer + 4, topic, topic_len);
|
||||
memcpy(s_buffer + 4 + topic_len_aligned, payload, len);
|
||||
juice_send(s_agent, s_buffer, total_msg_len);
|
||||
peer_send(s_buffer, total_msg_len);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,2 @@
|
||||
CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=n
|
||||
CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=y
|
@ -0,0 +1,5 @@
|
||||
CONFIG_IDF_TARGET="esp32"
|
||||
CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y
|
||||
CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n
|
||||
CONFIG_SPIRAM=y
|
||||
CONFIG_MBEDTLS_EXTERNAL_MEM_ALLOC=y
|
@ -1,3 +1,6 @@
|
||||
CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y
|
||||
CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384
|
||||
CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768
|
||||
CONFIG_LWIP_SNTP_MAX_SERVERS=2
|
||||
CONFIG_MBEDTLS_SSL_PROTO_DTLS=y
|
||||
CONFIG_MBEDTLS_SSL_DTLS_SRTP=y
|
||||
|
Reference in New Issue
Block a user