diff --git a/.github/workflows/mosq__build.yml b/.github/workflows/mosq__build.yml index 0182a9f33..ee28aa98a 100644 --- a/.github/workflows/mosq__build.yml +++ b/.github/workflows/mosq__build.yml @@ -14,10 +14,15 @@ jobs: strategy: matrix: idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"] + example: ["broker", "serverless_mqtt"] + exclude: + - idf_ver: "release-v5.1" + example: "serverless_mqtt" # serverless_mqtt is not supported due to esp-peer + runs-on: ubuntu-22.04 container: espressif/idf:${{ matrix.idf_ver }} env: - TEST_DIR: components/mosquitto/examples + TEST_DIR: components/mosquitto/examples/${{ matrix.example }} TARGET_TEST: broker TARGET_TEST_DIR: build_esp32_default steps: @@ -31,14 +36,17 @@ jobs: . ${IDF_PATH}/export.sh pip install idf-component-manager idf-build-apps --upgrade python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml - # upload only the target test artifacts - cd ${TEST_DIR}/${TARGET_TEST} - ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} - zip -qur artifacts.zip ${TARGET_TEST_DIR} + if [ "${{ matrix.example }}" == "${TARGET_TEST}" ]; then + # upload only the target test artifacts + cd ${TEST_DIR} + ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} + zip -qur artifacts.zip ${TARGET_TEST_DIR} + fi - uses: actions/upload-artifact@v4 + if: ${{ matrix.example == 'broker' }} with: name: mosq_target_esp32_${{ matrix.idf_ver }} - path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip + path: ${{ env.TEST_DIR }}/artifacts.zip if-no-files-found: error test_mosq: diff --git a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt index c9935c956..beea8a1ff 100644 --- a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt @@ -3,4 +3,21 @@ cmake_minimum_required(VERSION 3.16) include($ENV{IDF_PATH}/tools/cmake/project.cmake) + +# Setup ESP-PEER from GitHub repo (but it's supported only on certain targets) +set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32") +if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS) +execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup/install.sh + ${CMAKE_BINARY_DIR} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR} + RESULT_VARIABLE script_result) + +if(script_result) + message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}") +endif() +list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/") +else() + message(STATUS "ESP-PEER is not compatible with this target") +endif() + project(serverless_mqtt) diff --git a/components/mosquitto/examples/serverless_mqtt/README.md b/components/mosquitto/examples/serverless_mqtt/README.md index 5ee8369e6..492e2e570 100644 --- a/components/mosquitto/examples/serverless_mqtt/README.md +++ b/components/mosquitto/examples/serverless_mqtt/README.md @@ -3,14 +3,19 @@ MQTT served by (two) mosquitto's running on two ESP chips. * Leverages MQTT connectivity between two private networks without cloud premisses. -* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE protocol, by [libjuice](https://github.com/paullouisageneau/libjuice)). +* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE/WebRTC protocol) + +## Peer to peer connection + +Could be established either by [libjuice](https://github.com/paullouisageneau/libjuice) or [esp-webRTC](https://github.com/espressif/esp-webrtc-solution). While `juice` is just a low level implementation of ICE-UDP, we need to provide some signalling and synchronization, the `WebRTC` is full-fledged solution to establish a peer connection using standardized signalling, security and transfer protocols. + ## How it works This example needs two ESP32 chipsets, that will create two separate Wi-Fi networks (IoT networks) used for IoT devices. Each IoT network is served by an MQTT server (using mosquitto component). This example will also synchronize these two MQTT brokers, as if there was only one IoT network with one broker. -This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) to traverse NATs, which enabling direct connection between two private networks behind NATs. +This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) or esp-webRTC (which implements WebRTC) to traverse NATs, which enabling direct connection between two private networks behind NATs. * Diagram @@ -19,12 +24,16 @@ This example creates a peer to peer connection between two chipsets to keep them Here's a step-by-step procedure of establishing this remote connection: 1) Initialize and start Wi-Fi AP (for IoT networks) and Wi-Fi station (for internet connection) 2) Start mosquitto broker on IoT network -3) Start libjuice to gather connection candidates -4) Synchronize using a public MQTT broker and exchange ICE descriptors -5) Establish ICE UDP connection between the two ESP32 chipsets +3) Start peer to peer connection + - In case of `libjuice` + - gather connection candidates + - synchronize using a public MQTT broker and exchange ICE descriptors + - establish ICE UDP connection between the two ESP32 chipsets + - In case of `webRTC` simply start the connection. 6) Start forwarding mqtt messages - Each remote datagram (received from ICE-UDP channel) is re-published to the local MQTT server - - Each local MQTT message (received from mosquitto on_message callback) is sent in ICE-UDP datagram + - Each local MQTT message (received from mosquitto on_message callback) is sent in a peer message + ## How to use this example @@ -33,7 +42,9 @@ You need two ESP32 devices that support Wi-Fi station and Wi-Fi software access * Configure Wi-Fi credentials for both devices on both interfaces * These devices would be deployed in distinct Wi-Fi environments, so the Wi-Fi station credentials would likely be different. * They also create their own IoT network (on the soft-AP interface) Wi-Fi, so the AP credentials would likely be the same, suggesting the IoT networks will be keep synchronized (even though these are two distict Wi-Fi networks). -* Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role. +* Choose the peer library +* Only for `libjuice`: + - Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role. * Optionally: You can use `idf.py` `-D` and `-B` flag to keep separate build directories and sdkconfigs for these two roles ``` idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor @@ -44,9 +55,129 @@ idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor * Join PEER2 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics. * Whenever you publish to a topic, all subscribed clients should receive the message, no matter which Wi-Fi network they're connected to. +## Example output + +## With libjuice + +``` +I (4746) esp_netif_handlers: sta ip: 192.168.0.40, mask: 255.255.255.0, gw: 192.168.0.1 +4: mosquitto version v2.0.20~3 starting +4: Using default config. +4: Opening ipv4 listen socket on port 1883. +4: mosquitto version v2.0.20~3 running +I (4756) serverless_mqtt1: desc: a=ice-ufrag:sGdl +a=ice-pwd:R4IPGsFctITbT1dCZbfQTL +a=ice-options:ice2,trickle + +00:00:04 INFO agent.c:1100: Changing state to gathering +I (4776) serverless_mqtt1: JUICE state change: gathering +00:00:04 INFO agent.c:1100: Changing state to connecting +I (4786) serverless_mqtt1: JUICE state change: connecting +00:00:04 INFO agent.c:422: Using STUN server stun.l.google.com:19302 +00:00:04 INFO agent.c:1378: STUN server binding successful +00:00:04 INFO agent.c:1397: Got STUN mapped address 185.194.44.31:62703 from server +00:00:04 INFO agent.c:2428: Candidate gathering done +I (5066) serverless_mqtt1: Gathering done +I (5066) serverless_mqtt1: desc: { + "desc": "a=ice-ufrag:sGdl\r\na=ice-pwd:R4IPGsFctITbT1dCZbfQTL\r\na=ice-options:ice2,trickle\r\n", + "cand0": "a=candidate:1 1 UDP 2122317823 192.168.0.40 62703 typ host", + "cand1": "a=candidate:2 1 UDP 1686109951 185.194.44.31 62703 typ srflx raddr 0.0.0.0 rport 0" +} +I (5096) serverless_mqtt1: Other event id:7 +``` + +### With esp-peer + +``` +I (5992) esp_netif_handlers: sta ip: 192.168.0.42, mask: 255.255.255.0, gw: 192.168.0.1 +4: mosquitto version v2.0.20~3 starting +4: Using default config. +4: Opening ipv4 listen socket on port 1883. +4: mosquitto version v2.0.20~3 running +I (6702) esp-x509-crt-bundle: Certificate validated +I (7982) APPRTC_SIG: result SUCCESS +Initials set to 1 +I (7982) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 911 +I (8652) esp-x509-crt-bundle: Certificate validated +Got url:stun:webrtc.espressif.com:3478 user_name: 1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw= +I (10022) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 173 +I (10032) serverless_mqtt_webrtc: Signaling ice info handler 0x0 +I (10042) DTLS: Init SRTP OK +I (11512) DTLS_SRTP: dtls_srtp init done +I (11522) APPRTC_SIG: Registering signaling channel. +I (11522) APPRTC_SIG: Connecting to wss://webrtc.espressif.com:8089/ws... +I (11532) websocket_client: Started +I (11532) serverless_mqtt_webrtc: Waiting for peer to connect +I (12122) esp-x509-crt-bundle: Certificate validated +I (13502) APPRTC_SIG: WEBSOCKET_EVENT_CONNECTED +I (13502) APPRTC_SIG: send to remote : {"cmd":"register","roomid":"111116","clientid":"93827452"} +I (13502) serverless_mqtt_webrtc: Peer state: 2 +I (13522) AGENT: Start agent as Controlling +I (13522) PEER_DEF: Start DTLS role as 1 +I (13522) AGENT: Send STUN binding request +I (13522) AGENT: Send allocate now +Got error code 401 +I (13802) AGENT: Send allocate now +I (14112) AGENT: 0 Get candidate success user:1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw= +I (14112) APPRTC_SIG: Begin to send offer to https://webrtc.espressif.com/message/111116/93827452 +I (14782) esp-x509-crt-bundle: Certificate validated +I (16472) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 21 +I (21602) PEER_DEF: A SRC: 4 +I (21602) PEER_DEF: Get peer role 0 +I (21602) PEER_DEF: Get peer role 0 +I (21602) AGENT: 0 Add remote type:2 185.194.44.31:60872 +I (21612) AGENT: 0 Add remote type:4 172.31.6.33:59012 +I (21612) AGENT: 0 Add remote type:1 192.168.0.41:60872 +0 Sorted pair 0 type: 1 local:192.168.0.42:63459 Remote:192.168.0.41:60872 +0 Sorted pair 1 type: 2 local:185.194.44.31:63459 Remote:185.194.44.31:60872 +0 Sorted pair 2 type: 4 local:172.31.6.33:59013 Remote:172.31.6.33:59012 +I (21642) serverless_mqtt_webrtc: Peer state: 3 +I (22002) AGENT: 0 Send binding request (cand:0) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:76e5004e797d87a62756c714 +I (22002) AGENT: 0 Send binding request (cand:0) local:185.194.44.31:63459 remote:185.194.44.31:60872 id:4c598d93643bd7252b449456 +I (22012) AGENT: 0 Send binding request (cand:0) local:172.31.6.33:59013 remote:172.31.6.33:59012 id:6db505597d157d8d71dfed43 +I (22022) AGENT: 0 send indication bind request + +I (22202) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872 +I (22202) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872 +I (22212) AGENT: 0 Select pair192.168.0.41:60872 +I (22212) AGENT: 0 Send binding request (cand:1) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:45cb977b6ea3bbbe3a984b90 +I (22222) AGENT: 0 PeerIndication recv local:172.31.6.33:59013 remote:172.31.6.33:59012 +I (22402) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872 +I (22402) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872 +I (22412) AGENT: 0 Candidate responsed +I (22412) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872 +I (22422) AGENT: 0 Connection OK 192.168.0.41:60872 +I (22422) serverless_mqtt_webrtc: Peer state: 5 +I (22452) DTLS: Start to do server handshake + +Works as 1 +I (23842) DTLS: SRTP connected OK +I (23852) DTLS: Server handshake success +I (23852) PEER_DEF: DTLS handshake success +I (23852) serverless_mqtt_webrtc: Peer state: 6 +I (23852) serverless_mqtt_webrtc: Peer is connected! +I (23862) serverless_mqtt: local client event id:7 +22: New connection from 192.168.4.1:63904 on port 1883. +22: New client connected from 192.168.4.1:63904 as local_mqtt (p2, c1, k120). +I (23892) serverless_mqtt: local client connected +I (23872) serverless_mqtt: Everything is ready, exiting main task +I (23892) main_task: Returned from app_main() +I (24552) SCTP: 0 Receive chunk 1 SCTP_INIT +I (24552) SCTP: 0 state 2 +I (24552) SCTP: Send INIT_ACK chunk +I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO +I (24762) SCTP: Send ECHO_ACK chunk +I (24762) SCTP: 0 state 5 +I (24762) serverless_mqtt_webrtc: Peer state: 8 +I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO +I (24772) SCTP: Send ECHO_ACK chunk +I (24962) SCTP: Get DCEP esp_channel event:3 type:0 si:2 +I (24972) serverless_mqtt_webrtc: Peer state: 9 +``` + ## Warning -This example uses libjuice as a dependency: +This example uses `libjuice` as a dependency: * libjuice (UDP Interactive Connectivity Establishment): https://github.com/paullouisageneau/libjuice diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch new file mode 100644 index 000000000..37c4d797f --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch @@ -0,0 +1,28 @@ +From cdc43a56f5ea1ab1935f55f47f8644f5dd30825e Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Thu, 10 Jul 2025 11:09:57 +0200 +Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header + +--- + components/media_lib_sal/port/media_lib_os_freertos.c | 4 ++++ + 1 file changed, 4 insertions(+) + +diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c +index d248d59..aea0527 100644 +--- a/components/media_lib_sal/port/media_lib_os_freertos.c ++++ b/components/media_lib_sal/port/media_lib_os_freertos.c +@@ -40,8 +40,12 @@ + #include "esp_idf_version.h" + + #if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT ++#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 2, 0)) ++#include "esp_private/freertos_debug.h" ++#else + #include "freertos/task_snapshot.h" + #endif ++#endif + + #ifdef __XTENSA__ + #include "esp_debug_helpers.h" +-- +2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh new file mode 100755 index 000000000..64d09fce4 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +set -e +echo "bin_dir: $1" + +bin_dir="$1" +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +ESP_PEER_VERSION="ccff3bd65cea750bf6c0abcf9d95b931ba9329f0" + +ESP_PEER_URL="https://github.com/espressif/esp-webrtc-solution/archive/${ESP_PEER_VERSION}.zip" +ESP_PEER_DIR="${bin_dir}/esp-peer" +ZIP_PATH="${bin_dir}/esp-peer.zip" +EXTRACTED_DIR="${ESP_PEER_DIR}/esp-webrtc-solution-${ESP_PEER_VERSION}" +COMPONENTS_SRC="${EXTRACTED_DIR}/components" +COMPONENTS_DST="${ESP_PEER_DIR}/components" +PATCH_FILE_1="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch" +PATCH_FILE_2="${THIS_DIR}/libpeer-Add-direct-dependency-to-libsrtp.patch" + +# Download if not exists +if [ ! -d "$EXTRACTED_DIR" ]; then + echo "Downloading esp-peer ${ESP_PEER_VERSION}..." + wget -O "$ZIP_PATH" "$ESP_PEER_URL" + unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2" + mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR} + mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components +fi diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch new file mode 100644 index 000000000..2275383f2 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch @@ -0,0 +1,23 @@ +From 695e057000698f4897b6c5802851499842e2fe31 Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Fri, 11 Jul 2025 16:59:21 +0200 +Subject: [PATCH] fix(libpeer): Add direct dependency to libsrtp + +--- + components/esp_webrtc/impl/peer_default/CMakeLists.txt | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/components/esp_webrtc/impl/peer_default/CMakeLists.txt b/components/esp_webrtc/impl/peer_default/CMakeLists.txt +index 2af35cf..3fb4615 100644 +--- a/components/esp_webrtc/impl/peer_default/CMakeLists.txt ++++ b/components/esp_webrtc/impl/peer_default/CMakeLists.txt +@@ -2,6 +2,6 @@ idf_component_register(INCLUDE_DIRS ./include) + + get_filename_component(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME) + add_prebuilt_library(${BASE_DIR} "${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}/libpeer_default.a" +- PRIV_REQUIRES ${BASE_DIR} esp_timer) ++ PRIV_REQUIRES ${BASE_DIR} esp_timer espressif__esp_libsrtp) + target_link_libraries(${COMPONENT_LIB} INTERFACE "-L ${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}") + target_link_libraries(${COMPONENT_LIB} INTERFACE peer_default) +-- +2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt index b757b7286..52a905ee2 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -1,4 +1,14 @@ +if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) + set(PEER_BACKEND_SRC "peer_impl_webrtc.c") +else() + set(PEER_BACKEND_SRC "peer_impl_juice.c") +endif() + idf_component_register(SRCS "serverless_mqtt.c" "wifi_connect.c" + "${PEER_BACKEND_SRC}" INCLUDE_DIRS "." REQUIRES libjuice nvs_flash mqtt json esp_wifi) +if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) + idf_component_optional_requires(PUBLIC media_lib_sal esp_webrtc peer_default) +endif() diff --git a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild index 7e0e97de0..071409662 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild +++ b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild @@ -33,8 +33,40 @@ menu "Example Configuration" WiFi station password for the example to use. endmenu + choice EXAMPLE_PEER_LIB + prompt "Choose peer library" + default EXAMPLE_PEER_LIB_LIBJUICE + help + Choose the peer library to use for WebRTC communication. + libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling) + esp_peer: Use ESP-IDF specific peer library + + config EXAMPLE_PEER_LIB_ESP_PEER + bool "esp_peer" + + config EXAMPLE_PEER_LIB_LIBJUICE + bool "libjuice" + endchoice + + config EXAMPLE_WEBRTC_URL + string "WebRTC server URL" + depends on EXAMPLE_PEER_LIB_ESP_PEER + default "https://webrtc.espressif.com/join/" + help + URL of WebRTC remote endpoint. + + config EXAMPLE_WEBRTC_ROOM_ID + string "WebRTC room ID" + depends on EXAMPLE_PEER_LIB_ESP_PEER + default "12345" + help + Room ID for WebRTC synchronisation. + Could be a random number, but the same for both peers. + + config EXAMPLE_MQTT_BROKER_URI string "MQTT Broker URL" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "mqtt://mqtt.eclipseprojects.io" help URL of the mqtt broker use for synchronisation and exchanging @@ -42,12 +74,14 @@ menu "Example Configuration" config EXAMPLE_MQTT_SYNC_TOPIC string "MQTT topic for synchronisation" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "/topic/serverless_mqtt" help MQTT topic used fo synchronisation. config EXAMPLE_STUN_SERVER string "Hostname of STUN server" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "stun.l.google.com" help STUN server hostname. @@ -67,6 +101,7 @@ menu "Example Configuration" choice EXAMPLE_SERVERLESS_ROLE prompt "Choose your role" + depends on EXAMPLE_PEER_LIB_LIBJUICE default EXAMPLE_SERVERLESS_ROLE_PEER1 help Choose either peer1 or peer2. diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h new file mode 100644 index 000000000..e96202c4b --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h @@ -0,0 +1,18 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ + +#include +#include "esp_random.h" +#include "esp_sleep.h" +#include "mosq_broker.h" + +typedef void (*on_peer_recv_t)(const char *data, size_t size); + +esp_err_t peer_init(on_peer_recv_t cb); + +void peer_get_buffer(char ** buffer, size_t *buffer_len); + +void peer_send(char* data, size_t size); diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c new file mode 100644 index 000000000..2897c69ad --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c @@ -0,0 +1,283 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "mqtt_client.h" +#include "esp_wifi.h" +#include "esp_log.h" +#include "esp_check.h" +#include "juice/juice.h" +#include "cJSON.h" +#include "peer_impl.h" + +#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) +#define OUR_PEER "1" +#define THEIR_PEER "2" +#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) +#define OUR_PEER "2" +#define THEIR_PEER "1" +#endif + +#define PEER_SYNC0 BIT(0) +#define PEER_SYNC1 BIT(1) +#define PEER_SYNC2 BIT(2) +#define PEER_FAIL BIT(3) +#define PEER_GATHER_DONE BIT(4) +#define PEER_DESC_PUBLISHED BIT(5) +#define PEER_CONNECTED BIT(6) + +#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) + +#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER +#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER +#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN + +static const char *TAG = "serverless_mqtt" OUR_PEER; +static char s_buffer[MAX_BUFFER_SIZE]; +static EventGroupHandle_t s_state = NULL; +static juice_agent_t *s_agent = NULL; +static cJSON *s_peer_desc_json = NULL; +static char *s_peer_desc = NULL; +static esp_mqtt_client_handle_t s_local_mqtt = NULL; +static on_peer_recv_t s_on_recv = NULL; + +char *wifi_get_ipv4(wifi_interface_t interface); +static esp_err_t sync_peers(void); +static esp_err_t create_candidates(void); + +void peer_get_buffer(char ** buffer, size_t *buffer_len) +{ + if (buffer && buffer_len) { + *buffer = s_buffer; + *buffer_len = MAX_BUFFER_SIZE; + } +} + +void peer_send(char* data, size_t size) +{ + juice_send(s_agent, data, size); +} + +esp_err_t peer_init(on_peer_recv_t cb) +{ + esp_err_t ret = ESP_FAIL; + ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback"); + s_on_recv = cb; + ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); + ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + return ESP_OK; + } +err: + ESP_LOGE(TAG, "Failed to init peer"); + return ret; +} + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) +{ + esp_mqtt_event_handle_t event = event_data; + esp_mqtt_client_handle_t client = event->client; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { + ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); + } + xEventGroupSetBits(s_state, PEER_SYNC0); + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { + break; + } + EventBits_t bits = xEventGroupGetBits(s_state); + if (event->data_len > 1 && s_agent) { + cJSON *root = cJSON_Parse(event->data); + if (root == NULL) { + break; + } + cJSON *desc = cJSON_GetObjectItem(root, "desc"); + if (desc == NULL) { + cJSON_Delete(root); + break; + } + printf("desc->valuestring:%s\n", desc->valuestring); + juice_set_remote_description(s_agent, desc->valuestring); + char cand_name[] = "cand0"; + while (true) { + cJSON *cand = cJSON_GetObjectItem(root, cand_name); + if (cand == NULL) { + break; + } + printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); + juice_add_remote_candidate(s_agent, cand->valuestring); + cand_name[4]++; + } + cJSON_Delete(root); + xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process + // and destroy the mqtt client + } +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } +#else + if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC1); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } +#endif + break; + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } +} + +static esp_err_t sync_peers(void) +{ + esp_err_t ret = ESP_OK; + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, + .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, + }; + esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); + ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), + err, TAG, "Failed to register mqtt event handler"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + ESP_LOGI(TAG, "Waiting for the other peer..."); + const int max_sync_retry = 60; + int retry = 0; + while (true) { + EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); + if (bits & PEER_SYNC2) { + break; + } + if (bits & PEER_SYNC1) { + continue; + } + ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); + ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish mqtt message"); +#endif + } + ESP_LOGI(TAG, "Sync done"); + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish peer's description"); + ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); +err: + free(s_peer_desc); + esp_mqtt_client_destroy(client); + return ret; +} + +static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) +{ + ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); + if (state == JUICE_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { + esp_restart(); + } +} + +static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) +{ + static uint8_t cand_nr = 0; + if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates + char cand_name[] = "cand0"; + cand_name[4] += cand_nr++; + cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); + } +} + +static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) +{ + ESP_LOGI(TAG, "Gathering done"); + if (s_state) { + xEventGroupSetBits(s_state, PEER_GATHER_DONE); + } +} + + + +static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +{ + if (s_local_mqtt) { + s_on_recv(data, size); + } else { + ESP_LOGI(TAG, "No local mqtt client, dropping data"); + } +} + +static esp_err_t create_candidates(void) +{ + ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + s_peer_desc_json = cJSON_CreateObject(); + esp_err_t ret = ESP_OK; + juice_set_log_level(JUICE_LOG_LEVEL_INFO); + juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, + .bind_address = wifi_get_ipv4(WIFI_IF_STA), + .stun_server_port = 19302, + .cb_state_changed = juice_state, + .cb_candidate = juice_candidate, + .cb_gathering_done = juice_gathering_done, + .cb_recv = juice_recv, + }; + + s_agent = juice_create(&config); + ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); + ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to get local description"); + ESP_LOGI(TAG, "desc: %s", s_buffer); + cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); + + ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to start gathering candidates"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + s_peer_desc = cJSON_Print(s_peer_desc_json); + ESP_LOGI(TAG, "desc: %s", s_peer_desc); + cJSON_Delete(s_peer_desc_json); + return ESP_OK; + +err: + juice_destroy(s_agent); + s_agent = NULL; + cJSON_Delete(s_peer_desc_json); + s_peer_desc_json = NULL; + return ret; +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c new file mode 100644 index 000000000..beb39ee60 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c @@ -0,0 +1,248 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include "media_lib_os.h" +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "media_lib_adapter.h" +#include "media_lib_os.h" +#include "esp_log.h" +#include "esp_webrtc_defaults.h" +#include "esp_peer_default.h" +#include "common.h" +#include "esp_check.h" +#include "peer_impl.h" +#include "sdkconfig.h" + +#define WEBRTC_URL (CONFIG_EXAMPLE_WEBRTC_URL CONFIG_EXAMPLE_WEBRTC_ROOM_ID) +#define PEER_CONNECTED BIT(0) +#define PEER_DISCONNECTED BIT(1) +#define MAX_BUFFER_SIZE (4*1024) + +static EventGroupHandle_t s_state = NULL; +static const char *TAG = "serverless_mqtt_webrtc"; + +void peer_get_buffer(char ** buffer, size_t *buffer_len) +{ + static char s_buffer[MAX_BUFFER_SIZE]; + if (buffer && buffer_len) { + *buffer = s_buffer; + *buffer_len = MAX_BUFFER_SIZE; + } +} + +static int start_webrtc(char *url); +static int stop_webrtc(void); + +static on_peer_recv_t s_on_recv = NULL; +static esp_peer_signaling_handle_t signaling = NULL; +static esp_peer_handle_t peer = NULL; +static bool peer_running = false; + +static void thread_scheduler(const char *thread_name, media_lib_thread_cfg_t *thread_cfg) +{ + if (strcmp(thread_name, "pc_task") == 0) { + thread_cfg->stack_size = 25 * 1024; + thread_cfg->priority = 18; + thread_cfg->core_id = 1; + } +} + +esp_err_t peer_init(on_peer_recv_t cb) +{ + esp_err_t ret = ESP_OK; + s_on_recv = cb; + s_state = xEventGroupCreate(); + media_lib_add_default_adapter(); + media_lib_thread_set_schedule_cb(thread_scheduler); + ESP_RETURN_ON_FALSE(s_state, ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + ESP_GOTO_ON_FALSE(start_webrtc(WEBRTC_URL) == ESP_PEER_ERR_NONE, ESP_FAIL, err, TAG, "Failed to start webRTC"); + ESP_LOGI(TAG, "Waiting for peer to connect"); + int i = 0; + while (1) { + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(1000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + return ret; + } + ESP_GOTO_ON_FALSE(i++ < 100, ESP_ERR_TIMEOUT, err, TAG, "Peer connection timeout"); + if (peer) { + esp_peer_query(peer); + } + } + +err: + vEventGroupDelete(s_state); + return ret; +} + + +static int peer_state_handler(esp_peer_state_t state, void* ctx) +{ + ESP_LOGI(TAG, "Peer state: %d", state); + if (state == ESP_PEER_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == ESP_PEER_STATE_DISCONNECTED) { + xEventGroupSetBits(s_state, PEER_DISCONNECTED); + } + return 0; +} + +static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_MSG_TYPE_SDP) { + // Send local SDP to signaling server + esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg); + } + return 0; +} + +static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx) +{ + ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]); + return 0; +} + +static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx) +{ + return 0; +} + +static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx) +{ + if (frame && frame->size > 0) { + s_on_recv((char*)frame->data, frame->size); + } + return 0; +} + +static void pc_task(void *arg) +{ + while (peer_running) { + esp_peer_main_loop(peer); + media_lib_thread_sleep(20); + } + media_lib_thread_destroy(NULL); +} + +static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx) +{ + if (peer == NULL) { + esp_peer_default_cfg_t peer_cfg = { + .agent_recv_timeout = 500, + }; + esp_peer_cfg_t cfg = { + .server_lists = &info->server_info, + .server_num = 1, + .audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV, + .audio_info = { + .codec = ESP_PEER_AUDIO_CODEC_G711A, + }, + .enable_data_channel = true, + .role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED, + .on_state = peer_state_handler, + .on_msg = peer_msg_handler, + .on_video_info = peer_video_info_handler, + .on_audio_info = peer_audio_info_handler, + .on_video_data = peer_video_data_handler, + .on_audio_data = peer_audio_data_handler, + .on_data = peer_data_handler, + .ctx = ctx, + .extra_cfg = &peer_cfg, + .extra_size = sizeof(esp_peer_default_cfg_t), + }; + int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer); + if (ret != ESP_PEER_ERR_NONE) { + return ret; + } + media_lib_thread_handle_t thread = NULL; + peer_running = true; + media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL); + if (thread == NULL) { + peer_running = false; + } + } + return 0; +} + +static int signaling_connected_handler(void* ctx) +{ + if (peer) { + return esp_peer_new_connection(peer); + } + return 0; +} + +static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) { + esp_peer_close(peer); + peer = NULL; + } else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) { + // Receive remote SDP + if (peer) { + esp_peer_send_msg(peer, (esp_peer_msg_t*)msg); + } + } + return 0; +} + +static int signaling_close_handler(void *ctx) +{ + return 0; +} + +static int start_signaling(char* url) +{ + esp_peer_signaling_cfg_t cfg = { + .signal_url = url, + .on_ice_info = signaling_ice_info_handler, + .on_connected = signaling_connected_handler, + .on_msg = signaling_msg_handler, + .on_close = signaling_close_handler, + }; + // Use APPRTC signaling + return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling); +} + +static int start_webrtc(char *url) +{ + stop_webrtc(); + return start_signaling(url); +} + +static int stop_webrtc(void) +{ + peer_running = false; + if (peer) { + esp_peer_close(peer); + peer = NULL; + } + if (signaling) { + esp_peer_signaling_stop(signaling); + signaling = NULL; + } + return 0; +} + +void peer_send(char* data, size_t size) +{ + esp_peer_data_frame_t data_frame = { + .type = ESP_PEER_DATA_CHANNEL_DATA, + .data = (uint8_t*)data, + .size = size, + }; + esp_peer_send_data(peer, &data_frame); +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c index 8dd0bee7c..bd8b469c6 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c +++ b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD + * SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD * * SPDX-License-Identifier: Unlicense OR CC0-1.0 */ @@ -13,30 +13,9 @@ #include "esp_check.h" #include "esp_sleep.h" #include "mosq_broker.h" -#include "juice/juice.h" -#include "cJSON.h" +#include "peer_impl.h" -#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) -#define OUR_PEER "1" -#define THEIR_PEER "2" -#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) -#define OUR_PEER "2" -#define THEIR_PEER "1" -#endif - -#define PEER_SYNC0 BIT(0) -#define PEER_SYNC1 BIT(1) -#define PEER_SYNC2 BIT(2) -#define PEER_FAIL BIT(3) -#define PEER_GATHER_DONE BIT(4) -#define PEER_DESC_PUBLISHED BIT(5) -#define PEER_CONNECTED BIT(6) - -#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) - -#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER -#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER -#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN +#define ALIGN(size) (((size) + 3U) & ~(3U)) typedef struct message_wrap { uint16_t topic_len; @@ -44,196 +23,18 @@ typedef struct message_wrap { char data[]; } __attribute__((packed)) message_wrap_t; -static const char *TAG = "serverless_mqtt" OUR_PEER; -static char s_buffer[MAX_BUFFER_SIZE]; -static EventGroupHandle_t s_state = NULL; -static juice_agent_t *s_agent = NULL; -static cJSON *s_peer_desc_json = NULL; -static char *s_peer_desc = NULL; +static const char *TAG = "serverless_mqtt"; + static esp_mqtt_client_handle_t s_local_mqtt = NULL; char *wifi_get_ipv4(wifi_interface_t interface); esp_err_t wifi_connect(void); -static esp_err_t sync_peers(void); -static esp_err_t create_candidates(void); static esp_err_t create_local_client(void); static esp_err_t create_local_broker(void); -void app_main(void) -{ - __attribute__((__unused__)) esp_err_t ret; - ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); - ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); - ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); - ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); - EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); - if (bits & PEER_CONNECTED) { - ESP_LOGI(TAG, "Peer is connected!"); - ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); - ESP_LOGI(TAG, "Everything is ready, exiting main task"); - return; - } -err: - ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); - esp_deep_sleep(1000000LL * (esp_random() % 20)); -} +esp_err_t peer_init(on_peer_recv_t cb); -static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) -{ - esp_mqtt_event_handle_t event = event_data; - esp_mqtt_client_handle_t client = event->client; - switch ((esp_mqtt_event_id_t)event_id) { - case MQTT_EVENT_CONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); - if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { - ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); - } - xEventGroupSetBits(s_state, PEER_SYNC0); - break; - case MQTT_EVENT_DISCONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - xEventGroupSetBits(s_state, PEER_FAIL); - break; - - case MQTT_EVENT_DATA: - ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); - printf("DATA=%.*s\r\n", event->data_len, event->data); - if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { - break; - } - EventBits_t bits = xEventGroupGetBits(s_state); - if (event->data_len > 1 && s_agent) { - cJSON *root = cJSON_Parse(event->data); - if (root == NULL) { - break; - } - cJSON *desc = cJSON_GetObjectItem(root, "desc"); - if (desc == NULL) { - cJSON_Delete(root); - break; - } - printf("desc->valuestring:%s\n", desc->valuestring); - juice_set_remote_description(s_agent, desc->valuestring); - char cand_name[] = "cand0"; - while (true) { - cJSON *cand = cJSON_GetObjectItem(root, cand_name); - if (cand == NULL) { - break; - } - printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); - juice_add_remote_candidate(s_agent, cand->valuestring); - cand_name[4]++; - } - cJSON_Delete(root); - xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process - // and destroy the mqtt client - } -#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 - if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { - if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { - xEventGroupSetBits(s_state, PEER_SYNC2); - } else { - xEventGroupSetBits(s_state, PEER_FAIL); - } - } -#else - if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { - if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { - xEventGroupSetBits(s_state, PEER_SYNC1); - } else { - xEventGroupSetBits(s_state, PEER_FAIL); - } - } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { - xEventGroupSetBits(s_state, PEER_SYNC2); - } -#endif - break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); - xEventGroupSetBits(s_state, PEER_FAIL); - break; - default: - ESP_LOGI(TAG, "Other event id:%d", event->event_id); - break; - } -} - -static esp_err_t sync_peers(void) -{ - esp_err_t ret = ESP_OK; - esp_mqtt_client_config_t mqtt_cfg = { - .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, - .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, - }; - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); - ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); - ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), - err, TAG, "Failed to register mqtt event handler"); - ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), - ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); - ESP_LOGI(TAG, "Waiting for the other peer..."); - const int max_sync_retry = 60; - int retry = 0; - while (true) { - EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); - if (bits & PEER_SYNC2) { - break; - } - if (bits & PEER_SYNC1) { - continue; - } - ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); - ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); -#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 - ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, - ESP_FAIL, TAG, "Failed to publish mqtt message"); -#endif - } - ESP_LOGI(TAG, "Sync done"); - ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, - ESP_FAIL, TAG, "Failed to publish peer's description"); - ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), - ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); -err: - free(s_peer_desc); - esp_mqtt_client_destroy(client); - return ret; -} - -static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) -{ - ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); - if (state == JUICE_STATE_CONNECTED) { - xEventGroupSetBits(s_state, PEER_CONNECTED); - } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { - esp_restart(); - } -} - -static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) -{ - static uint8_t cand_nr = 0; - if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates - char cand_name[] = "cand0"; - cand_name[4] += cand_nr++; - cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); - } -} - -static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) -{ - ESP_LOGI(TAG, "Gathering done"); - if (s_state) { - xEventGroupSetBits(s_state, PEER_GATHER_DONE); - } -} - -#define ALIGN(size) (((size) + 3U) & ~(3U)) - -static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +static void peer_recv(const char *data, size_t size) { if (s_local_mqtt) { message_wrap_t *message = (message_wrap_t *)data; @@ -250,45 +51,21 @@ static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload); esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0); } + } -static esp_err_t create_candidates(void) +void app_main(void) { - ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); - s_peer_desc_json = cJSON_CreateObject(); - esp_err_t ret = ESP_OK; - juice_set_log_level(JUICE_LOG_LEVEL_INFO); - juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, - .bind_address = wifi_get_ipv4(WIFI_IF_STA), - .stun_server_port = 19302, - .cb_state_changed = juice_state, - .cb_candidate = juice_candidate, - .cb_gathering_done = juice_gathering_done, - .cb_recv = juice_recv, - }; - - s_agent = juice_create(&config); - ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); - ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, - ESP_FAIL, err, TAG, "Failed to get local description"); - ESP_LOGI(TAG, "desc: %s", s_buffer); - cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); - - ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, - ESP_FAIL, err, TAG, "Failed to start gathering candidates"); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), - ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); - s_peer_desc = cJSON_Print(s_peer_desc_json); - ESP_LOGI(TAG, "desc: %s", s_peer_desc); - cJSON_Delete(s_peer_desc_json); - return ESP_OK; - + __attribute__((__unused__)) esp_err_t ret; + ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); + ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); + ESP_GOTO_ON_ERROR(peer_init(peer_recv), err, TAG, "Failed to init peer library"); + ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); + ESP_LOGI(TAG, "Everything is ready, exiting main task"); + return; err: - juice_destroy(s_agent); - s_agent = NULL; - cJSON_Delete(s_peer_desc_json); - s_peer_desc_json = NULL; - return ret; + ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); + esp_deep_sleep(1000000LL * (esp_random() % 20)); } static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data) @@ -335,18 +112,26 @@ err: static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain) { - if (client && strcmp(client, "local_mqtt") == 0 ) { + if (client && strcmp(client, "local_mqtt") == 0) { // This is our little local client -- do not forward return; } ESP_LOGI(TAG, "handle_message topic:%s", topic); ESP_LOGI(TAG, "handle_message data:%.*s", len, payload); ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain); - if (s_local_mqtt && s_agent) { + if (s_local_mqtt) { + static char *s_buffer = NULL; + static size_t s_buffer_len = 0; + if (s_buffer == NULL || s_buffer_len == 0) { + peer_get_buffer(&s_buffer, &s_buffer_len); + if (s_buffer == NULL || s_buffer_len == 0) { + return; + } + } int topic_len = strlen(topic) + 1; // null term int topic_len_aligned = ALIGN(topic_len); int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len; - if (total_msg_len > MAX_BUFFER_SIZE) { + if (total_msg_len > s_buffer_len) { ESP_LOGE(TAG, "Fail to forward, message too long"); return; } @@ -356,7 +141,7 @@ static void handle_message(char *client, char *topic, char *payload, int len, in memcpy(s_buffer + 4, topic, topic_len); memcpy(s_buffer + 4 + topic_len_aligned, payload, len); - juice_send(s_agent, s_buffer, total_msg_len); + peer_send(s_buffer, total_msg_len); } } diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default new file mode 100644 index 000000000..4c23f9c13 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default @@ -0,0 +1,2 @@ +CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=n +CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=y diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer new file mode 100644 index 000000000..20d4a1bbc --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer @@ -0,0 +1,5 @@ +CONFIG_IDF_TARGET="esp32" +CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y +CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n +CONFIG_SPIRAM=y +CONFIG_MBEDTLS_EXTERNAL_MEM_ALLOC=y diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults index 037b68015..f0fd9cde6 100644 --- a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults @@ -1,3 +1,6 @@ CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384 CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768 +CONFIG_LWIP_SNTP_MAX_SERVERS=2 +CONFIG_MBEDTLS_SSL_PROTO_DTLS=y +CONFIG_MBEDTLS_SSL_DTLS_SRTP=y