Compare commits

..

15 Commits

Author SHA1 Message Date
29f1dec408 Merge pull request #854 from glmfe/master
bump(websocket): 1.4.0 -> 1.5.0
2025-07-23 08:45:59 -03:00
05715d80d7 bump(websocket): 1.4.0 -> 1.5.0
1.5.0
Features
- add separate tx lock for send and receive (250eebf)
- add unregister event to websocket client (ce16050)
- add ability to reconnect after close (19891d8)
Bug Fixes
- release client-lock during WEBSOCKET_EVENT_DATA (030cb75)
2025-07-22 17:54:02 -03:00
75d6845194 Merge pull request #850 from shootao/feat/add_ws_tx_lock
esp_websocket: add_ws_tx_lock (IDFGH-15952)
2025-07-21 08:08:13 -03:00
250eebf3fc feat(websocket): add separate tx lock for send and receive 2025-07-21 10:46:55 +08:00
84b61dca16 Merge pull request #840 from david-cermak/feat/mosq_esp_peer
[mosq]: Add support for esp-peer in brokerless example
2025-07-18 13:13:27 +02:00
76e45f7254 feat(mosq): Update brokerless example to work with esp-peer
* Relax CI criteria to build on v5.2+ (for the brokerless due to
  esp-peer dependency)
2025-07-18 12:49:34 +02:00
462561b8d9 Merge pull request #843 from david-cermak/fix/example_mqtt
[examples]: Use another public broker for MQTT example
2025-07-15 08:44:34 +02:00
ae8cf218c8 fix(examples): Use another public broker for MQTT example 2025-07-11 19:00:28 +02:00
c340f85a90 Merge pull request #836 from david-cermak/fix/modem_urc
[modem]: Fix URC handler processing
2025-07-11 18:14:08 +02:00
b95d8be41d fix(modem_sim): Support of PPPD exit 2025-07-11 17:37:51 +02:00
9302994673 fix(modem): Fix URC handling in DTE data callback
This partially revert 6eceb28f7d
and fixes URC handling and passing full buffer to higher layers
2025-07-11 11:01:28 +02:00
e5787e3d9f feat(modem_sim): Modem simulator based on esp-at 2025-07-11 11:01:17 +02:00
7cddc8c6f5 Merge pull request #834 from david-cermak/fix/modem_build_new_gcc
[modem]: Fix to use compatible iterator types for std::search in new gcc
2025-07-11 09:29:00 +02:00
fac2edbe59 fix(modem): Use another public broker for examples and tests
since mqtt.eclipseprojects.io is no longer available
2025-07-11 08:16:17 +02:00
ed0f633418 fix(modem): Fix incompatible iterator in std::search() in new gcc
Creates a temporary string view and uses find() instead.
2025-07-11 08:15:53 +02:00
40 changed files with 1781 additions and 289 deletions

View File

@ -45,7 +45,7 @@ jobs:
strategy:
matrix:
idf_ver: ["release-v5.0", "release-v5.1", "release-v5.2", "release-v5.3", "release-v5.4", "latest"]
test: ["target", "target_ota", "target_iperf"]
test: ["target", "target_ota", "target_iperf", "target_urc"]
runs-on: ubuntu-22.04
container: espressif/idf:${{ matrix.idf_ver }}

28
.github/workflows/modem_sim__build.yml vendored Normal file
View File

@ -0,0 +1,28 @@
name: "modem_sim: build-tests"
on:
push:
branches:
- master
pull_request:
types: [opened, synchronize, reopened, labeled]
jobs:
build_modem_sim:
if: contains(github.event.pull_request.labels.*.name, 'modem_sim') || github.event_name == 'push'
name: Build
strategy:
matrix:
idf_ver: ["release-v5.4"]
runs-on: ubuntu-22.04
container: espressif/idf:${{ matrix.idf_ver }}
steps:
- name: Checkout esp-protocols
uses: actions/checkout@v3
- name: Build ESP-AT with IDF-${{ matrix.idf_ver }}
shell: bash
run: |
cd common_components/modem_sim
./install.sh
source export.sh
idf.py build

View File

@ -14,10 +14,15 @@ jobs:
strategy:
matrix:
idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"]
example: ["broker", "serverless_mqtt"]
exclude:
- idf_ver: "release-v5.1"
example: "serverless_mqtt" # serverless_mqtt is not supported due to esp-peer
runs-on: ubuntu-22.04
container: espressif/idf:${{ matrix.idf_ver }}
env:
TEST_DIR: components/mosquitto/examples
TEST_DIR: components/mosquitto/examples/${{ matrix.example }}
TARGET_TEST: broker
TARGET_TEST_DIR: build_esp32_default
steps:
@ -31,14 +36,17 @@ jobs:
. ${IDF_PATH}/export.sh
pip install idf-component-manager idf-build-apps --upgrade
python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml
# upload only the target test artifacts
cd ${TEST_DIR}/${TARGET_TEST}
${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR}
zip -qur artifacts.zip ${TARGET_TEST_DIR}
if [ "${{ matrix.example }}" == "${TARGET_TEST}" ]; then
# upload only the target test artifacts
cd ${TEST_DIR}
${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR}
zip -qur artifacts.zip ${TARGET_TEST_DIR}
fi
- uses: actions/upload-artifact@v4
if: ${{ matrix.example == 'broker' }}
with:
name: mosq_target_esp32_${{ matrix.idf_ver }}
path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip
path: ${{ env.TEST_DIR }}/artifacts.zip
if-no-files-found: error
test_mosq:

6
.gitignore vendored
View File

@ -94,3 +94,9 @@ docs/html
# esp-idf managed components
**/managed_components/**
# modem simulator uses esp-at clone
common_components/modem_sim/modem_sim_esp32/
# repository release tools
release_notes.txt

View File

@ -61,8 +61,8 @@ repos:
- repo: local
hooks:
- id: commit message scopes
name: "commit message must be scoped with: mdns, dns, modem, websocket, asio, mqtt_cxx, console, common, eppp, tls_cxx, mosq, sockutls, lws"
entry: '\A(?!(feat|fix|ci|bump|test|docs|chore)\((mdns|dns|modem|common|console|websocket|asio|mqtt_cxx|examples|eppp|tls_cxx|mosq|sockutls|lws)\)\:)'
name: "commit message must be scoped with: mdns, dns, modem, websocket, asio, mqtt_cxx, console, common, eppp, tls_cxx, mosq, sockutls, lws, modem_sim"
entry: '\A(?!(feat|fix|ci|bump|test|docs|chore)\((mdns|dns|modem|common|console|websocket|asio|mqtt_cxx|examples|eppp|tls_cxx|mosq|sockutls|lws|modem_sim)\)\:)'
language: pygrep
args: [--multiline]
stages: [commit-msg]

View File

@ -0,0 +1,11 @@
#!/bin/bash
source $IDF_PATH/export.sh
export AT_CUSTOM_COMPONENTS="`pwd`/pppd_cmd"
cd modem_sim_esp32/esp-at
python -m pip install -r requirements.txt
python build.py reconfigure

View File

@ -0,0 +1,64 @@
#!/bin/bash
set -e
# Create directory "modem_sim_esp32", go inside it
# Usage: ./install.sh [platform] [module]
SCRIPT_DIR=$(pwd)
mkdir -p modem_sim_esp32
cd modem_sim_esp32
if [ -z "$IDF_PATH" ]; then
echo "Error: IDF_PATH environment variable is not set"
exit 1
fi
# Default ESP_AT_VERSION uses this specific commit from master to support new chips and features
ESP_AT_VERSION="aa9d7e0e9b741744f7bf5bec3bbf887cff033d5f"
# Shallow clone of esp-at.git at $ESP_AT_VERSION
if [ ! -d "esp-at" ]; then
# cannot shallow clone from a specific commit, so we init, shallow fetch, and checkout
mkdir -p esp-at && cd esp-at && git init && git remote add origin https://github.com/espressif/esp-at.git
git fetch --depth 1 origin $ESP_AT_VERSION && git checkout $ESP_AT_VERSION
else
echo "esp-at directory already exists, skipping clone."
cd esp-at
fi
# Add esp-idf directory which is a symlink to the $IDF_PATH
if [ ! -L "esp-idf" ]; then
ln -sf "$IDF_PATH" esp-idf
else
echo "esp-idf symlink already exists, skipping."
fi
# Create "build" directory
mkdir -p build
# Default values for platform and module
platform="PLATFORM_ESP32"
module="WROOM-32"
# Override defaults if parameters are provided
if [ ! -z "$1" ]; then
platform="$1"
fi
if [ ! -z "$2" ]; then
module="$2"
fi
# Create file "build/module_info.json" with content
cat > build/module_info.json << EOF
{
"platform": "$platform",
"module": "$module",
"description": "4MB, Wi-Fi + BLE, OTA, TX:17 RX:16",
"silence": 0
}
EOF
cp "$SCRIPT_DIR/sdkconfig.defaults" "module_config/module_esp32_default/sdkconfig.defaults"
echo "Installation completed successfully!"
echo "Created modem_sim_esp32 directory with esp-at repository and configuration"

View File

@ -0,0 +1,6 @@
idf_component_register(
SRCS additional_commands.c
INCLUDE_DIRS include
REQUIRES at freertos nvs_flash)
idf_component_set_property(${COMPONENT_NAME} WHOLE_ARCHIVE TRUE)

View File

@ -0,0 +1,411 @@
/*
* SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#include "esp_at.h"
#include "driver/gpio.h"
#include "driver/uart.h"
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "freertos/semphr.h"
#include "freertos/queue.h"
#include "esp_netif.h"
#include "esp_netif_ppp.h"
#include "esp_check.h"
#include "esp_http_server.h"
#include "esp_timer.h"
static uint8_t at_test_cmd_test(uint8_t *cmd_name)
{
uint8_t buffer[64] = {0};
snprintf((char *)buffer, 64, "test command: <AT%s=?> is executed\r\n", cmd_name);
esp_at_port_write_data(buffer, strlen((char *)buffer));
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_query_cmd_test(uint8_t *cmd_name)
{
uint8_t buffer[64] = {0};
snprintf((char *)buffer, 64, "query command: <AT%s?> is executed\r\n", cmd_name);
esp_at_port_write_data(buffer, strlen((char *)buffer));
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_setup_cmd_test(uint8_t para_num)
{
uint8_t index = 0;
printf("setup command: <AT%s=%d> is executed\r\n", esp_at_get_current_cmd_name(), para_num);
// get first parameter, and parse it into a digit
int32_t digit = 0;
if (esp_at_get_para_as_digit(index++, &digit) != ESP_AT_PARA_PARSE_RESULT_OK) {
return ESP_AT_RESULT_CODE_ERROR;
}
printf("digit: %d\r\n", digit);
// get second parameter, and parse it into a string
uint8_t *str = NULL;
if (esp_at_get_para_as_str(index++, &str) != ESP_AT_PARA_PARSE_RESULT_OK) {
return ESP_AT_RESULT_CODE_ERROR;
}
printf("string: %s\r\n", str);
// allocate a buffer and construct the data, then send the data to mcu via interface (uart/spi/sdio/socket)
uint8_t *buffer = (uint8_t *)malloc(512);
if (!buffer) {
return ESP_AT_RESULT_CODE_ERROR;
}
int len = snprintf((char *)buffer, 512, "setup command: <AT%s=%d,\"%s\"> is executed\r\n",
esp_at_get_current_cmd_name(), digit, str);
esp_at_port_write_data(buffer, len);
// remember to free the buffer
free(buffer);
return ESP_AT_RESULT_CODE_OK;
}
#define TAG "at_custom_cmd"
static esp_netif_t *s_netif = NULL;
static httpd_handle_t http_server = NULL;
static void on_ppp_event(void *arg, esp_event_base_t base, int32_t event_id, void *data)
{
esp_netif_t **netif = data;
if (base == NETIF_PPP_STATUS && event_id == NETIF_PPP_ERRORUSER) {
printf("Disconnected!");
}
}
static void on_ip_event(void *arg, esp_event_base_t base, int32_t event_id, void *data)
{
ip_event_got_ip_t *event = (ip_event_got_ip_t *)data;
esp_netif_t *netif = event->esp_netif;
if (event_id == IP_EVENT_PPP_GOT_IP) {
printf("Got IPv4 event: Interface \"%s(%s)\" address: " IPSTR, esp_netif_get_desc(netif),
esp_netif_get_ifkey(netif), IP2STR(&event->ip_info.ip));
ESP_ERROR_CHECK(esp_netif_napt_enable(s_netif));
} else if (event_id == IP_EVENT_PPP_LOST_IP) {
ESP_LOGI(TAG, "Disconnected");
}
}
static SemaphoreHandle_t at_sync_sema = NULL;
static void wait_data_callback(void)
{
static uint8_t buffer[1500] = {0};
int len = esp_at_port_read_data(buffer, sizeof(buffer) - 1);
// Check for the escape sequence "+++" in the received data
const uint8_t escape_seq[] = "+++";
uint8_t *escape_ptr = memmem(buffer, len, escape_seq, 3);
if (escape_ptr != NULL) {
printf("Found +++ sequence, signal to the command processing thread\n");
int data_before_escape = escape_ptr - buffer;
if (data_before_escape > 0) {
esp_netif_receive(s_netif, buffer, data_before_escape, NULL);
}
if (at_sync_sema) {
xSemaphoreGive(at_sync_sema);
}
return;
}
esp_netif_receive(s_netif, buffer, len, NULL);
}
static esp_err_t transmit(void *h, void *buffer, size_t len)
{
printf("transmit: %d bytes\n", len);
esp_at_port_write_data(buffer, len);
return ESP_OK;
}
static uint8_t at_exe_cmd_test(uint8_t *cmd_name)
{
uint8_t buffer[64] = {0};
snprintf((char *)buffer, 64, "execute command: <AT%s> is executed\r\n", cmd_name);
esp_at_port_write_data(buffer, strlen((char *)buffer));
printf("Command <AT%s> executed successfully\r\n", cmd_name);
if (!at_sync_sema) {
at_sync_sema = xSemaphoreCreateBinary();
assert(at_sync_sema != NULL);
esp_netif_driver_ifconfig_t driver_cfg = {
.handle = (void *)1,
.transmit = transmit,
};
const esp_netif_driver_ifconfig_t *ppp_driver_cfg = &driver_cfg;
esp_netif_inherent_config_t base_netif_cfg = ESP_NETIF_INHERENT_DEFAULT_PPP();
esp_netif_config_t netif_ppp_config = { .base = &base_netif_cfg,
.driver = ppp_driver_cfg,
.stack = ESP_NETIF_NETSTACK_DEFAULT_PPP
};
s_netif = esp_netif_new(&netif_ppp_config);
esp_netif_ppp_config_t netif_params;
ESP_ERROR_CHECK(esp_netif_ppp_get_params(s_netif, &netif_params));
netif_params.ppp_our_ip4_addr.addr = ESP_IP4TOADDR(192, 168, 11, 1);
netif_params.ppp_their_ip4_addr.addr = ESP_IP4TOADDR(192, 168, 11, 2);
netif_params.ppp_error_event_enabled = true;
ESP_ERROR_CHECK(esp_netif_ppp_set_params(s_netif, &netif_params));
if (esp_event_handler_register(IP_EVENT, ESP_EVENT_ANY_ID, on_ip_event, NULL) != ESP_OK) {
printf("Failed to register IP event handler");
}
if (esp_event_handler_register(NETIF_PPP_STATUS, ESP_EVENT_ANY_ID, on_ppp_event, NULL) != ESP_OK) {
printf("Failed to register NETIF_PPP_STATUS event handler");
}
}
esp_at_port_write_data((uint8_t *)"CONNECT\r\n", strlen("CONNECT\r\n"));
// set the callback function which will be called by AT port after receiving the input data
esp_at_port_enter_specific(wait_data_callback);
esp_netif_action_start(s_netif, 0, 0, 0);
esp_netif_action_connected(s_netif, 0, 0, 0);
while (xSemaphoreTake(at_sync_sema, pdMS_TO_TICKS(1000)) == pdFALSE) {
printf(".");
}
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_test_cereg(uint8_t *cmd_name)
{
printf("%s: AT command <AT%s> is executed\r\n", __func__, cmd_name);
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_query_cereg(uint8_t *cmd_name)
{
printf("%s: AT command <AT%s> is executed\r\n", __func__, cmd_name);
static uint8_t buffer[] = "+CEREG: 7,8\r\n";
esp_at_port_write_data(buffer, sizeof(buffer));
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_setup_cereg(uint8_t num)
{
printf("%s: AT command <AT%d> is executed\r\n", __func__, num);
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_exe_cereg(uint8_t *cmd_name)
{
printf("%s: AT command <AT%s> is executed\r\n", __func__, cmd_name);
return ESP_AT_RESULT_CODE_OK;
}
static esp_err_t hello_get_handler(httpd_req_t *req)
{
const char* resp_str = "Hello from ESP-AT HTTP Server!";
httpd_resp_send(req, resp_str, HTTPD_RESP_USE_STRLEN);
return ESP_OK;
}
static esp_err_t root_get_handler(httpd_req_t *req)
{
const char* resp_str = "ESP-AT HTTP Server is running";
httpd_resp_send(req, resp_str, HTTPD_RESP_USE_STRLEN);
return ESP_OK;
}
static esp_err_t test_get_handler(httpd_req_t *req)
{
const char* resp_str = "{\"status\":\"success\",\"message\":\"Test endpoint working\",\"timestamp\":12345}";
httpd_resp_set_type(req, "application/json");
httpd_resp_send(req, resp_str, HTTPD_RESP_USE_STRLEN);
return ESP_OK;
}
static esp_err_t async_get_handler(httpd_req_t *req)
{
printf("Starting async chunked response handler\r\n");
// Set content type for plain text response
httpd_resp_set_type(req, "text/plain");
// Static counter to track requests
static uint8_t req_count = 0;
req_count++;
// Send initial response with request count
char buffer[256];
snprintf(buffer, sizeof(buffer), "=== Async Response #%d ===\r\n", req_count);
httpd_resp_sendstr_chunk(req, buffer);
// Long message broken into chunks
const char* chunks[] = {
"This is a simulated slow server response.\r\n",
"Chunk 1: The ESP-AT HTTP server is demonstrating...\r\n",
"Chunk 2: ...asynchronous chunked transfer encoding...\r\n",
"Chunk 3: ...with artificial delays between chunks...\r\n",
"Chunk 4: ...to simulate real-world network conditions.\r\n",
"Chunk 5: Processing data... please wait...\r\n",
"Chunk 6: Still processing... almost done...\r\n",
"Chunk 7: Final chunk - transfer complete!\r\n",
"=== END OF RESPONSE ===\r\n"
};
int num_chunks = sizeof(chunks) / sizeof(chunks[0]);
// Send each chunk with delays
for (int i = 0; i < num_chunks; i++) {
// Add a delay to simulate slow processing
vTaskDelay(pdMS_TO_TICKS(1500)); // 1.5 second delay between chunks
// Add chunk number and timestamp
snprintf(buffer, sizeof(buffer), "[%d/%d] [%d ms] %s",
i + 1, num_chunks, (int)(esp_timer_get_time() / 1000), chunks[i]);
printf("Sending chunk %d: %s", i + 1, chunks[i]);
httpd_resp_sendstr_chunk(req, buffer);
}
// Add final summary
vTaskDelay(pdMS_TO_TICKS(500));
snprintf(buffer, sizeof(buffer), "\r\nTransfer completed in %d chunks with delays.\r\n", num_chunks);
httpd_resp_sendstr_chunk(req, buffer);
// Send NULL to signal end of chunked transfer
httpd_resp_sendstr_chunk(req, NULL);
printf("Async chunked response completed\r\n");
return ESP_OK;
}
static const httpd_uri_t hello = {
.uri = "/hello",
.method = HTTP_GET,
.handler = hello_get_handler,
.user_ctx = NULL
};
static const httpd_uri_t root = {
.uri = "/",
.method = HTTP_GET,
.handler = root_get_handler,
.user_ctx = NULL
};
static const httpd_uri_t test = {
.uri = "/test",
.method = HTTP_GET,
.handler = test_get_handler,
.user_ctx = NULL
};
static const httpd_uri_t async_uri = {
.uri = "/async",
.method = HTTP_GET,
.handler = async_get_handler,
.user_ctx = NULL
};
static esp_err_t start_http_server(void)
{
if (http_server != NULL) {
printf("HTTP server already running\r\n");
return ESP_OK;
}
httpd_config_t config = HTTPD_DEFAULT_CONFIG();
config.server_port = 8080;
config.lru_purge_enable = true;
printf("Starting HTTP server on port: %d\r\n", config.server_port);
if (httpd_start(&http_server, &config) == ESP_OK) {
printf("Registering URI handlers\r\n");
httpd_register_uri_handler(http_server, &hello);
httpd_register_uri_handler(http_server, &root);
httpd_register_uri_handler(http_server, &test);
httpd_register_uri_handler(http_server, &async_uri);
return ESP_OK;
}
printf("Error starting HTTP server!\r\n");
return ESP_FAIL;
}
static esp_err_t stop_http_server(void)
{
if (http_server != NULL) {
httpd_stop(http_server);
http_server = NULL;
printf("HTTP server stopped\r\n");
return ESP_OK;
}
return ESP_OK;
}
/* HTTP Server AT Commands */
static uint8_t at_test_httpd(uint8_t *cmd_name)
{
uint8_t buffer[64] = {0};
snprintf((char *)buffer, 64, "AT%s=<0/1> - Start/Stop HTTP server\r\n", cmd_name);
esp_at_port_write_data(buffer, strlen((char *)buffer));
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_query_httpd(uint8_t *cmd_name)
{
uint8_t buffer[64] = {0};
snprintf((char *)buffer, 64, "+HTTPD:%d\r\n", http_server != NULL ? 1 : 0);
esp_at_port_write_data(buffer, strlen((char *)buffer));
return ESP_AT_RESULT_CODE_OK;
}
static uint8_t at_setup_httpd(uint8_t para_num)
{
int32_t action = 0;
if (esp_at_get_para_as_digit(0, &action) != ESP_AT_PARA_PARSE_RESULT_OK) {
return ESP_AT_RESULT_CODE_ERROR;
}
if (action == 1) {
if (start_http_server() == ESP_OK) {
printf("HTTP server started successfully\r\n");
return ESP_AT_RESULT_CODE_OK;
}
} else if (action == 0) {
if (stop_http_server() == ESP_OK) {
return ESP_AT_RESULT_CODE_OK;
}
}
return ESP_AT_RESULT_CODE_ERROR;
}
static uint8_t at_exe_httpd(uint8_t *cmd_name)
{
// Default action: start server
if (start_http_server() == ESP_OK) {
printf("HTTP server started via execute command\r\n");
return ESP_AT_RESULT_CODE_OK;
}
return ESP_AT_RESULT_CODE_ERROR;
}
static const esp_at_cmd_struct at_custom_cmd[] = {
{"+PPPD", at_test_cmd_test, at_query_cmd_test, at_setup_cmd_test, at_exe_cmd_test},
{"+CEREG", at_test_cereg, at_query_cereg, at_setup_cereg, at_exe_cereg},
{"+HTTPD", at_test_httpd, at_query_httpd, at_setup_httpd, at_exe_httpd},
};
bool esp_at_custom_cmd_register(void)
{
return esp_at_custom_cmd_array_regist(at_custom_cmd, sizeof(at_custom_cmd) / sizeof(esp_at_cmd_struct));
}
ESP_AT_CMD_SET_INIT_FN(esp_at_custom_cmd_register, 1);

View File

@ -0,0 +1,12 @@
/*
* SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
#include "esp_at_core.h"
#include "esp_at.h"
/**
* @brief You can include more header files here for your own AT commands.
*/

View File

@ -0,0 +1,77 @@
# This file was generated using idf.py save-defconfig. It can be edited manually.
# Espressif IoT Development Framework (ESP-IDF) 5.4.1 Project Minimal Configuration
#
CONFIG_BOOTLOADER_APP_ROLLBACK_ENABLE=y
CONFIG_APP_PROJECT_VER_FROM_CONFIG=y
CONFIG_APP_PROJECT_VER="v4.1.0.0-dev"
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y
CONFIG_PARTITION_TABLE_CUSTOM=y
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="module_config/module_esp32_default/partitions_at.csv"
CONFIG_PARTITION_TABLE_MD5=n
CONFIG_AT_CUSTOMIZED_PARTITION_TABLE_FILE="module_config/module_esp32_default/at_customize.csv"
CONFIG_BT_ENABLED=y
CONFIG_BT_BTU_TASK_STACK_SIZE=5120
CONFIG_BT_BLE_BLUFI_ENABLE=y
CONFIG_BT_STACK_NO_LOG=y
CONFIG_BT_BLE_DYNAMIC_ENV_MEMORY=y
CONFIG_BTDM_CTRL_MODE_BTDM=y
CONFIG_BTDM_CTRL_LPCLK_SEL_EXT_32K_XTAL=y
CONFIG_BTDM_SCAN_DUPL_CACHE_SIZE=200
CONFIG_ESP_TLS_PSK_VERIFICATION=y
CONFIG_ESP_TLS_INSECURE=y
CONFIG_ESP_TLS_SKIP_SERVER_CERT_VERIFY=y
CONFIG_ESP_ERR_TO_NAME_LOOKUP=n
CONFIG_GPIO_ESP32_SUPPORT_SWITCH_SLP_PULL=y
CONFIG_ETH_DMA_RX_BUFFER_NUM=3
CONFIG_ETH_DMA_TX_BUFFER_NUM=3
CONFIG_HTTPD_MAX_REQ_HDR_LEN=1024
CONFIG_HTTPD_MAX_URI_LEN=1024
CONFIG_ESP_HTTPS_OTA_ALLOW_HTTP=y
CONFIG_RTC_CLK_SRC_EXT_CRYS=y
CONFIG_RTC_EXT_CRYST_ADDIT_CURRENT=y
CONFIG_RTC_CLK_CAL_CYCLES=1024
CONFIG_PM_ENABLE=y
CONFIG_PM_SLP_DISABLE_GPIO=y
CONFIG_ESP_DEFAULT_CPU_FREQ_MHZ_80=y
CONFIG_ESP_TASK_WDT_PANIC=y
CONFIG_ESP_TASK_WDT_TIMEOUT_S=60
CONFIG_ESP_DEBUG_OCDAWARE=n
CONFIG_ESP_WIFI_IRAM_OPT=n
CONFIG_ESP_WIFI_RX_IRAM_OPT=n
CONFIG_ESP_WIFI_SLP_IRAM_OPT=y
CONFIG_ESP_WIFI_SLP_BEACON_LOST_OPT=y
CONFIG_ESP_WIFI_ESPNOW_MAX_ENCRYPT_NUM=0
CONFIG_FATFS_LFN_HEAP=y
CONFIG_FREERTOS_UNICORE=y
CONFIG_FREERTOS_HZ=1000
CONFIG_FREERTOS_USE_TICKLESS_IDLE=y
CONFIG_FREERTOS_CHECK_MUTEX_GIVEN_BY_OWNER=n
CONFIG_FREERTOS_PLACE_FUNCTIONS_INTO_FLASH=y
CONFIG_HEAP_PLACE_FUNCTION_INTO_FLASH=y
CONFIG_LOG_DEFAULT_LEVEL_ERROR=y
CONFIG_LWIP_MAX_SOCKETS=16
CONFIG_LWIP_SO_LINGER=y
CONFIG_LWIP_SO_RCVBUF=y
CONFIG_LWIP_IP4_REASSEMBLY=y
CONFIG_LWIP_IP6_REASSEMBLY=y
CONFIG_LWIP_IPV6_AUTOCONFIG=y
CONFIG_LWIP_TCP_MAXRTX=6
CONFIG_LWIP_TCP_SYNMAXRTX=3
CONFIG_LWIP_PPP_SUPPORT=y
CONFIG_LWIP_PPP_SERVER_SUPPORT=y
CONFIG_LWIP_SNTP_MAX_SERVERS=3
CONFIG_LWIP_SNTP_STARTUP_DELAY=n
CONFIG_MBEDTLS_DYNAMIC_BUFFER=y
CONFIG_MBEDTLS_DYNAMIC_FREE_CONFIG_DATA=y
CONFIG_MBEDTLS_SSL_KEEP_PEER_CERTIFICATE=n
CONFIG_MBEDTLS_HAVE_TIME_DATE=y
CONFIG_MBEDTLS_DHM_C=y
CONFIG_NEWLIB_NANO_FORMAT=y
CONFIG_VFS_SUPPORT_TERMIOS=n
CONFIG_WL_SECTOR_SIZE_512=y
CONFIG_AT_PROCESS_TASK_STACK_SIZE=6144
CONFIG_AT_MQTT_COMMAND_SUPPORT=y
CONFIG_AT_HTTP_COMMAND_SUPPORT=y
CONFIG_AT_BLE_COMMAND_SUPPORT=n
CONFIG_AT_BLE_HID_COMMAND_SUPPORT=n
CONFIG_AT_BLUFI_COMMAND_SUPPORT=n

View File

@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
* SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
@ -18,14 +18,14 @@ using namespace esp_modem;
command_result net_open(CommandableIf *t)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
std::string out;
auto ret = dce_commands::generic_get_string(t, "AT+QISTATE?\r", out, 1000);
if (ret != command_result::OK) {
return ret;
}
if (out.find("+QISTATE: 0") != std::string::npos) {
ESP_LOGV(TAG, "%s", out.data() );
ESP_LOGV(TAG, "%s", out.data());
ESP_LOGD(TAG, "Already there");
return command_result::FAIL;
} else if (out.empty()) {
@ -36,7 +36,7 @@ command_result net_open(CommandableIf *t)
command_result net_close(CommandableIf *t)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
dce_commands::generic_command(t, "AT+QICLOSE=0\r", "OK", "ERROR", 10000);
esp_modem::Task::Delay(1000);
return dce_commands::generic_command(t, "AT+QIDEACT=1\r", "OK", "ERROR", 40000);
@ -44,11 +44,11 @@ command_result net_close(CommandableIf *t)
command_result tcp_open(CommandableIf *t, const std::string &host, int port, int timeout)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
std::string ip_open = R"(AT+QIOPEN=1,0,"TCP",")" + host + "\"," + std::to_string(port) + "\r";
auto ret = dce_commands::generic_command(t, ip_open, "+QIOPEN: 0,0", "ERROR", timeout);
if (ret != command_result::OK) {
ESP_LOGE(TAG, "%s Failed", __func__ );
ESP_LOGE(TAG, "%s Failed", __func__);
return ret;
}
return command_result::OK;
@ -56,27 +56,27 @@ command_result tcp_open(CommandableIf *t, const std::string &host, int port, int
command_result tcp_close(CommandableIf *t)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
return dce_commands::generic_command(t, "AT+QICLOSE=0\r", "OK", "ERROR", 10000);
}
command_result tcp_send(CommandableIf *t, uint8_t *data, size_t len)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
assert(0); // Remove when fix done
return command_result::FAIL;
}
command_result tcp_recv(CommandableIf *t, uint8_t *data, size_t len, size_t &out_len)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
assert(0); // Remove when fix done
return command_result::FAIL;
}
command_result get_ip(CommandableIf *t, std::string &ip)
{
ESP_LOGV(TAG, "%s", __func__ );
ESP_LOGV(TAG, "%s", __func__);
std::string out;
auto ret = dce_commands::generic_get_string(t, "AT+QIACT?\r", out, 5000);
if (ret != command_result::OK) {
@ -130,12 +130,15 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
auto *recv_data = (char *)data;
if (data_to_recv == 0) {
const std::string_view head = "+QIRD: ";
auto head_pos = std::search(recv_data, recv_data + len, head.begin(), head.end());
if (head_pos == recv_data + len) {
const std::string_view recv_data_view = std::string_view(recv_data, len);
auto head_pos_found = recv_data_view.find(head);
if (head_pos_found == std::string_view::npos) {
return ret::FAIL;
}
auto *head_pos = recv_data + head_pos_found;
auto next_nl = (char *)memchr(head_pos + head.size(), '\n', MIN_MESSAGE);
if (next_nl == nullptr) {
return ret::FAIL;
}

View File

@ -185,7 +185,7 @@ menu "Example Configuration"
config EXAMPLE_MQTT_BROKER_URI
string "MQTT Broker URL"
default "mqtt://mqtt.eclipseprojects.io"
default "mqtt://test.mosquitto.org"
help
URL of the mqtt broker which this example connects to.

View File

@ -16,4 +16,4 @@ CONFIG_COMPILER_CXX_EXCEPTIONS=y
CONFIG_ESP_MAIN_TASK_STACK_SIZE=8192
CONFIG_EXAMPLE_CLOSE_CMUX_AT_END=y
CONFIG_EXAMPLE_MQTT_TEST_TOPIC="/ci/esp-modem/pppos-client"
CONFIG_BROKER_URI="mqtt://mqtt.eclipseprojects.io"
CONFIG_BROKER_URI="mqtt://test.mosquitto.org"

View File

@ -367,24 +367,21 @@ void DTE::on_read(got_line_cb on_read_cb)
bool DTE::command_cb::process_line(uint8_t *data, size_t consumed, size_t len)
{
// returning true indicates that the processing finished and lower layers can destroy the accumulated buffer
#ifdef CONFIG_ESP_MODEM_URC_HANDLER
command_result commandResult = command_result::FAIL;
bool consume_buffer = false;
if (urc_handler) {
commandResult = urc_handler(data, consumed + len);
consume_buffer = urc_handler(data, consumed + len) != command_result::TIMEOUT;
}
if (result != command_result::TIMEOUT && got_line == nullptr) {
return false; // this line has been processed already (got OK or FAIL previously)
if (result != command_result::TIMEOUT || got_line == nullptr) {
return consume_buffer; // this line has been processed already (got OK or FAIL previously)
}
#endif
if (memchr(data + consumed, separator, len)) {
result = got_line(data + consumed, consumed + len);
result = got_line(data, consumed + len);
if (result == command_result::OK || result == command_result::FAIL) {
signal.set(GOT_LINE);
#ifdef CONFIG_ESP_MODEM_URC_HANDLER
return commandResult == command_result::OK;
#else
return true;
#endif
}
}
return false;

View File

@ -0,0 +1,7 @@
# The following lines of boilerplate have to be in your project's CMakeLists
# in this exact order for cmake to work correctly
cmake_minimum_required(VERSION 3.5)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
set(COMPONENTS main)
project(urc_test)

View File

@ -0,0 +1,2 @@
idf_component_register(SRCS "urc_test.cpp"
INCLUDE_DIRS ".")

View File

@ -0,0 +1,7 @@
## IDF Component Manager Manifest File
dependencies:
## Required IDF version
idf: ">=4.1.0"
espressif/esp_modem:
version: "^1.0.0"
override_path: "../../../"

View File

@ -0,0 +1,158 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <cstring>
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "esp_netif.h"
#include "cxx_include/esp_modem_dte.hpp"
#include "esp_modem_config.h"
#include "cxx_include/esp_modem_api.hpp"
#include "cxx_include/esp_modem_dce_factory.hpp"
#include "cxx_include/esp_modem_command_library_utils.hpp"
#include "esp_log.h"
#include "sdkconfig.h"
static const char *TAG = "urc_test";
static EventGroupHandle_t s_event_group = nullptr;
class ESP_AT_Module: public ::esp_modem::ModuleIf {
public:
explicit ESP_AT_Module(std::shared_ptr<::esp_modem::DTE> dte, const esp_modem_dce_config *config):
dte(std::move(dte)) {}
bool setup_data_mode() override
{
// not using network here
return true;
}
bool set_mode(::esp_modem::modem_mode mode) override
{
// we never allow mode change
return false;
}
protected:
std::shared_ptr<::esp_modem::DTE> dte;
};
class DCE : public esp_modem::DCE_T<ESP_AT_Module> {
using DCE_T<ESP_AT_Module>::DCE_T;
public:
bool init()
{
for (int i = 0; i < 5; ++i) {
if (sync() == esp_modem::command_result::OK) {
ESP_LOGI(TAG, "Modem in sync");
return true;
}
vTaskDelay(pdMS_TO_TICKS(500 * (i + 1)));
}
ESP_LOGE(TAG, "Failed to sync with esp-at");
return false;
}
esp_modem::command_result sync()
{
auto ret = esp_modem::dce_commands::generic_command_common(dte.get(), "AT\r\n");
ESP_LOGI(TAG, "Syncing with esp-at...(%d)", static_cast<int>(ret));
return ret;
}
bool http_get(const std::string &url)
{
std::string command = "AT+HTTPCGET=\"" + url + "\"\r\n";
set_urc(handle_urc);
auto ret = dte->write(esp_modem::DTE_Command(command));
ESP_LOGI(TAG, "HTTP GET...(%d)", static_cast<int>(ret));
return ret > 0;
}
bool start_http_server() const
{
auto ret = esp_modem::dce_commands::generic_command_common(dte.get(), "AT+HTTPD\r\n");
ESP_LOGI(TAG, "Start HTTP server...(%d)", static_cast<int>(ret));
return ret == esp_modem::command_result::OK;
}
static constexpr int transfer_completed = 1;
private:
static esp_modem::command_result handle_urc(uint8_t *data, size_t len)
{
static int start_chunk = 0;
static int end_chunk = 0;
std::string_view chunk((const char*)data + start_chunk, len - start_chunk);
int newline = chunk.find('\n');
if (newline == std::string_view::npos) {
end_chunk = len; // careful, this grows buffer usage
printf(".");
return esp_modem::command_result::TIMEOUT;
}
printf("%.*s\n", newline, (char*)data + start_chunk);
start_chunk = end_chunk;
// check for the last one
constexpr char last_chunk[] = "Transfer completed";
if (memmem(data, len, last_chunk, sizeof(last_chunk) - 1) != nullptr) {
xEventGroupSetBits(s_event_group, transfer_completed);
}
return esp_modem::command_result::OK;
}
};
class Factory: public ::esp_modem::dce_factory::Factory {
public:
static std::unique_ptr<DCE> create(const esp_modem::dce_config *config, std::shared_ptr<esp_modem::DTE> dte, esp_netif_t *netif)
{
return build_generic_DCE<ESP_AT_Module, DCE, std::unique_ptr<DCE>>(config, std::move(dte), netif);
}
};
std::unique_ptr<DCE> create(std::shared_ptr<esp_modem::DTE> dte)
{
esp_netif_config_t netif_ppp_config = ESP_NETIF_DEFAULT_PPP();
static esp_netif_t *netif = esp_netif_new(&netif_ppp_config);
assert(netif);
esp_modem_dce_config_t dce_config = ESP_MODEM_DCE_DEFAULT_CONFIG("APN"); // dummy config (not used with esp-at)
return Factory::create(&dce_config, std::move(dte), netif);
}
extern "C" void app_main(void)
{
/* Init and register system/core components */
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
s_event_group = xEventGroupCreate();
esp_modem_dte_config_t dte_config = ESP_MODEM_DTE_DEFAULT_CONFIG();
dte_config.dte_buffer_size = 1024;
dte_config.uart_config.tx_io_num = 18;
dte_config.uart_config.rx_io_num = 17;
auto uart_dte = esp_modem::create_uart_dte(&dte_config);
if (uart_dte == nullptr) {
ESP_LOGE(TAG, "Failed to create UART DTE");
return;
}
auto dce = create(std::move(uart_dte));
if (!dce->init()) {
ESP_LOGE(TAG, "Failed to setup network");
return;
}
dce->start_http_server();
dce->http_get("http://127.0.0.1:8080/async");
EventBits_t bits = xEventGroupWaitBits(s_event_group, 1, pdTRUE, pdFALSE, pdMS_TO_TICKS(15000));
if (bits & DCE::transfer_completed) {
ESP_LOGI(TAG, "Request finished!");
}
dce->sync();
vEventGroupDelete(s_event_group);
ESP_LOGI(TAG, "Done");
}

View File

@ -0,0 +1,3 @@
CONFIG_COMPILER_CXX_EXCEPTIONS=y
CONFIG_LWIP_PPP_SUPPORT=y
CONFIG_ESP_MODEM_URC_HANDLER=y

View File

@ -3,6 +3,6 @@ commitizen:
bump_message: 'bump(websocket): $current_version -> $new_version'
pre_bump_hooks: python ../../ci/changelog.py esp_websocket_client
tag_format: websocket-v$version
version: 1.4.0
version: 1.5.0
version_files:
- idf_component.yml

View File

@ -1,5 +1,17 @@
# Changelog
## [1.5.0](https://github.com/espressif/esp-protocols/commits/websocket-v1.5.0)
### Features
- add separate tx lock for send and receive ([250eebf](https://github.com/espressif/esp-protocols/commit/250eebf))
- add unregister event to websocket client ([ce16050](https://github.com/espressif/esp-protocols/commit/ce16050))
- add ability to reconnect after close ([19891d8](https://github.com/espressif/esp-protocols/commit/19891d8))
### Bug Fixes
- release client-lock during WEBSOCKET_EVENT_DATA ([030cb75](https://github.com/espressif/esp-protocols/commit/030cb75))
## [1.4.0](https://github.com/espressif/esp-protocols/commits/websocket-v1.4.0)
### Features

View File

@ -7,4 +7,17 @@ menu "ESP WebSocket client"
Enable this option will reallocated buffer when send or receive data and free them when end of use.
This can save about 2 KB memory when no websocket data send and receive.
config ESP_WS_CLIENT_SEPARATE_TX_LOCK
bool "Enable separate tx lock for send and receive data"
default n
help
Enable this option will use separate lock for send and receive data.
This can avoid the lock contention when send and receive data at the same time.
config ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS
int "TX lock timeout in milliseconds"
depends on ESP_WS_CLIENT_SEPARATE_TX_LOCK
default 2000
help
Timeout for acquiring the TX lock when using separate TX lock.
endmenu

View File

@ -39,6 +39,10 @@ static const char *TAG = "websocket_client";
#define WEBSOCKET_KEEP_ALIVE_INTERVAL (5)
#define WEBSOCKET_KEEP_ALIVE_COUNT (3)
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
#define WEBSOCKET_TX_LOCK_TIMEOUT_MS (CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS)
#endif
#define ESP_WS_CLIENT_MEM_CHECK(TAG, a, action) if (!(a)) { \
ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted"); \
action; \
@ -131,6 +135,9 @@ struct esp_websocket_client {
bool selected_for_destroying;
EventGroupHandle_t status_bits;
SemaphoreHandle_t lock;
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
SemaphoreHandle_t tx_lock;
#endif
size_t errormsg_size;
char *errormsg_buffer;
char *rx_buffer;
@ -441,6 +448,9 @@ static void destroy_and_free_resources(esp_websocket_client_handle_t client)
esp_transport_list_destroy(client->transport_list);
}
vSemaphoreDelete(client->lock);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
vSemaphoreDelete(client->tx_lock);
#endif
free(client->tx_buffer);
free(client->rx_buffer);
free(client->errormsg_buffer);
@ -610,10 +620,17 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
return -1;
}
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
return -1;
}
#else
if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
return -1;
}
#endif
if (esp_websocket_new_buf(client, true) != ESP_OK) {
ESP_LOGE(TAG, "Failed to setup tx buffer");
@ -653,7 +670,11 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
ret = widx;
unlock_and_return:
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#else
xSemaphoreGiveRecursive(client->lock);
#endif
return ret;
}
@ -689,6 +710,11 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie
client->lock = xSemaphoreCreateRecursiveMutex();
ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
client->tx_lock = xSemaphoreCreateRecursiveMutex();
ESP_WS_CLIENT_MEM_CHECK(TAG, client->tx_lock, goto _websocket_init_fail);
#endif
client->config = calloc(1, sizeof(websocket_config_storage_t));
ESP_WS_CLIENT_MEM_CHECK(TAG, client->config, goto _websocket_init_fail);
@ -967,8 +993,17 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
if (client->last_opcode == WS_TRANSPORT_OPCODES_PING) {
const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
return ESP_FAIL;
}
#endif
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#endif
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
client->wait_for_pong_resp = false;
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_CLOSE) {
@ -1050,8 +1085,16 @@ static void esp_websocket_client_task(void *pv)
if (_tick_get_ms() - client->ping_tick_ms > client->config->ping_interval_sec * 1000) {
client->ping_tick_ms = _tick_get_ms();
ESP_LOGD(TAG, "Sending PING...");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
break;
}
#endif
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#endif
if (!client->wait_for_pong_resp && client->config->pingpong_timeout_sec) {
client->pingpong_tick_ms = _tick_get_ms();
client->wait_for_pong_resp = true;
@ -1086,7 +1129,16 @@ static void esp_websocket_client_task(void *pv)
// if closing not initiated by the client echo the close message back
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) {
ESP_LOGD(TAG, "Closing initiated by the server, sending close frame");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
break;
}
#endif
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_CLOSE | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#endif
xEventGroupSetBits(client->status_bits, CLOSE_FRAME_SENT_BIT);
}
break;

View File

@ -1,4 +1,4 @@
version: "1.4.0"
version: "1.5.0"
description: WebSocket protocol client for ESP-IDF
url: https://github.com/espressif/esp-protocols/tree/master/components/esp_websocket_client
dependencies:

View File

@ -3,4 +3,21 @@
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
# Setup ESP-PEER from GitHub repo (but it's supported only on certain targets)
set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32")
if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS)
execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup/install.sh
${CMAKE_BINARY_DIR}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
RESULT_VARIABLE script_result)
if(script_result)
message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}")
endif()
list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/")
else()
message(STATUS "ESP-PEER is not compatible with this target")
endif()
project(serverless_mqtt)

View File

@ -3,14 +3,19 @@
MQTT served by (two) mosquitto's running on two ESP chips.
* Leverages MQTT connectivity between two private networks without cloud premisses.
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE protocol, by [libjuice](https://github.com/paullouisageneau/libjuice)).
* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE/WebRTC protocol)
## Peer to peer connection
Could be established either by [libjuice](https://github.com/paullouisageneau/libjuice) or [esp-webRTC](https://github.com/espressif/esp-webrtc-solution). While `juice` is just a low level implementation of ICE-UDP, we need to provide some signalling and synchronization, the `WebRTC` is full-fledged solution to establish a peer connection using standardized signalling, security and transfer protocols.
## How it works
This example needs two ESP32 chipsets, that will create two separate Wi-Fi networks (IoT networks) used for IoT devices.
Each IoT network is served by an MQTT server (using mosquitto component).
This example will also synchronize these two MQTT brokers, as if there was only one IoT network with one broker.
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) to traverse NATs, which enabling direct connection between two private networks behind NATs.
This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) or esp-webRTC (which implements WebRTC) to traverse NATs, which enabling direct connection between two private networks behind NATs.
* Diagram
@ -19,12 +24,16 @@ This example creates a peer to peer connection between two chipsets to keep them
Here's a step-by-step procedure of establishing this remote connection:
1) Initialize and start Wi-Fi AP (for IoT networks) and Wi-Fi station (for internet connection)
2) Start mosquitto broker on IoT network
3) Start libjuice to gather connection candidates
4) Synchronize using a public MQTT broker and exchange ICE descriptors
5) Establish ICE UDP connection between the two ESP32 chipsets
3) Start peer to peer connection
- In case of `libjuice`
- gather connection candidates
- synchronize using a public MQTT broker and exchange ICE descriptors
- establish ICE UDP connection between the two ESP32 chipsets
- In case of `webRTC` simply start the connection.
6) Start forwarding mqtt messages
- Each remote datagram (received from ICE-UDP channel) is re-published to the local MQTT server
- Each local MQTT message (received from mosquitto on_message callback) is sent in ICE-UDP datagram
- Each local MQTT message (received from mosquitto on_message callback) is sent in a peer message
## How to use this example
@ -33,7 +42,9 @@ You need two ESP32 devices that support Wi-Fi station and Wi-Fi software access
* Configure Wi-Fi credentials for both devices on both interfaces
* These devices would be deployed in distinct Wi-Fi environments, so the Wi-Fi station credentials would likely be different.
* They also create their own IoT network (on the soft-AP interface) Wi-Fi, so the AP credentials would likely be the same, suggesting the IoT networks will be keep synchronized (even though these are two distict Wi-Fi networks).
* Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
* Choose the peer library
* Only for `libjuice`:
- Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role.
* Optionally: You can use `idf.py` `-D` and `-B` flag to keep separate build directories and sdkconfigs for these two roles
```
idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
@ -44,9 +55,129 @@ idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor
* Join PEER2 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics.
* Whenever you publish to a topic, all subscribed clients should receive the message, no matter which Wi-Fi network they're connected to.
## Example output
## With libjuice
```
I (4746) esp_netif_handlers: sta ip: 192.168.0.40, mask: 255.255.255.0, gw: 192.168.0.1
4: mosquitto version v2.0.20~3 starting
4: Using default config.
4: Opening ipv4 listen socket on port 1883.
4: mosquitto version v2.0.20~3 running
I (4756) serverless_mqtt1: desc: a=ice-ufrag:sGdl
a=ice-pwd:R4IPGsFctITbT1dCZbfQTL
a=ice-options:ice2,trickle
00:00:04 INFO agent.c:1100: Changing state to gathering
I (4776) serverless_mqtt1: JUICE state change: gathering
00:00:04 INFO agent.c:1100: Changing state to connecting
I (4786) serverless_mqtt1: JUICE state change: connecting
00:00:04 INFO agent.c:422: Using STUN server stun.l.google.com:19302
00:00:04 INFO agent.c:1378: STUN server binding successful
00:00:04 INFO agent.c:1397: Got STUN mapped address 185.194.44.31:62703 from server
00:00:04 INFO agent.c:2428: Candidate gathering done
I (5066) serverless_mqtt1: Gathering done
I (5066) serverless_mqtt1: desc: {
"desc": "a=ice-ufrag:sGdl\r\na=ice-pwd:R4IPGsFctITbT1dCZbfQTL\r\na=ice-options:ice2,trickle\r\n",
"cand0": "a=candidate:1 1 UDP 2122317823 192.168.0.40 62703 typ host",
"cand1": "a=candidate:2 1 UDP 1686109951 185.194.44.31 62703 typ srflx raddr 0.0.0.0 rport 0"
}
I (5096) serverless_mqtt1: Other event id:7
```
### With esp-peer
```
I (5992) esp_netif_handlers: sta ip: 192.168.0.42, mask: 255.255.255.0, gw: 192.168.0.1
4: mosquitto version v2.0.20~3 starting
4: Using default config.
4: Opening ipv4 listen socket on port 1883.
4: mosquitto version v2.0.20~3 running
I (6702) esp-x509-crt-bundle: Certificate validated
I (7982) APPRTC_SIG: result SUCCESS
Initials set to 1
I (7982) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 911
I (8652) esp-x509-crt-bundle: Certificate validated
Got url:stun:webrtc.espressif.com:3478 user_name: 1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw=
I (10022) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 173
I (10032) serverless_mqtt_webrtc: Signaling ice info handler 0x0
I (10042) DTLS: Init SRTP OK
I (11512) DTLS_SRTP: dtls_srtp init done
I (11522) APPRTC_SIG: Registering signaling channel.
I (11522) APPRTC_SIG: Connecting to wss://webrtc.espressif.com:8089/ws...
I (11532) websocket_client: Started
I (11532) serverless_mqtt_webrtc: Waiting for peer to connect
I (12122) esp-x509-crt-bundle: Certificate validated
I (13502) APPRTC_SIG: WEBSOCKET_EVENT_CONNECTED
I (13502) APPRTC_SIG: send to remote : {"cmd":"register","roomid":"111116","clientid":"93827452"}
I (13502) serverless_mqtt_webrtc: Peer state: 2
I (13522) AGENT: Start agent as Controlling
I (13522) PEER_DEF: Start DTLS role as 1
I (13522) AGENT: Send STUN binding request
I (13522) AGENT: Send allocate now
Got error code 401
I (13802) AGENT: Send allocate now
I (14112) AGENT: 0 Get candidate success user:1752835118:ninefingers psw:/a8EMa7VBKpFa1I4Rdpv561YDPw=
I (14112) APPRTC_SIG: Begin to send offer to https://webrtc.espressif.com/message/111116/93827452
I (14782) esp-x509-crt-bundle: Certificate validated
I (16472) HTTPS_CLIENT: HTTP POST Status = 200, content_length = 21
I (21602) PEER_DEF: A SRC: 4
I (21602) PEER_DEF: Get peer role 0
I (21602) PEER_DEF: Get peer role 0
I (21602) AGENT: 0 Add remote type:2 185.194.44.31:60872
I (21612) AGENT: 0 Add remote type:4 172.31.6.33:59012
I (21612) AGENT: 0 Add remote type:1 192.168.0.41:60872
0 Sorted pair 0 type: 1 local:192.168.0.42:63459 Remote:192.168.0.41:60872
0 Sorted pair 1 type: 2 local:185.194.44.31:63459 Remote:185.194.44.31:60872
0 Sorted pair 2 type: 4 local:172.31.6.33:59013 Remote:172.31.6.33:59012
I (21642) serverless_mqtt_webrtc: Peer state: 3
I (22002) AGENT: 0 Send binding request (cand:0) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:76e5004e797d87a62756c714
I (22002) AGENT: 0 Send binding request (cand:0) local:185.194.44.31:63459 remote:185.194.44.31:60872 id:4c598d93643bd7252b449456
I (22012) AGENT: 0 Send binding request (cand:0) local:172.31.6.33:59013 remote:172.31.6.33:59012 id:6db505597d157d8d71dfed43
I (22022) AGENT: 0 send indication bind request
I (22202) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22202) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22212) AGENT: 0 Select pair192.168.0.41:60872
I (22212) AGENT: 0 Send binding request (cand:1) local:192.168.0.42:63459 remote:192.168.0.41:60872 id:45cb977b6ea3bbbe3a984b90
I (22222) AGENT: 0 PeerIndication recv local:172.31.6.33:59013 remote:172.31.6.33:59012
I (22402) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22402) AGENT: 0 Send binding response local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22412) AGENT: 0 Candidate responsed
I (22412) AGENT: 0 PeerBinding recv local:192.168.0.42:63459 remote:192.168.0.41:60872
I (22422) AGENT: 0 Connection OK 192.168.0.41:60872
I (22422) serverless_mqtt_webrtc: Peer state: 5
I (22452) DTLS: Start to do server handshake
Works as 1
I (23842) DTLS: SRTP connected OK
I (23852) DTLS: Server handshake success
I (23852) PEER_DEF: DTLS handshake success
I (23852) serverless_mqtt_webrtc: Peer state: 6
I (23852) serverless_mqtt_webrtc: Peer is connected!
I (23862) serverless_mqtt: local client event id:7
22: New connection from 192.168.4.1:63904 on port 1883.
22: New client connected from 192.168.4.1:63904 as local_mqtt (p2, c1, k120).
I (23892) serverless_mqtt: local client connected
I (23872) serverless_mqtt: Everything is ready, exiting main task
I (23892) main_task: Returned from app_main()
I (24552) SCTP: 0 Receive chunk 1 SCTP_INIT
I (24552) SCTP: 0 state 2
I (24552) SCTP: Send INIT_ACK chunk
I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO
I (24762) SCTP: Send ECHO_ACK chunk
I (24762) SCTP: 0 state 5
I (24762) serverless_mqtt_webrtc: Peer state: 8
I (24762) SCTP: 0 Receive chunk 10 SCTP_COOKIE_ECHO
I (24772) SCTP: Send ECHO_ACK chunk
I (24962) SCTP: Get DCEP esp_channel event:3 type:0 si:2
I (24972) serverless_mqtt_webrtc: Peer state: 9
```
## Warning
This example uses libjuice as a dependency:
This example uses `libjuice` as a dependency:
* libjuice (UDP Interactive Connectivity Establishment): https://github.com/paullouisageneau/libjuice

View File

@ -0,0 +1,28 @@
From cdc43a56f5ea1ab1935f55f47f8644f5dd30825e Mon Sep 17 00:00:00 2001
From: David Cermak <cermak@espressif.com>
Date: Thu, 10 Jul 2025 11:09:57 +0200
Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header
---
components/media_lib_sal/port/media_lib_os_freertos.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c
index d248d59..aea0527 100644
--- a/components/media_lib_sal/port/media_lib_os_freertos.c
+++ b/components/media_lib_sal/port/media_lib_os_freertos.c
@@ -40,8 +40,12 @@
#include "esp_idf_version.h"
#if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT
+#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 2, 0))
+#include "esp_private/freertos_debug.h"
+#else
#include "freertos/task_snapshot.h"
#endif
+#endif
#ifdef __XTENSA__
#include "esp_debug_helpers.h"
--
2.43.0

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
set -e
echo "bin_dir: $1"
bin_dir="$1"
THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
ESP_PEER_VERSION="ccff3bd65cea750bf6c0abcf9d95b931ba9329f0"
ESP_PEER_URL="https://github.com/espressif/esp-webrtc-solution/archive/${ESP_PEER_VERSION}.zip"
ESP_PEER_DIR="${bin_dir}/esp-peer"
ZIP_PATH="${bin_dir}/esp-peer.zip"
EXTRACTED_DIR="${ESP_PEER_DIR}/esp-webrtc-solution-${ESP_PEER_VERSION}"
COMPONENTS_SRC="${EXTRACTED_DIR}/components"
COMPONENTS_DST="${ESP_PEER_DIR}/components"
PATCH_FILE_1="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch"
PATCH_FILE_2="${THIS_DIR}/libpeer-Add-direct-dependency-to-libsrtp.patch"
# Download if not exists
if [ ! -d "$EXTRACTED_DIR" ]; then
echo "Downloading esp-peer ${ESP_PEER_VERSION}..."
wget -O "$ZIP_PATH" "$ESP_PEER_URL"
unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR"
patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1"
patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2"
mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR}
mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components
fi

View File

@ -0,0 +1,23 @@
From 695e057000698f4897b6c5802851499842e2fe31 Mon Sep 17 00:00:00 2001
From: David Cermak <cermak@espressif.com>
Date: Fri, 11 Jul 2025 16:59:21 +0200
Subject: [PATCH] fix(libpeer): Add direct dependency to libsrtp
---
components/esp_webrtc/impl/peer_default/CMakeLists.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/components/esp_webrtc/impl/peer_default/CMakeLists.txt b/components/esp_webrtc/impl/peer_default/CMakeLists.txt
index 2af35cf..3fb4615 100644
--- a/components/esp_webrtc/impl/peer_default/CMakeLists.txt
+++ b/components/esp_webrtc/impl/peer_default/CMakeLists.txt
@@ -2,6 +2,6 @@ idf_component_register(INCLUDE_DIRS ./include)
get_filename_component(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME)
add_prebuilt_library(${BASE_DIR} "${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}/libpeer_default.a"
- PRIV_REQUIRES ${BASE_DIR} esp_timer)
+ PRIV_REQUIRES ${BASE_DIR} esp_timer espressif__esp_libsrtp)
target_link_libraries(${COMPONENT_LIB} INTERFACE "-L ${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}")
target_link_libraries(${COMPONENT_LIB} INTERFACE peer_default)
--
2.43.0

View File

@ -1,4 +1,14 @@
if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
set(PEER_BACKEND_SRC "peer_impl_webrtc.c")
else()
set(PEER_BACKEND_SRC "peer_impl_juice.c")
endif()
idf_component_register(SRCS "serverless_mqtt.c"
"wifi_connect.c"
"${PEER_BACKEND_SRC}"
INCLUDE_DIRS "."
REQUIRES libjuice nvs_flash mqtt json esp_wifi)
if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
idf_component_optional_requires(PUBLIC media_lib_sal esp_webrtc peer_default)
endif()

View File

@ -33,8 +33,40 @@ menu "Example Configuration"
WiFi station password for the example to use.
endmenu
choice EXAMPLE_PEER_LIB
prompt "Choose peer library"
default EXAMPLE_PEER_LIB_LIBJUICE
help
Choose the peer library to use for WebRTC communication.
libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling)
esp_peer: Use ESP-IDF specific peer library
config EXAMPLE_PEER_LIB_ESP_PEER
bool "esp_peer"
config EXAMPLE_PEER_LIB_LIBJUICE
bool "libjuice"
endchoice
config EXAMPLE_WEBRTC_URL
string "WebRTC server URL"
depends on EXAMPLE_PEER_LIB_ESP_PEER
default "https://webrtc.espressif.com/join/"
help
URL of WebRTC remote endpoint.
config EXAMPLE_WEBRTC_ROOM_ID
string "WebRTC room ID"
depends on EXAMPLE_PEER_LIB_ESP_PEER
default "12345"
help
Room ID for WebRTC synchronisation.
Could be a random number, but the same for both peers.
config EXAMPLE_MQTT_BROKER_URI
string "MQTT Broker URL"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default "mqtt://mqtt.eclipseprojects.io"
help
URL of the mqtt broker use for synchronisation and exchanging
@ -42,12 +74,14 @@ menu "Example Configuration"
config EXAMPLE_MQTT_SYNC_TOPIC
string "MQTT topic for synchronisation"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default "/topic/serverless_mqtt"
help
MQTT topic used fo synchronisation.
config EXAMPLE_STUN_SERVER
string "Hostname of STUN server"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default "stun.l.google.com"
help
STUN server hostname.
@ -67,6 +101,7 @@ menu "Example Configuration"
choice EXAMPLE_SERVERLESS_ROLE
prompt "Choose your role"
depends on EXAMPLE_PEER_LIB_LIBJUICE
default EXAMPLE_SERVERLESS_ROLE_PEER1
help
Choose either peer1 or peer2.

View File

@ -0,0 +1,18 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <stdio.h>
#include "esp_random.h"
#include "esp_sleep.h"
#include "mosq_broker.h"
typedef void (*on_peer_recv_t)(const char *data, size_t size);
esp_err_t peer_init(on_peer_recv_t cb);
void peer_get_buffer(char ** buffer, size_t *buffer_len);
void peer_send(char* data, size_t size);

View File

@ -0,0 +1,283 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <stdio.h>
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "mqtt_client.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_check.h"
#include "juice/juice.h"
#include "cJSON.h"
#include "peer_impl.h"
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
#define OUR_PEER "1"
#define THEIR_PEER "2"
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
#define OUR_PEER "2"
#define THEIR_PEER "1"
#endif
#define PEER_SYNC0 BIT(0)
#define PEER_SYNC1 BIT(1)
#define PEER_SYNC2 BIT(2)
#define PEER_FAIL BIT(3)
#define PEER_GATHER_DONE BIT(4)
#define PEER_DESC_PUBLISHED BIT(5)
#define PEER_CONNECTED BIT(6)
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
static const char *TAG = "serverless_mqtt" OUR_PEER;
static char s_buffer[MAX_BUFFER_SIZE];
static EventGroupHandle_t s_state = NULL;
static juice_agent_t *s_agent = NULL;
static cJSON *s_peer_desc_json = NULL;
static char *s_peer_desc = NULL;
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
static on_peer_recv_t s_on_recv = NULL;
char *wifi_get_ipv4(wifi_interface_t interface);
static esp_err_t sync_peers(void);
static esp_err_t create_candidates(void);
void peer_get_buffer(char ** buffer, size_t *buffer_len)
{
if (buffer && buffer_len) {
*buffer = s_buffer;
*buffer_len = MAX_BUFFER_SIZE;
}
}
void peer_send(char* data, size_t size)
{
juice_send(s_agent, data, size);
}
esp_err_t peer_init(on_peer_recv_t cb)
{
esp_err_t ret = ESP_FAIL;
ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback");
s_on_recv = cb;
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
return ESP_OK;
}
err:
ESP_LOGE(TAG, "Failed to init peer");
return ret;
}
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
}
xEventGroupSetBits(s_state, PEER_SYNC0);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
break;
}
EventBits_t bits = xEventGroupGetBits(s_state);
if (event->data_len > 1 && s_agent) {
cJSON *root = cJSON_Parse(event->data);
if (root == NULL) {
break;
}
cJSON *desc = cJSON_GetObjectItem(root, "desc");
if (desc == NULL) {
cJSON_Delete(root);
break;
}
printf("desc->valuestring:%s\n", desc->valuestring);
juice_set_remote_description(s_agent, desc->valuestring);
char cand_name[] = "cand0";
while (true) {
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
if (cand == NULL) {
break;
}
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
juice_add_remote_candidate(s_agent, cand->valuestring);
cand_name[4]++;
}
cJSON_Delete(root);
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
// and destroy the mqtt client
}
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
}
#else
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC1);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
}
#endif
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static esp_err_t sync_peers(void)
{
esp_err_t ret = ESP_OK;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
err, TAG, "Failed to register mqtt event handler");
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
ESP_LOGI(TAG, "Waiting for the other peer...");
const int max_sync_retry = 60;
int retry = 0;
while (true) {
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_SYNC2) {
break;
}
if (bits & PEER_SYNC1) {
continue;
}
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish mqtt message");
#endif
}
ESP_LOGI(TAG, "Sync done");
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish peer's description");
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
err:
free(s_peer_desc);
esp_mqtt_client_destroy(client);
return ret;
}
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
{
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
if (state == JUICE_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
esp_restart();
}
}
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
{
static uint8_t cand_nr = 0;
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
char cand_name[] = "cand0";
cand_name[4] += cand_nr++;
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
}
}
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
{
ESP_LOGI(TAG, "Gathering done");
if (s_state) {
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
}
}
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
{
if (s_local_mqtt) {
s_on_recv(data, size);
} else {
ESP_LOGI(TAG, "No local mqtt client, dropping data");
}
}
static esp_err_t create_candidates(void)
{
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
s_peer_desc_json = cJSON_CreateObject();
esp_err_t ret = ESP_OK;
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
.stun_server_port = 19302,
.cb_state_changed = juice_state,
.cb_candidate = juice_candidate,
.cb_gathering_done = juice_gathering_done,
.cb_recv = juice_recv,
};
s_agent = juice_create(&config);
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to get local description");
ESP_LOGI(TAG, "desc: %s", s_buffer);
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
s_peer_desc = cJSON_Print(s_peer_desc_json);
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
cJSON_Delete(s_peer_desc_json);
return ESP_OK;
err:
juice_destroy(s_agent);
s_agent = NULL;
cJSON_Delete(s_peer_desc_json);
s_peer_desc_json = NULL;
return ret;
}

View File

@ -0,0 +1,248 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include "media_lib_os.h"
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "media_lib_adapter.h"
#include "media_lib_os.h"
#include "esp_log.h"
#include "esp_webrtc_defaults.h"
#include "esp_peer_default.h"
#include "common.h"
#include "esp_check.h"
#include "peer_impl.h"
#include "sdkconfig.h"
#define WEBRTC_URL (CONFIG_EXAMPLE_WEBRTC_URL CONFIG_EXAMPLE_WEBRTC_ROOM_ID)
#define PEER_CONNECTED BIT(0)
#define PEER_DISCONNECTED BIT(1)
#define MAX_BUFFER_SIZE (4*1024)
static EventGroupHandle_t s_state = NULL;
static const char *TAG = "serverless_mqtt_webrtc";
void peer_get_buffer(char ** buffer, size_t *buffer_len)
{
static char s_buffer[MAX_BUFFER_SIZE];
if (buffer && buffer_len) {
*buffer = s_buffer;
*buffer_len = MAX_BUFFER_SIZE;
}
}
static int start_webrtc(char *url);
static int stop_webrtc(void);
static on_peer_recv_t s_on_recv = NULL;
static esp_peer_signaling_handle_t signaling = NULL;
static esp_peer_handle_t peer = NULL;
static bool peer_running = false;
static void thread_scheduler(const char *thread_name, media_lib_thread_cfg_t *thread_cfg)
{
if (strcmp(thread_name, "pc_task") == 0) {
thread_cfg->stack_size = 25 * 1024;
thread_cfg->priority = 18;
thread_cfg->core_id = 1;
}
}
esp_err_t peer_init(on_peer_recv_t cb)
{
esp_err_t ret = ESP_OK;
s_on_recv = cb;
s_state = xEventGroupCreate();
media_lib_add_default_adapter();
media_lib_thread_set_schedule_cb(thread_scheduler);
ESP_RETURN_ON_FALSE(s_state, ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
ESP_GOTO_ON_FALSE(start_webrtc(WEBRTC_URL) == ESP_PEER_ERR_NONE, ESP_FAIL, err, TAG, "Failed to start webRTC");
ESP_LOGI(TAG, "Waiting for peer to connect");
int i = 0;
while (1) {
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
return ret;
}
ESP_GOTO_ON_FALSE(i++ < 100, ESP_ERR_TIMEOUT, err, TAG, "Peer connection timeout");
if (peer) {
esp_peer_query(peer);
}
}
err:
vEventGroupDelete(s_state);
return ret;
}
static int peer_state_handler(esp_peer_state_t state, void* ctx)
{
ESP_LOGI(TAG, "Peer state: %d", state);
if (state == ESP_PEER_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == ESP_PEER_STATE_DISCONNECTED) {
xEventGroupSetBits(s_state, PEER_DISCONNECTED);
}
return 0;
}
static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx)
{
if (msg->type == ESP_PEER_MSG_TYPE_SDP) {
// Send local SDP to signaling server
esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg);
}
return 0;
}
static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx)
{
return 0;
}
static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx)
{
return 0;
}
static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx)
{
ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]);
return 0;
}
static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx)
{
return 0;
}
static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx)
{
if (frame && frame->size > 0) {
s_on_recv((char*)frame->data, frame->size);
}
return 0;
}
static void pc_task(void *arg)
{
while (peer_running) {
esp_peer_main_loop(peer);
media_lib_thread_sleep(20);
}
media_lib_thread_destroy(NULL);
}
static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx)
{
if (peer == NULL) {
esp_peer_default_cfg_t peer_cfg = {
.agent_recv_timeout = 500,
};
esp_peer_cfg_t cfg = {
.server_lists = &info->server_info,
.server_num = 1,
.audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV,
.audio_info = {
.codec = ESP_PEER_AUDIO_CODEC_G711A,
},
.enable_data_channel = true,
.role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED,
.on_state = peer_state_handler,
.on_msg = peer_msg_handler,
.on_video_info = peer_video_info_handler,
.on_audio_info = peer_audio_info_handler,
.on_video_data = peer_video_data_handler,
.on_audio_data = peer_audio_data_handler,
.on_data = peer_data_handler,
.ctx = ctx,
.extra_cfg = &peer_cfg,
.extra_size = sizeof(esp_peer_default_cfg_t),
};
int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer);
if (ret != ESP_PEER_ERR_NONE) {
return ret;
}
media_lib_thread_handle_t thread = NULL;
peer_running = true;
media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL);
if (thread == NULL) {
peer_running = false;
}
}
return 0;
}
static int signaling_connected_handler(void* ctx)
{
if (peer) {
return esp_peer_new_connection(peer);
}
return 0;
}
static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx)
{
if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) {
esp_peer_close(peer);
peer = NULL;
} else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) {
// Receive remote SDP
if (peer) {
esp_peer_send_msg(peer, (esp_peer_msg_t*)msg);
}
}
return 0;
}
static int signaling_close_handler(void *ctx)
{
return 0;
}
static int start_signaling(char* url)
{
esp_peer_signaling_cfg_t cfg = {
.signal_url = url,
.on_ice_info = signaling_ice_info_handler,
.on_connected = signaling_connected_handler,
.on_msg = signaling_msg_handler,
.on_close = signaling_close_handler,
};
// Use APPRTC signaling
return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling);
}
static int start_webrtc(char *url)
{
stop_webrtc();
return start_signaling(url);
}
static int stop_webrtc(void)
{
peer_running = false;
if (peer) {
esp_peer_close(peer);
peer = NULL;
}
if (signaling) {
esp_peer_signaling_stop(signaling);
signaling = NULL;
}
return 0;
}
void peer_send(char* data, size_t size)
{
esp_peer_data_frame_t data_frame = {
.type = ESP_PEER_DATA_CHANNEL_DATA,
.data = (uint8_t*)data,
.size = size,
};
esp_peer_send_data(peer, &data_frame);
}

View File

@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
* SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
@ -13,30 +13,9 @@
#include "esp_check.h"
#include "esp_sleep.h"
#include "mosq_broker.h"
#include "juice/juice.h"
#include "cJSON.h"
#include "peer_impl.h"
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
#define OUR_PEER "1"
#define THEIR_PEER "2"
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
#define OUR_PEER "2"
#define THEIR_PEER "1"
#endif
#define PEER_SYNC0 BIT(0)
#define PEER_SYNC1 BIT(1)
#define PEER_SYNC2 BIT(2)
#define PEER_FAIL BIT(3)
#define PEER_GATHER_DONE BIT(4)
#define PEER_DESC_PUBLISHED BIT(5)
#define PEER_CONNECTED BIT(6)
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
#define ALIGN(size) (((size) + 3U) & ~(3U))
typedef struct message_wrap {
uint16_t topic_len;
@ -44,196 +23,18 @@ typedef struct message_wrap {
char data[];
} __attribute__((packed)) message_wrap_t;
static const char *TAG = "serverless_mqtt" OUR_PEER;
static char s_buffer[MAX_BUFFER_SIZE];
static EventGroupHandle_t s_state = NULL;
static juice_agent_t *s_agent = NULL;
static cJSON *s_peer_desc_json = NULL;
static char *s_peer_desc = NULL;
static const char *TAG = "serverless_mqtt";
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
char *wifi_get_ipv4(wifi_interface_t interface);
esp_err_t wifi_connect(void);
static esp_err_t sync_peers(void);
static esp_err_t create_candidates(void);
static esp_err_t create_local_client(void);
static esp_err_t create_local_broker(void);
void app_main(void)
{
__attribute__((__unused__)) esp_err_t ret;
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
if (bits & PEER_CONNECTED) {
ESP_LOGI(TAG, "Peer is connected!");
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
ESP_LOGI(TAG, "Everything is ready, exiting main task");
return;
}
err:
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
esp_deep_sleep(1000000LL * (esp_random() % 20));
}
esp_err_t peer_init(on_peer_recv_t cb);
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
}
xEventGroupSetBits(s_state, PEER_SYNC0);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
break;
}
EventBits_t bits = xEventGroupGetBits(s_state);
if (event->data_len > 1 && s_agent) {
cJSON *root = cJSON_Parse(event->data);
if (root == NULL) {
break;
}
cJSON *desc = cJSON_GetObjectItem(root, "desc");
if (desc == NULL) {
cJSON_Delete(root);
break;
}
printf("desc->valuestring:%s\n", desc->valuestring);
juice_set_remote_description(s_agent, desc->valuestring);
char cand_name[] = "cand0";
while (true) {
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
if (cand == NULL) {
break;
}
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
juice_add_remote_candidate(s_agent, cand->valuestring);
cand_name[4]++;
}
cJSON_Delete(root);
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
// and destroy the mqtt client
}
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
}
#else
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
xEventGroupSetBits(s_state, PEER_SYNC1);
} else {
xEventGroupSetBits(s_state, PEER_FAIL);
}
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
xEventGroupSetBits(s_state, PEER_SYNC2);
}
#endif
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
xEventGroupSetBits(s_state, PEER_FAIL);
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static esp_err_t sync_peers(void)
{
esp_err_t ret = ESP_OK;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
err, TAG, "Failed to register mqtt event handler");
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
ESP_LOGI(TAG, "Waiting for the other peer...");
const int max_sync_retry = 60;
int retry = 0;
while (true) {
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
if (bits & PEER_SYNC2) {
break;
}
if (bits & PEER_SYNC1) {
continue;
}
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish mqtt message");
#endif
}
ESP_LOGI(TAG, "Sync done");
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
ESP_FAIL, TAG, "Failed to publish peer's description");
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
err:
free(s_peer_desc);
esp_mqtt_client_destroy(client);
return ret;
}
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
{
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
if (state == JUICE_STATE_CONNECTED) {
xEventGroupSetBits(s_state, PEER_CONNECTED);
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
esp_restart();
}
}
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
{
static uint8_t cand_nr = 0;
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
char cand_name[] = "cand0";
cand_name[4] += cand_nr++;
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
}
}
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
{
ESP_LOGI(TAG, "Gathering done");
if (s_state) {
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
}
}
#define ALIGN(size) (((size) + 3U) & ~(3U))
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
static void peer_recv(const char *data, size_t size)
{
if (s_local_mqtt) {
message_wrap_t *message = (message_wrap_t *)data;
@ -250,45 +51,21 @@ static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void
ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload);
esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0);
}
}
static esp_err_t create_candidates(void)
void app_main(void)
{
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
s_peer_desc_json = cJSON_CreateObject();
esp_err_t ret = ESP_OK;
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
.stun_server_port = 19302,
.cb_state_changed = juice_state,
.cb_candidate = juice_candidate,
.cb_gathering_done = juice_gathering_done,
.cb_recv = juice_recv,
};
s_agent = juice_create(&config);
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to get local description");
ESP_LOGI(TAG, "desc: %s", s_buffer);
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
s_peer_desc = cJSON_Print(s_peer_desc_json);
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
cJSON_Delete(s_peer_desc_json);
return ESP_OK;
__attribute__((__unused__)) esp_err_t ret;
ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi");
ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker");
ESP_GOTO_ON_ERROR(peer_init(peer_recv), err, TAG, "Failed to init peer library");
ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client");
ESP_LOGI(TAG, "Everything is ready, exiting main task");
return;
err:
juice_destroy(s_agent);
s_agent = NULL;
cJSON_Delete(s_peer_desc_json);
s_peer_desc_json = NULL;
return ret;
ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)");
esp_deep_sleep(1000000LL * (esp_random() % 20));
}
static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data)
@ -335,18 +112,26 @@ err:
static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain)
{
if (client && strcmp(client, "local_mqtt") == 0 ) {
if (client && strcmp(client, "local_mqtt") == 0) {
// This is our little local client -- do not forward
return;
}
ESP_LOGI(TAG, "handle_message topic:%s", topic);
ESP_LOGI(TAG, "handle_message data:%.*s", len, payload);
ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain);
if (s_local_mqtt && s_agent) {
if (s_local_mqtt) {
static char *s_buffer = NULL;
static size_t s_buffer_len = 0;
if (s_buffer == NULL || s_buffer_len == 0) {
peer_get_buffer(&s_buffer, &s_buffer_len);
if (s_buffer == NULL || s_buffer_len == 0) {
return;
}
}
int topic_len = strlen(topic) + 1; // null term
int topic_len_aligned = ALIGN(topic_len);
int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len;
if (total_msg_len > MAX_BUFFER_SIZE) {
if (total_msg_len > s_buffer_len) {
ESP_LOGE(TAG, "Fail to forward, message too long");
return;
}
@ -356,7 +141,7 @@ static void handle_message(char *client, char *topic, char *payload, int len, in
memcpy(s_buffer + 4, topic, topic_len);
memcpy(s_buffer + 4 + topic_len_aligned, payload, len);
juice_send(s_agent, s_buffer, total_msg_len);
peer_send(s_buffer, total_msg_len);
}
}

View File

@ -0,0 +1,2 @@
CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=n
CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=y

View File

@ -0,0 +1,5 @@
CONFIG_IDF_TARGET="esp32"
CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y
CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n
CONFIG_SPIRAM=y
CONFIG_MBEDTLS_EXTERNAL_MEM_ALLOC=y

View File

@ -1,3 +1,6 @@
CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y
CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384
CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768
CONFIG_LWIP_SNTP_MAX_SERVERS=2
CONFIG_MBEDTLS_SSL_PROTO_DTLS=y
CONFIG_MBEDTLS_SSL_DTLS_SRTP=y

View File

@ -2,7 +2,7 @@ menu "Example Configuration"
config BROKER_URL
string "Broker URL"
default "mqtt://mqtt.eclipseprojects.io"
default "mqtt://test.mosquitto.org"
help
URL of the broker to connect to