Merge pull request #849 from david-cermak/feat/modem_tcp_client_multi_conn

[modem]: tcp-client example to support multiple connections
This commit is contained in:
david-cermak
2025-11-18 15:25:21 +01:00
committed by GitHub
14 changed files with 441 additions and 165 deletions

View File

@@ -22,3 +22,18 @@ To enable this mode, please set `EXAMPLE_CUSTOM_TCP_TRANSPORT=y`
This configuration could be used with any network library, which is connecting to a localhost endpoint instead of remote one. This example creates a localhost listener which basically mimics the remote endpoint by forwarding the traffic between the library and the TCP/socket layer of the modem (which is already secure if the TLS is used in the network library) This configuration could be used with any network library, which is connecting to a localhost endpoint instead of remote one. This example creates a localhost listener which basically mimics the remote endpoint by forwarding the traffic between the library and the TCP/socket layer of the modem (which is already secure if the TLS is used in the network library)
![with localhost listener](at_client_localhost.png) ![with localhost listener](at_client_localhost.png)
### Multi-connection support
This example supports opening multiple TCP connections concurrently when the modem firmware allows it.
- ESP-AT: Multi-connection mode is enabled via `AT+CIPMUX=1`. The example assigns a unique link ID per DCE instance and includes the link ID in `CIPSTART/CIPSEND/CIPRECVDATA` commands.
- BG96/SIM7600: The example uses module-specific multi-connection syntax (for example `QIOPEN/CIPOPEN` with a connection ID) and tracks link IDs internally.
How it works:
- The `sock_dce` layer creates multiple DCE instances over a shared DTE. A lightweight mutex coordinates access to the UART so only one DCE issues AT commands at a time.
- Asynchronous URCs (for example `+IPD`, `+QIURC`, `+CIPRXGET: 1,<cid>`) wake the corresponding DCE which then performs receive operations for its link.
Usage:
- `app_main` starts two DCE tasks to demonstrate concurrent connections. Adjust the number of DCE instances as needed.
- For ESP-AT, ensure your firmware supports `CIPMUX=1` and passive receive (`CIPRECVTYPE`).

View File

@@ -44,6 +44,18 @@ menu "Example Configuration"
help help
Set APN (Access Point Name), a logical name to choose data network Set APN (Access Point Name), a logical name to choose data network
config EXAMPLE_USE_TLS
bool "Use TLS for MQTT broker"
default n
help
Enable TLS for connection to the MQTT broker.
config EXAMPLE_BROKER_HOST
string "MQTT broker host"
default "test.mosquitto.org"
help
Hostname or IP address of the MQTT broker.
menu "UART Configuration" menu "UART Configuration"
config EXAMPLE_MODEM_UART_TX_PIN config EXAMPLE_MODEM_UART_TX_PIN
int "TXD Pin Number" int "TXD Pin Number"

View File

@@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
#include <algorithm>
#include <charconv> #include <charconv>
#include <sys/socket.h> #include <sys/socket.h>
#include "esp_vfs.h" #include "esp_vfs.h"
@@ -14,6 +15,29 @@
namespace sock_dce { namespace sock_dce {
constexpr auto const *TAG = "sock_dce"; constexpr auto const *TAG = "sock_dce";
constexpr auto WAIT_TO_IDLE_TIMEOUT = 5000;
// Definition of the static member variables
std::vector<DCE *> DCE::dce_list{};
bool DCE::network_init = false;
int Responder::s_link_id = 0;
SemaphoreHandle_t Responder::s_dte_mutex{};
// Constructor - add this DCE instance to the static list
DCE::DCE(std::shared_ptr<esp_modem::DTE> dte_arg, const esp_modem_dce_config *config)
: Module(std::move(dte_arg), config)
{
dce_list.push_back(this);
}
// Destructor - remove this DCE instance from the static list
DCE::~DCE()
{
auto it = std::find(dce_list.begin(), dce_list.end(), this);
if (it != dce_list.end()) {
dce_list.erase(it);
}
}
bool DCE::perform_sock() bool DCE::perform_sock()
@@ -61,13 +85,26 @@ bool DCE::perform_sock()
void DCE::perform_at(uint8_t *data, size_t len) void DCE::perform_at(uint8_t *data, size_t len)
{ {
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_VERBOSE); if (state != status::RECEIVING) {
std::string_view resp_sv((char *)data, len);
at.check_urc(state, resp_sv);
if (state == status::IDLE) {
return;
}
}
// Trace incoming AT bytes when handling a response; use DEBUG level
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_DEBUG);
switch (at.process_data(state, data, len)) { switch (at.process_data(state, data, len)) {
case Responder::ret::OK: case Responder::ret::OK:
// Release DTE access for this link after processing data
ESP_LOGD(TAG, "GIVE data %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::IDLE; state = status::IDLE;
signal.set(IDLE); signal.set(IDLE);
return; return;
case Responder::ret::FAIL: case Responder::ret::FAIL:
ESP_LOGD(TAG, "GIVE data %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::FAILED; state = status::FAILED;
signal.set(IDLE); signal.set(IDLE);
return; return;
@@ -82,10 +119,14 @@ void DCE::perform_at(uint8_t *data, size_t len)
std::string_view response((char *)data, len); std::string_view response((char *)data, len);
switch (at.check_async_replies(state, response)) { switch (at.check_async_replies(state, response)) {
case Responder::ret::OK: case Responder::ret::OK:
ESP_LOGD(TAG, "GIVE command %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::IDLE; state = status::IDLE;
signal.set(IDLE); signal.set(IDLE);
return; return;
case Responder::ret::FAIL: case Responder::ret::FAIL:
ESP_LOGD(TAG, "GIVE command %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::FAILED; state = status::FAILED;
signal.set(IDLE); signal.set(IDLE);
return; return;
@@ -121,7 +162,7 @@ bool DCE::at_to_sock()
uint64_t data; uint64_t data;
read(data_ready_fd, &data, sizeof(data)); read(data_ready_fd, &data, sizeof(data));
ESP_LOGD(TAG, "select read: modem data available %" PRIu64, data); ESP_LOGD(TAG, "select read: modem data available %" PRIu64, data);
if (!signal.wait(IDLE, 1000)) { if (!signal.wait(IDLE, WAIT_TO_IDLE_TIMEOUT)) {
ESP_LOGE(TAG, "Failed to get idle"); ESP_LOGE(TAG, "Failed to get idle");
close_sock(); close_sock();
return false; return false;
@@ -131,6 +172,10 @@ bool DCE::at_to_sock()
close_sock(); close_sock();
return false; return false;
} }
// Take DTE mutex before issuing receive on this link
ESP_LOGD(TAG, "TAKE RECV %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD(TAG, "TAKEN RECV %d", at.link_id);
state = status::RECEIVING; state = status::RECEIVING;
at.start_receiving(at.get_buf_len()); at.start_receiving(at.get_buf_len());
return true; return true;
@@ -139,7 +184,7 @@ bool DCE::at_to_sock()
bool DCE::sock_to_at() bool DCE::sock_to_at()
{ {
ESP_LOGD(TAG, "socket read: data available"); ESP_LOGD(TAG, "socket read: data available");
if (!signal.wait(IDLE, 1000)) { if (!signal.wait(IDLE, WAIT_TO_IDLE_TIMEOUT)) {
ESP_LOGE(TAG, "Failed to get idle"); ESP_LOGE(TAG, "Failed to get idle");
close_sock(); close_sock();
return false; return false;
@@ -149,6 +194,10 @@ bool DCE::sock_to_at()
close_sock(); close_sock();
return false; return false;
} }
// Take DTE mutex before issuing send on this link
ESP_LOGD(TAG, "TAKE SEND %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD(TAG, "TAKEN SEND %d", at.link_id);
state = status::SENDING; state = status::SENDING;
int len = ::recv(sock, at.get_buf(), at.get_buf_len(), 0); int len = ::recv(sock, at.get_buf(), at.get_buf_len(), 0);
if (len < 0) { if (len < 0) {
@@ -201,7 +250,7 @@ void DCE::start_listening(int port)
} }
int opt = 1; int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
ESP_LOGI(TAG, "Socket created"); ESP_LOGD(TAG, "Socket created");
struct sockaddr_in addr = { }; struct sockaddr_in addr = { };
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(port); addr.sin_port = htons(port);
@@ -213,7 +262,7 @@ void DCE::start_listening(int port)
ESP_LOGE(TAG, "Socket unable to bind: errno %d", errno); ESP_LOGE(TAG, "Socket unable to bind: errno %d", errno);
return; return;
} }
ESP_LOGI(TAG, "Socket bound, port %d", 1883); ESP_LOGD(TAG, "Socket bound, port %d", 1883);
err = listen(listen_sock, 1); err = listen(listen_sock, 1);
if (err != 0) { if (err != 0) {
ESP_LOGE(TAG, "Error occurred during listen: errno %d", errno); ESP_LOGE(TAG, "Error occurred during listen: errno %d", errno);
@@ -224,12 +273,12 @@ void DCE::start_listening(int port)
bool DCE::connect(std::string host, int port) bool DCE::connect(std::string host, int port)
{ {
dte->on_read(nullptr); data_ready_fd = eventfd(0, EFD_SUPPORT_ISR);
tcp_close(); assert(data_ready_fd > 0);
dte->on_read([this](uint8_t *data, size_t len) { // Take DTE mutex before starting connect for this link
this->perform_at(data, len); ESP_LOGD(TAG, "TAKE CONNECT %d", at.link_id);
return esp_modem::command_result::TIMEOUT; xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
}); ESP_LOGD(TAG, "TAKEN CONNECT %d", at.link_id);
if (!at.start_connecting(host, port)) { if (!at.start_connecting(host, port)) {
ESP_LOGE(TAG, "Unable to start connecting"); ESP_LOGE(TAG, "Unable to start connecting");
dte->on_read(nullptr); dte->on_read(nullptr);
@@ -241,12 +290,15 @@ bool DCE::connect(std::string host, int port)
bool DCE::init() bool DCE::init()
{ {
if (network_init) {
return true;
}
network_init = true;
Responder::s_dte_mutex = xSemaphoreCreateBinary();
xSemaphoreGive(at.s_dte_mutex);
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT(); esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
esp_vfs_eventfd_register(&config); esp_vfs_eventfd_register(&config);
data_ready_fd = eventfd(0, EFD_SUPPORT_ISR);
assert(data_ready_fd > 0);
dte->on_read(nullptr); dte->on_read(nullptr);
const int retries = 5; const int retries = 5;
int i = 0; int i = 0;
@@ -287,6 +339,10 @@ bool DCE::init()
esp_modem::Task::Delay(5000); esp_modem::Task::Delay(5000);
} }
ESP_LOGI(TAG, "Got IP %s", ip_addr.c_str()); ESP_LOGI(TAG, "Got IP %s", ip_addr.c_str());
dte->on_read([](uint8_t *data, size_t len) {
read_callback(data, len);
return esp_modem::command_result::TIMEOUT;
});
return true; return true;
} }

View File

@@ -34,6 +34,7 @@ public:
sock(s), data_ready_fd(ready_fd), dte(dte_arg) {} sock(s), data_ready_fd(ready_fd), dte(dte_arg) {}
ret process_data(status state, uint8_t *data, size_t len); ret process_data(status state, uint8_t *data, size_t len);
ret check_async_replies(status state, std::string_view &response); ret check_async_replies(status state, std::string_view &response);
ret check_urc(status state, std::string_view &response);
void start_sending(size_t len); void start_sending(size_t len);
void start_receiving(size_t len); void start_receiving(size_t len);
@@ -63,13 +64,19 @@ public:
return total_len; return total_len;
} }
// Unique link identifier used to target multi-connection AT commands
int link_id{s_link_id++};
// Shared mutex guarding DTE access across concurrent DCE instances
static SemaphoreHandle_t s_dte_mutex;
private: private:
static int s_link_id;
static constexpr size_t buffer_size = 512; static constexpr size_t buffer_size = 512;
bool on_read(char *data, size_t len) bool on_read(char *data, size_t len)
{ {
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT #ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
::send(sock, data, len, 0); ::send(sock, data, len, 0);
printf("sending %d\n", len);
#else #else
::memcpy(&buffer[actual_read], data, len); ::memcpy(&buffer[actual_read], data, len);
actual_read += len; actual_read += len;
@@ -101,6 +108,8 @@ private:
class DCE : public Module { class DCE : public Module {
using Module::Module; using Module::Module;
public: public:
DCE(std::shared_ptr<esp_modem::DTE> dte_arg, const esp_modem_dce_config *config);
~DCE();
/** /**
* @brief Opens network in AT command mode * @brief Opens network in AT command mode
@@ -163,6 +172,10 @@ public:
return 0; return 0;
} }
at.clear_offsets(); at.clear_offsets();
// Take DTE mutex before issuing receive on this link
ESP_LOGD("TAG", "TAKE RECV %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD("TAG", "TAKEN RECV %d", at.link_id);
state = status::RECEIVING; state = status::RECEIVING;
uint64_t data; uint64_t data;
read(data_ready_fd, &data, sizeof(data)); read(data_ready_fd, &data, sizeof(data));
@@ -184,6 +197,10 @@ public:
if (!wait_to_idle(timeout_ms)) { if (!wait_to_idle(timeout_ms)) {
return -1; return -1;
} }
// Take DTE mutex before issuing send on this link
ESP_LOGD("TAG", "TAKE SEND %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD("TAG", "TAKEN SEND %d", at.link_id);
state = status::SENDING; state = status::SENDING;
memcpy(at.get_buf(), buffer, len_to_send); memcpy(at.get_buf(), buffer, len_to_send);
ESP_LOG_BUFFER_HEXDUMP("dce", at.get_buf(), len, ESP_LOG_VERBOSE); ESP_LOG_BUFFER_HEXDUMP("dce", at.get_buf(), len, ESP_LOG_VERBOSE);
@@ -224,6 +241,14 @@ public:
} }
return -1; return -1;
} }
static std::vector<DCE *> dce_list;
static bool network_init;
static void read_callback(uint8_t *data, size_t len)
{
for (auto dce : dce_list) {
dce->perform_at(data, len);
}
}
private: private:
esp_modem::SignalGroup signal; esp_modem::SignalGroup signal;
void close_sock(); void close_sock();

View File

@@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
#include <algorithm>
#include <charconv> #include <charconv>
#include <sys/socket.h> #include <sys/socket.h>
#include "esp_vfs.h" #include "esp_vfs.h"
@@ -14,6 +15,29 @@
namespace sock_dce { namespace sock_dce {
constexpr auto const *TAG = "sock_dce"; constexpr auto const *TAG = "sock_dce";
constexpr auto WAIT_TO_IDLE_TIMEOUT = 5000;
// Definition of the static member variables
std::vector<DCE*> DCE::dce_list{};
bool DCE::network_init = false;
int Responder::s_link_id = 0;
SemaphoreHandle_t Responder::s_dte_mutex{};
// Constructor - add this DCE instance to the static list
DCE::DCE(std::shared_ptr<esp_modem::DTE> dte_arg, const esp_modem_dce_config *config)
: Module(std::move(dte_arg), config)
{
dce_list.push_back(this);
}
// Destructor - remove this DCE instance from the static list
DCE::~DCE()
{
auto it = std::find(dce_list.begin(), dce_list.end(), this);
if (it != dce_list.end()) {
dce_list.erase(it);
}
}
bool DCE::perform_sock() bool DCE::perform_sock()
@@ -61,13 +85,26 @@ bool DCE::perform_sock()
void DCE::perform_at(uint8_t *data, size_t len) void DCE::perform_at(uint8_t *data, size_t len)
{ {
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_VERBOSE); if (state != status::RECEIVING) {
std::string_view resp_sv((char *)data, len);
at.check_urc(state, resp_sv);
if (state == status::IDLE) {
return;
}
}
// Trace incoming AT bytes when handling a response; use DEBUG level
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_DEBUG);
switch (at.process_data(state, data, len)) { switch (at.process_data(state, data, len)) {
case Responder::ret::OK: case Responder::ret::OK:
// Release DTE access for this link after processing data
ESP_LOGD(TAG, "GIVE data %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::IDLE; state = status::IDLE;
signal.set(IDLE); signal.set(IDLE);
return; return;
case Responder::ret::FAIL: case Responder::ret::FAIL:
ESP_LOGD(TAG, "GIVE data %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::FAILED; state = status::FAILED;
signal.set(IDLE); signal.set(IDLE);
return; return;
@@ -82,10 +119,14 @@ void DCE::perform_at(uint8_t *data, size_t len)
std::string_view response((char *)data, len); std::string_view response((char *)data, len);
switch (at.check_async_replies(state, response)) { switch (at.check_async_replies(state, response)) {
case Responder::ret::OK: case Responder::ret::OK:
ESP_LOGD(TAG, "GIVE command %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::IDLE; state = status::IDLE;
signal.set(IDLE); signal.set(IDLE);
return; return;
case Responder::ret::FAIL: case Responder::ret::FAIL:
ESP_LOGD(TAG, "GIVE command %d", at.link_id);
xSemaphoreGive(at.s_dte_mutex);
state = status::FAILED; state = status::FAILED;
signal.set(IDLE); signal.set(IDLE);
return; return;
@@ -121,7 +162,7 @@ bool DCE::at_to_sock()
uint64_t data; uint64_t data;
read(data_ready_fd, &data, sizeof(data)); read(data_ready_fd, &data, sizeof(data));
ESP_LOGD(TAG, "select read: modem data available %" PRIu64, data); ESP_LOGD(TAG, "select read: modem data available %" PRIu64, data);
if (!signal.wait(IDLE, 1000)) { if (!signal.wait(IDLE, WAIT_TO_IDLE_TIMEOUT)) {
ESP_LOGE(TAG, "Failed to get idle"); ESP_LOGE(TAG, "Failed to get idle");
close_sock(); close_sock();
return false; return false;
@@ -131,6 +172,10 @@ bool DCE::at_to_sock()
close_sock(); close_sock();
return false; return false;
} }
// Take DTE mutex before issuing receive on this link
ESP_LOGD(TAG, "TAKE RECV %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD(TAG, "TAKEN RECV %d", at.link_id);
state = status::RECEIVING; state = status::RECEIVING;
at.start_receiving(at.get_buf_len()); at.start_receiving(at.get_buf_len());
return true; return true;
@@ -139,7 +184,7 @@ bool DCE::at_to_sock()
bool DCE::sock_to_at() bool DCE::sock_to_at()
{ {
ESP_LOGD(TAG, "socket read: data available"); ESP_LOGD(TAG, "socket read: data available");
if (!signal.wait(IDLE, 1000)) { if (!signal.wait(IDLE, WAIT_TO_IDLE_TIMEOUT)) {
ESP_LOGE(TAG, "Failed to get idle"); ESP_LOGE(TAG, "Failed to get idle");
close_sock(); close_sock();
return false; return false;
@@ -149,6 +194,10 @@ bool DCE::sock_to_at()
close_sock(); close_sock();
return false; return false;
} }
// Take DTE mutex before issuing send on this link
ESP_LOGD(TAG, "TAKE SEND %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD(TAG, "TAKEN SEND %d", at.link_id);
state = status::SENDING; state = status::SENDING;
int len = ::recv(sock, at.get_buf(), at.get_buf_len(), 0); int len = ::recv(sock, at.get_buf(), at.get_buf_len(), 0);
if (len < 0) { if (len < 0) {
@@ -201,7 +250,7 @@ void DCE::start_listening(int port)
} }
int opt = 1; int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
ESP_LOGI(TAG, "Socket created"); ESP_LOGD(TAG, "Socket created");
struct sockaddr_in addr = { }; struct sockaddr_in addr = { };
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(port); addr.sin_port = htons(port);
@@ -213,7 +262,7 @@ void DCE::start_listening(int port)
ESP_LOGE(TAG, "Socket unable to bind: errno %d", errno); ESP_LOGE(TAG, "Socket unable to bind: errno %d", errno);
return; return;
} }
ESP_LOGI(TAG, "Socket bound, port %d", 1883); ESP_LOGD(TAG, "Socket bound, port %d", 1883);
err = listen(listen_sock, 1); err = listen(listen_sock, 1);
if (err != 0) { if (err != 0) {
ESP_LOGE(TAG, "Error occurred during listen: errno %d", errno); ESP_LOGE(TAG, "Error occurred during listen: errno %d", errno);
@@ -224,12 +273,12 @@ void DCE::start_listening(int port)
bool DCE::connect(std::string host, int port) bool DCE::connect(std::string host, int port)
{ {
dte->on_read(nullptr); data_ready_fd = eventfd(0, EFD_SUPPORT_ISR);
tcp_close(); assert(data_ready_fd > 0);
dte->on_read([this](uint8_t *data, size_t len) { // Take DTE mutex before starting connect for this link
this->perform_at(data, len); ESP_LOGD(TAG, "TAKE CONNECT %d", at.link_id);
return esp_modem::command_result::TIMEOUT; xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
}); ESP_LOGD(TAG, "TAKEN CONNECT %d", at.link_id);
if (!at.start_connecting(host, port)) { if (!at.start_connecting(host, port)) {
ESP_LOGE(TAG, "Unable to start connecting"); ESP_LOGE(TAG, "Unable to start connecting");
dte->on_read(nullptr); dte->on_read(nullptr);
@@ -241,12 +290,15 @@ bool DCE::connect(std::string host, int port)
bool DCE::init() bool DCE::init()
{ {
if (network_init) {
return true;
}
network_init = true;
Responder::s_dte_mutex = xSemaphoreCreateBinary();
xSemaphoreGive(at.s_dte_mutex);
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT(); esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
esp_vfs_eventfd_register(&config); esp_vfs_eventfd_register(&config);
data_ready_fd = eventfd(0, EFD_SUPPORT_ISR);
assert(data_ready_fd > 0);
dte->on_read(nullptr); dte->on_read(nullptr);
const int retries = 5; const int retries = 5;
int i = 0; int i = 0;
@@ -287,6 +339,10 @@ bool DCE::init()
esp_modem::Task::Delay(5000); esp_modem::Task::Delay(5000);
} }
ESP_LOGI(TAG, "Got IP %s", ip_addr.c_str()); ESP_LOGI(TAG, "Got IP %s", ip_addr.c_str());
dte->on_read([](uint8_t *data, size_t len) {
read_callback(data, len);
return esp_modem::command_result::TIMEOUT;
});
return true; return true;
} }

View File

@@ -34,6 +34,7 @@ public:
sock(s), data_ready_fd(ready_fd), dte(dte_arg) {} sock(s), data_ready_fd(ready_fd), dte(dte_arg) {}
ret process_data(status state, uint8_t *data, size_t len); ret process_data(status state, uint8_t *data, size_t len);
ret check_async_replies(status state, std::string_view &response); ret check_async_replies(status state, std::string_view &response);
ret check_urc(status state, std::string_view &response);
void start_sending(size_t len); void start_sending(size_t len);
void start_receiving(size_t len); void start_receiving(size_t len);
@@ -63,13 +64,19 @@ public:
return total_len; return total_len;
} }
// Unique link identifier used to target multi-connection AT commands
int link_id{s_link_id++};
// Shared mutex guarding DTE access across concurrent DCE instances
static SemaphoreHandle_t s_dte_mutex;
private: private:
static int s_link_id;
static constexpr size_t buffer_size = 512; static constexpr size_t buffer_size = 512;
bool on_read(char *data, size_t len) bool on_read(char *data, size_t len)
{ {
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT #ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
::send(sock, data, len, 0); ::send(sock, data, len, 0);
printf("sending %d\n", len);
#else #else
::memcpy(&buffer[actual_read], data, len); ::memcpy(&buffer[actual_read], data, len);
actual_read += len; actual_read += len;
@@ -101,6 +108,8 @@ private:
class DCE : public Module { class DCE : public Module {
using Module::Module; using Module::Module;
public: public:
DCE(std::shared_ptr<esp_modem::DTE> dte_arg, const esp_modem_dce_config *config);
~DCE();
// --- ESP-MODEM command module starts here --- // --- ESP-MODEM command module starts here ---
#include "esp_modem_command_declare_helper.inc" #include "esp_modem_command_declare_helper.inc"
@@ -141,6 +150,10 @@ esp_modem::return_type name(ESP_MODEM_COMMAND_PARAMS(__VA_ARGS__));
return 0; return 0;
} }
at.clear_offsets(); at.clear_offsets();
// Take DTE mutex before issuing receive on this link
ESP_LOGD("TAG", "TAKE RECV %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD("TAG", "TAKEN RECV %d", at.link_id);
state = status::RECEIVING; state = status::RECEIVING;
uint64_t data; uint64_t data;
read(data_ready_fd, &data, sizeof(data)); read(data_ready_fd, &data, sizeof(data));
@@ -163,6 +176,10 @@ esp_modem::return_type name(ESP_MODEM_COMMAND_PARAMS(__VA_ARGS__));
if (!wait_to_idle(timeout_ms)) { if (!wait_to_idle(timeout_ms)) {
return -1; return -1;
} }
// Take DTE mutex before issuing send on this link
ESP_LOGD("TAG", "TAKE SEND %d", at.link_id);
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
ESP_LOGD("TAG", "TAKEN SEND %d", at.link_id);
state = status::SENDING; state = status::SENDING;
memcpy(at.get_buf(), buffer, len_to_send); memcpy(at.get_buf(), buffer, len_to_send);
ESP_LOG_BUFFER_HEXDUMP("dce", at.get_buf(), len, ESP_LOG_VERBOSE); ESP_LOG_BUFFER_HEXDUMP("dce", at.get_buf(), len, ESP_LOG_VERBOSE);
@@ -204,6 +221,14 @@ esp_modem::return_type name(ESP_MODEM_COMMAND_PARAMS(__VA_ARGS__));
} }
return -1; return -1;
} }
static std::vector<DCE*> dce_list;
static bool network_init;
static void read_callback(uint8_t *data, size_t len)
{
for (auto dce : dce_list) {
dce->perform_at(data, len);
}
}
private: private:
esp_modem::SignalGroup signal; esp_modem::SignalGroup signal;

View File

@@ -1,7 +1,11 @@
dependencies: dependencies:
espressif/esp_modem: espressif/esp_modem:
version: "^1.0.1" version: ^1.0.1
override_path: "../../../" override_path: ../../../
espressif/mbedtls_cxx: espressif/mbedtls_cxx:
version: "*" version: '*'
override_path: "../../../../mbedtls_cxx" override_path: ../../../../mbedtls_cxx
espressif/mqtt:
rules:
- if: idf_version >=6.0
version: ^1.0.0

View File

@@ -22,15 +22,26 @@
#include "esp_log.h" #include "esp_log.h"
#include "tcp_transport_mbedtls.h" #include "tcp_transport_mbedtls.h"
#include "tcp_transport_at.h" #include "tcp_transport_at.h"
#include "sdkconfig.h"
#define BROKER_URL "test.mosquitto.org" #define USE_TLS CONFIG_EXAMPLE_USE_TLS
#define BROKER_HOST CONFIG_EXAMPLE_BROKER_HOST
#if USE_TLS
#define BROKER_SCHEME "mqtts"
#define BROKER_PORT 8883
#else
#define BROKER_SCHEME "mqtt"
#define BROKER_PORT 1883 #define BROKER_PORT 1883
#endif
#define BROKER_URL BROKER_SCHEME "://" BROKER_HOST
static const char *TAG = "modem_client"; static const char *TAG = "modem_client";
static EventGroupHandle_t event_group = NULL; static EventGroupHandle_t event_group = NULL;
static const int CONNECT_BIT = BIT0; static const int CONNECT_BIT = BIT0;
static const int GOT_DATA_BIT = BIT2; static const int GOT_DATA_BIT = BIT2;
static const int DCE0_DONE = BIT3;
static const int DCE1_DONE = BIT4;
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{ {
@@ -73,13 +84,15 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
} }
} }
static void perform(void* ctx);
extern "C" void app_main(void) extern "C" void app_main(void)
{ {
/* Init and register system/core components */ /* Init and register system/core components */
ESP_ERROR_CHECK(esp_netif_init()); ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default()); ESP_ERROR_CHECK(esp_event_loop_create_default());
// Default to INFO; individual modules use DEBUG for verbose tracing
esp_log_level_set("*", ESP_LOG_INFO);
event_group = xEventGroupCreate(); event_group = xEventGroupCreate();
@@ -104,30 +117,60 @@ extern "C" void app_main(void)
esp_modem_dce_config_t dce_config = ESP_MODEM_DCE_DEFAULT_CONFIG(CONFIG_EXAMPLE_MODEM_APN); esp_modem_dce_config_t dce_config = ESP_MODEM_DCE_DEFAULT_CONFIG(CONFIG_EXAMPLE_MODEM_APN);
/* create the DCE and initialize network manually (using AT commands) */ /* create the DCE and initialize network manually (using AT commands) */
auto dce = sock_dce::create(&dce_config, std::move(dte)); auto dce = sock_dce::create(&dce_config, dte);
if (!dce->init()) { if (!dce->init()) {
ESP_LOGE(TAG, "Failed to setup network"); ESP_LOGE(TAG, "Failed to setup network");
return; return;
} }
esp_mqtt_client_config_t mqtt_config = {}; xTaskCreate(perform, "perform", 4096, dce.get(), 4, nullptr);
mqtt_config.broker.address.port = BROKER_PORT;
mqtt_config.session.message_retransmit_timeout = 10000;
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
mqtt_config.broker.address.uri = "mqtt://127.0.0.1";
dce->start_listening(BROKER_PORT);
#else
mqtt_config.broker.address.uri = "mqtt://" BROKER_URL;
esp_transport_handle_t at = esp_transport_at_init(dce.get());
esp_transport_handle_t ssl = esp_transport_tls_init(at);
mqtt_config.network.transport = ssl; /* create another DCE to serve a new connection */
auto dce1 = sock_dce::create(&dce_config, dte);
if (!dce1->init()) {
ESP_LOGE(TAG, "Failed to setup network");
return;
}
xTaskCreate(perform, "perform", 4096, dce1.get(), 4, nullptr);
xEventGroupWaitBits(event_group, DCE0_DONE | DCE1_DONE, pdFALSE, pdTRUE, portMAX_DELAY);
#ifdef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
// this example does never keeps both DCEs running and in tcp-transport option
// we don't need a task to run so we exit main and keep DCE's "running"
vTaskDelay(portMAX_DELAY);
#endif #endif
}
static void perform(void* ctx)
{
auto dce = static_cast<sock_dce::DCE*>(ctx);
char mqtt_client_id[] = "MQTT_CLIENT_0";
static int counter = 0;
const int id = counter++;
mqtt_client_id[sizeof(mqtt_client_id) - 2] += id; // assumes a different client id per each thread
esp_mqtt_client_config_t mqtt_config = {};
mqtt_config.session.message_retransmit_timeout = 10000;
mqtt_config.credentials.client_id = mqtt_client_id;
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
mqtt_config.broker.address.port = BROKER_PORT + id;
mqtt_config.broker.address.uri = BROKER_SCHEME "://127.0.0.1";
dce->start_listening(BROKER_PORT + id);
#else
mqtt_config.broker.address.port = BROKER_PORT;
mqtt_config.broker.address.uri = BROKER_URL;
esp_transport_handle_t at = esp_transport_at_init(dce);
#if USE_TLS
esp_transport_handle_t ssl = esp_transport_tls_init(at);
mqtt_config.network.transport = ssl;
#else
mqtt_config.network.transport = at;
#endif // USE_TLS
#endif // CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
esp_mqtt_client_handle_t mqtt_client = esp_mqtt_client_init(&mqtt_config); esp_mqtt_client_handle_t mqtt_client = esp_mqtt_client_init(&mqtt_config);
esp_mqtt_client_register_event(mqtt_client, static_cast<esp_mqtt_event_id_t>(ESP_EVENT_ANY_ID), mqtt_event_handler, nullptr); esp_mqtt_client_register_event(mqtt_client, static_cast<esp_mqtt_event_id_t>(ESP_EVENT_ANY_ID), mqtt_event_handler, nullptr);
esp_mqtt_client_start(mqtt_client); esp_mqtt_client_start(mqtt_client);
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT #ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
if (!dce->connect(BROKER_URL, BROKER_PORT)) { if (!dce->connect(BROKER_HOST, BROKER_PORT)) {
ESP_LOGE(TAG, "Failed to start DCE"); ESP_LOGE(TAG, "Failed to start DCE");
return; return;
} }
@@ -135,18 +178,17 @@ extern "C" void app_main(void)
while (dce->perform_sock()) { while (dce->perform_sock()) {
ESP_LOGV(TAG, "...performing"); ESP_LOGV(TAG, "...performing");
} }
ESP_LOGE(TAG, "Loop exit.. retrying"); ESP_LOGD(TAG, "Loop exit.. retrying");
// handle disconnections errors // handle disconnections errors
if (!dce->init()) { if (!dce->init()) {
ESP_LOGE(TAG, "Failed to reinit network"); ESP_LOGE(TAG, "Failed to reinit network");
return; return;
} }
if (!dce->connect(BROKER_URL, BROKER_PORT)) { if (!dce->connect(BROKER_HOST, BROKER_PORT)) {
ESP_LOGI(TAG, "Network reinitialized, retrying"); ESP_LOGI(TAG, "Network reinitialized, retrying");
} }
} }
#else
vTaskDelay(portMAX_DELAY);
#endif #endif
xEventGroupSetBits(event_group, id ? DCE0_DONE : DCE1_DONE);
vTaskDelete(nullptr);
} }

View File

@@ -109,17 +109,17 @@ void Responder::start_sending(size_t len)
{ {
data_to_send = len; data_to_send = len;
send_stat = 0; send_stat = 0;
send_cmd("AT+QISEND=0," + std::to_string(len) + "\r"); send_cmd("AT+QISEND=" + std::to_string(link_id) + "," + std::to_string(len) + "\r");
} }
void Responder::start_receiving(size_t len) void Responder::start_receiving(size_t len)
{ {
send_cmd("AT+QIRD=0," + std::to_string(len) + "\r"); send_cmd("AT+QIRD=" + std::to_string(link_id) + "," + std::to_string(len) + "\r");
} }
bool Responder::start_connecting(std::string host, int port) bool Responder::start_connecting(std::string host, int port)
{ {
send_cmd(R"(AT+QIOPEN=1,0,"TCP",")" + host + "\"," + std::to_string(port) + "\r"); send_cmd(std::string("AT+QIOPEN=1,") + std::to_string(link_id) + R"(,"TCP",")" + host + "\"," + std::to_string(port) + "\r");
return true; return true;
} }
@@ -130,16 +130,24 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
auto *recv_data = (char *)data; auto *recv_data = (char *)data;
if (data_to_recv == 0) { if (data_to_recv == 0) {
const std::string_view head = "+QIRD: "; const std::string_view head = "+QIRD: ";
again:
const std::string_view recv_data_view = std::string_view(recv_data, len); const std::string_view recv_data_view = std::string_view(recv_data, len);
auto head_pos_found = recv_data_view.find(head); auto head_pos_found = recv_data_view.find(head);
if (head_pos_found == std::string_view::npos) { if (head_pos_found == std::string_view::npos) {
return ret::FAIL; return ret::IN_PROGRESS;
} }
auto *head_pos = recv_data + head_pos_found; auto *head_pos = recv_data + head_pos_found;
auto next_nl = (char *)memchr(head_pos + head.size(), '\n', MIN_MESSAGE); auto next_nl = (char *)memchr(head_pos + head.size(), '\n', MIN_MESSAGE);
if (next_nl == nullptr) { if (next_nl == nullptr) {
if (head_pos + head.size() + 1 < recv_data + len) {
// might be that we misinterpreted the URC +QIRD: <>,<>,<> (notification) with the +QIRD: <> (read data)
// so we try to find the next +QIRD:
recv_data = head_pos + head.size() + 1;
goto again;
}
ESP_LOGD(TAG, "no new line found");
return ret::FAIL; return ret::FAIL;
} }
@@ -151,7 +159,9 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
ESP_LOGD(TAG, "Received: actual len=%d", actual_len); ESP_LOGD(TAG, "Received: actual len=%d", actual_len);
if (actual_len == 0) { if (actual_len == 0) {
ESP_LOGD(TAG, "no data received"); ESP_LOGD(TAG, "no data received");
return ret::FAIL; data_to_recv = 0;
// return OK here, as BG96 would keep unacked data and notifies us with +QIRD: 0
return ret::OK;
} }
if (actual_len > buffer_size) { if (actual_len > buffer_size) {
@@ -182,6 +192,7 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
last_pos = (char *)memchr(recv_data + 1 + actual_len, 'O', MIN_MESSAGE); last_pos = (char *)memchr(recv_data + 1 + actual_len, 'O', MIN_MESSAGE);
if (last_pos == nullptr || last_pos[1] != 'K') { if (last_pos == nullptr || last_pos[1] != 'K') {
data_to_recv = 0; data_to_recv = 0;
ESP_LOGD(TAG, "no OK after data");
return ret::FAIL; return ret::FAIL;
} }
} }
@@ -222,7 +233,7 @@ Responder::ret Responder::send(std::string_view response)
{ {
if (send_stat == 3) { if (send_stat == 3) {
if (response.find("SEND OK") != std::string::npos) { if (response.find("SEND OK") != std::string::npos) {
send_cmd("AT+QISEND=0,0\r"); send_cmd(std::string("AT+QISEND=") + std::to_string(link_id) + ",0\r");
send_stat++; send_stat++;
return ret::IN_PROGRESS; return ret::IN_PROGRESS;
} else if (response.find("SEND FAIL") != std::string::npos) { } else if (response.find("SEND FAIL") != std::string::npos) {
@@ -267,7 +278,7 @@ Responder::ret Responder::send(std::string_view response)
if (ack < total) { if (ack < total) {
ESP_LOGD(TAG, "all sending data are not ack (missing %d bytes acked)", (total - ack)); ESP_LOGD(TAG, "all sending data are not ack (missing %d bytes acked)", (total - ack));
if (total - ack > 64) { if (total - ack > 64) {
ESP_LOGW(TAG, "Need a pause: missing %d bytes acked", (total - ack)); ESP_LOGD(TAG, "Need a pause: missing %d bytes acked", (total - ack));
return ret::NEED_MORE_TIME; return ret::NEED_MORE_TIME;
} }
} }
@@ -284,7 +295,8 @@ Responder::ret Responder::send(std::string_view response)
Responder::ret Responder::connect(std::string_view response) Responder::ret Responder::connect(std::string_view response)
{ {
if (response.find("+QIOPEN: 0,0") != std::string::npos) { std::string open_response = "+QIOPEN: " + std::to_string(link_id) + ",0";
if (response.find(open_response) != std::string::npos) {
ESP_LOGI(TAG, "Connected!"); ESP_LOGI(TAG, "Connected!");
return ret::OK; return ret::OK;
} }
@@ -295,10 +307,9 @@ Responder::ret Responder::connect(std::string_view response)
return Responder::ret::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
Responder::ret Responder::check_async_replies(status state, std::string_view &response) Responder::ret Responder::check_urc(status state, std::string_view &response)
{ {
ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data()); if (response.find(std::string("+QIURC: \"recv\",") + std::to_string(link_id)) != std::string::npos) {
if (response.find("+QIURC: \"recv\",0") != std::string::npos) {
uint64_t data_ready = 1; uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready)); write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Got data on modem!"); ESP_LOGD(TAG, "Got data on modem!");
@@ -309,6 +320,9 @@ Responder::ret Responder::check_async_replies(status state, std::string_view &re
response = response.substr(head_pos + head.size()); response = response.substr(head_pos + head.size());
int next_cr = response.find('\r'); int next_cr = response.find('\r');
if (next_cr != std::string::npos) { if (next_cr != std::string::npos) {
if (next_cr < 2) {
return ret::IN_PROGRESS;
}
response = response.substr(next_cr - 2, next_cr); response = response.substr(next_cr - 2, next_cr);
if (response.find(",0") != std::string::npos) { if (response.find(",0") != std::string::npos) {
ESP_LOGV(TAG, "Receiving done"); ESP_LOGV(TAG, "Receiving done");
@@ -318,12 +332,21 @@ Responder::ret Responder::check_async_replies(status state, std::string_view &re
ESP_LOGD(TAG, "Got data on modem!"); ESP_LOGD(TAG, "Got data on modem!");
} }
} }
} else if (response.find("+QIURC: \"closed\",0") != std::string::npos) { } else if (response.find(std::string("+QIURC: \"closed\",") + std::to_string(link_id)) != std::string::npos) {
ESP_LOGE(TAG, "Connection closed");
return ret::FAIL; return ret::FAIL;
} }
return ret::IN_PROGRESS;
}
Responder::ret Responder::check_async_replies(status state, std::string_view &response)
{
ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data());
if (state == status::SENDING) { if (state == status::SENDING) {
return send(response); return send(response);
} else if (state == status::CONNECTING) { }
if (state == status::CONNECTING) {
return connect(response); return connect(response);
} }
return ret::IN_PROGRESS; return ret::IN_PROGRESS;
@@ -342,7 +365,7 @@ Responder::ret Responder::process_data(status state, uint8_t *data, size_t len)
status Responder::pending() status Responder::pending()
{ {
send_cmd("AT+QISEND=0,0\r"); send_cmd(std::string("AT+QISEND=") + std::to_string(link_id) + ",0\r");
return status::SENDING; return status::SENDING;
} }

View File

@@ -58,12 +58,19 @@ command_result net_open(CommandableIf *t)
} }
ESP_LOGI(TAG, "WiFi connected successfully"); ESP_LOGI(TAG, "WiFi connected successfully");
// Set passive receive mode (1) for better control // Enable multiple connections mode
ret = set_rx_mode(t, 1); ret = dce_commands::generic_command(t, "AT+CIPMUX=1\r\n", "OK", "ERROR", 1000);
if (ret != command_result::OK) { if (ret != command_result::OK) {
ESP_LOGE(TAG, "Failed to set preferred Rx mode"); ESP_LOGE(TAG, "Failed to enable multiple connections mode");
return ret; return ret;
} }
ESP_LOGD(TAG, "Multiple connections mode enabled");
// Set passive receive mode (1) for better control
for (int i = 0; i < 2; i++) {
std::string cmd = "AT+CIPRECVTYPE=" + std::to_string(i) + ",1\r\n";
dce_commands::generic_command(t, cmd, "OK", "ERROR", 1000);
}
return command_result::OK; return command_result::OK;
} }
@@ -78,49 +85,20 @@ command_result net_close(CommandableIf *t)
return command_result::OK; return command_result::OK;
} }
command_result tcp_open(CommandableIf *t, const std::string &host, int port, int timeout)
{
ESP_LOGV(TAG, "%s", __func__);
// Set single connection mode (just in case)
auto ret = dce_commands::generic_command(t, "AT+CIPMUX=0\r\n", "OK", "ERROR", 1000);
if (ret != command_result::OK) {
ESP_LOGW(TAG, "Failed to set single connection mode");
}
// Establish TCP connection
std::string tcp_cmd = "AT+CIPSTART=\"TCP\",\"" + host + "\"," + std::to_string(port) + "\r\n";
ret = dce_commands::generic_command(t, tcp_cmd, "CONNECT", "ERROR", timeout);
if (ret != command_result::OK) {
ESP_LOGE(TAG, "Failed to establish TCP connection to %s:%d", host.c_str(), port);
return ret;
}
ESP_LOGI(TAG, "TCP connection established to %s:%d", host.c_str(), port);
return command_result::OK;
}
command_result tcp_close(CommandableIf *t) command_result tcp_close(CommandableIf *t)
{ {
return command_result::OK;
ESP_LOGV(TAG, "%s", __func__); ESP_LOGV(TAG, "%s", __func__);
return dce_commands::generic_command(t, "AT+CIPCLOSE\r\n", "CLOSED", "ERROR", 5000); // Use link ID 0 for closing connection
const int link_id = 0;
std::string close_cmd = "AT+CIPCLOSE=" + std::to_string(link_id) + "\r\n";
// In multiple connections mode, response format is: <link ID>,CLOSED
std::string expected_response = std::to_string(link_id) + ",CLOSED";
return dce_commands::generic_command(t, close_cmd, expected_response, "ERROR", 5000);
} }
command_result tcp_send(CommandableIf *t, uint8_t *data, size_t len)
{
ESP_LOGV(TAG, "%s", __func__);
// This function is not used in the current implementation
// Data sending is handled by the DCE responder
return command_result::FAIL;
}
command_result tcp_recv(CommandableIf *t, uint8_t *data, size_t len, size_t &out_len)
{
ESP_LOGV(TAG, "%s", __func__);
// This function is not used in the current implementation
// Data receiving is handled by the DCE responder
return command_result::FAIL;
}
command_result get_ip(CommandableIf *t, std::string &ip) command_result get_ip(CommandableIf *t, std::string &ip)
{ {
@@ -150,9 +128,11 @@ command_result get_ip(CommandableIf *t, std::string &ip)
command_result set_rx_mode(CommandableIf *t, int mode) command_result set_rx_mode(CommandableIf *t, int mode)
{ {
ESP_LOGE(TAG, "%s", __func__); ESP_LOGV(TAG, "%s", __func__);
// For multiple connections mode, set receive mode for link ID 0
const int link_id = 0;
// Active mode (0) sends data automatically, Passive mode (1) notifies about data for reading // Active mode (0) sends data automatically, Passive mode (1) notifies about data for reading
std::string cmd = "AT+CIPRECVTYPE=" + std::to_string(mode) + "\r\n"; std::string cmd = "AT+CIPRECVTYPE=" + std::to_string(link_id) + "," + std::to_string(mode) + "\r\n";
return dce_commands::generic_command(t, cmd, "OK", "ERROR", 1000); return dce_commands::generic_command(t, cmd, "OK", "ERROR", 1000);
} }
@@ -164,17 +144,20 @@ void Responder::start_sending(size_t len)
{ {
data_to_send = len; data_to_send = len;
send_stat = 0; send_stat = 0;
send_cmd("AT+CIPSEND=" + std::to_string(len) + "\r\n"); // For multiple connections mode, include link ID
send_cmd("AT+CIPSEND=" + std::to_string(link_id) + "," + std::to_string(len) + "\r\n");
} }
void Responder::start_receiving(size_t len) void Responder::start_receiving(size_t len)
{ {
send_cmd("AT+CIPRECVDATA=" + std::to_string(len) + "\r\n"); // For multiple connections mode, include link ID
send_cmd("AT+CIPRECVDATA=" + std::to_string(link_id) + "," + std::to_string(len) + "\r\n");
} }
bool Responder::start_connecting(std::string host, int port) bool Responder::start_connecting(std::string host, int port)
{ {
std::string cmd = "AT+CIPSTART=\"TCP\",\"" + host + "\"," + std::to_string(port) + "\r\n"; // For multiple connections mode, include link ID
std::string cmd = "AT+CIPSTART=" + std::to_string(link_id) + ",\"TCP\",\"" + host + "\"," + std::to_string(port) + "\r\n";
send_cmd(cmd); send_cmd(cmd);
return true; return true;
} }
@@ -187,16 +170,12 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
if (data_to_recv == 0) { if (data_to_recv == 0) {
const std::string_view head = "+CIPRECVDATA:"; const std::string_view head = "+CIPRECVDATA:";
const std::string_view recv_data_view(recv_data, len);
// Find the response header const auto head_pos_found = recv_data_view.find(head);
auto head_pos = std::search(recv_data, recv_data + len, head.data(), head.data() + head.size(), [](char a, char b) { if (head_pos_found == std::string_view::npos) {
return a == b; return ret::IN_PROGRESS;
});
if (head_pos == recv_data + len) {
return ret::FAIL;
} }
const auto *head_pos = recv_data + head_pos_found;
// Find the end of the length field // Find the end of the length field
auto next_comma = (char *)memchr(head_pos + head.size(), ',', MIN_MESSAGE); auto next_comma = (char *)memchr(head_pos + head.size(), ',', MIN_MESSAGE);
if (next_comma == nullptr) { if (next_comma == nullptr) {
@@ -245,12 +224,25 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
char *ok_pos = nullptr; char *ok_pos = nullptr;
if (actual_len + 1 + 2 /* OK */ <= len) { if (actual_len + 1 + 2 /* OK */ <= len) {
ok_pos = (char *)memchr(recv_data + actual_len + 1, 'O', MIN_MESSAGE); ok_pos = (char *)memchr(recv_data + actual_len + 1, 'O', MIN_MESSAGE);
if (ok_pos == nullptr || ok_pos[1] != 'K') { if (ok_pos == nullptr) { // || ok_pos[1] != 'K') {
data_to_recv = 0; data_to_recv = 0;
ESP_LOGE(TAG, "Missed 'OK' marker");
return ret::OK;
return ret::FAIL;
}
if (ok_pos + 1 < recv_data + len && ok_pos[1] != 'K') {
// we ignore the condition when receiving 'O' as the last character in the last batch,
// don't wait for the 'K' in the next run, assume the data are valid and let higher layers deal with it.
data_to_recv = 0;
ESP_LOGE(TAG, "Missed 'OK' marker2");
return ret::FAIL; return ret::FAIL;
} }
} }
if (ok_pos != nullptr && (char *)data + len - ok_pos - 2 > MIN_MESSAGE) {
// check for async replies after the Recv header
std::string_view response((char *)ok_pos + 2 /* OK */, (char *)data + len - ok_pos);
check_urc(status::RECEIVING, response);
}
// Reset and prepare for next receive // Reset and prepare for next receive
data_to_recv = 0; data_to_recv = 0;
return ret::OK; return ret::OK;
@@ -299,7 +291,8 @@ Responder::ret Responder::send(std::string_view response)
Responder::ret Responder::connect(std::string_view response) Responder::ret Responder::connect(std::string_view response)
{ {
if (response.find("CONNECT") != std::string::npos) { // In multiple connections mode, response format is: <link ID>,CONNECT
if (response.find(",CONNECT") != std::string::npos || response.find("CONNECT") != std::string::npos) {
ESP_LOGI(TAG, "TCP connected!"); ESP_LOGI(TAG, "TCP connected!");
return ret::OK; return ret::OK;
} }
@@ -309,6 +302,17 @@ Responder::ret Responder::connect(std::string_view response)
} }
return ret::IN_PROGRESS; return ret::IN_PROGRESS;
} }
Responder::ret Responder::check_urc(status state, std::string_view &response)
{
// Handle data notifications - in multiple connections mode, format is +IPD,<link ID>,<len>
std::string expected_urc = "+IPD," + std::to_string(link_id);
if (response.find(expected_urc) != std::string::npos) {
uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Data available notification");
}
return ret::IN_PROGRESS;
}
Responder::ret Responder::check_async_replies(status state, std::string_view &response) Responder::ret Responder::check_async_replies(status state, std::string_view &response)
{ {
@@ -318,24 +322,17 @@ Responder::ret Responder::check_async_replies(status state, std::string_view &re
if (response.find("WIFI CONNECTED") != std::string::npos) { if (response.find("WIFI CONNECTED") != std::string::npos) {
ESP_LOGI(TAG, "WiFi connected"); ESP_LOGI(TAG, "WiFi connected");
} else if (response.find("WIFI DISCONNECTED") != std::string::npos) { } else if (response.find("WIFI DISCONNECTED") != std::string::npos) {
ESP_LOGW(TAG, "WiFi disconnected"); ESP_LOGD(TAG, "WiFi disconnected");
} }
// Handle TCP status messages // Handle TCP status messages (multiple connections format: <link ID>,CONNECT or <link ID>,CLOSED)
if (response.find("CONNECT") != std::string::npos && state == status::CONNECTING) { if (response.find("CONNECT") != std::string::npos && state == status::CONNECTING) {
return connect(response); return connect(response);
} else if (response.find("CLOSED") != std::string::npos) { } else if (response.find("CLOSED") != std::string::npos) {
ESP_LOGW(TAG, "TCP connection closed"); ESP_LOGD(TAG, "TCP connection closed");
return ret::FAIL; return ret::FAIL;
} }
// Handle data notifications in active mode (if we switch to it later)
if (response.find("+IPD,") != std::string::npos) {
uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Data available notification");
}
if (state == status::SENDING) { if (state == status::SENDING) {
return send(response); return send(response);
} else if (state == status::CONNECTING) { } else if (state == status::CONNECTING) {

View File

@@ -1,5 +1,5 @@
/* /*
* SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD * SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
* *
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
@@ -18,13 +18,13 @@ using namespace esp_modem;
command_result net_open(CommandableIf *term) command_result net_open(CommandableIf *term)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
std::string response; std::string response;
auto ret = dce_commands::generic_get_string(term, "AT+NETOPEN?\r", response, 1000); auto ret = dce_commands::generic_get_string(term, "AT+NETOPEN?\r", response, 1000);
if (ret != command_result::OK) { if (ret != command_result::OK) {
return ret; return ret;
} }
ESP_LOGV(TAG, "%s", response.data() ); ESP_LOGV(TAG, "%s", response.data());
if (response.find("+NETOPEN: 1") != std::string::npos) { if (response.find("+NETOPEN: 1") != std::string::npos) {
ESP_LOGD(TAG, "Already there"); ESP_LOGD(TAG, "Already there");
ret = command_result::OK; ret = command_result::OK;
@@ -42,23 +42,23 @@ command_result net_open(CommandableIf *term)
command_result net_close(CommandableIf *term) command_result net_close(CommandableIf *term)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
return dce_commands::generic_command(term, "AT+NETCLOSE\r", "+NETCLOSE:", "ERROR", 30000); return dce_commands::generic_command(term, "AT+NETCLOSE\r", "+NETCLOSE:", "ERROR", 30000);
} }
command_result tcp_open(CommandableIf *term, const std::string &host, int port, int timeout) command_result tcp_open(CommandableIf *term, const std::string &host, int port, int timeout)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
auto ret = dce_commands::generic_command(term, "AT+CIPRXGET=1\r", "OK", "ERROR", 50000); auto ret = dce_commands::generic_command(term, "AT+CIPRXGET=1\r", "OK", "ERROR", 50000);
if (ret != command_result::OK) { if (ret != command_result::OK) {
ESP_LOGE(TAG, "Setting Rx mode failed!"); ESP_LOGE(TAG, "Setting Rx mode failed!");
return ret; return ret;
} }
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
std::string ip_open = R"(AT+CIPOPEN=0,"TCP",")" + host + "\"," + std::to_string(port) + "\r"; std::string ip_open = R"(AT+CIPOPEN=0,"TCP",")" + host + "\"," + std::to_string(port) + "\r";
ret = dce_commands::generic_command(term, ip_open, "+CIPOPEN: 0,0", "ERROR", timeout); ret = dce_commands::generic_command(term, ip_open, "+CIPOPEN: 0,0", "ERROR", timeout);
if (ret != command_result::OK) { if (ret != command_result::OK) {
ESP_LOGE(TAG, "%s Failed", __func__ ); ESP_LOGE(TAG, "%s Failed", __func__);
return ret; return ret;
} }
return command_result::OK; return command_result::OK;
@@ -66,13 +66,13 @@ command_result tcp_open(CommandableIf *term, const std::string &host, int port,
command_result tcp_close(CommandableIf *term) command_result tcp_close(CommandableIf *term)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
return dce_commands::generic_command(term, "AT+CIPCLOSE=0\r", "+CIPCLOSE:", "ERROR", 10000); return dce_commands::generic_command(term, "AT+CIPCLOSE=0\r", "+CIPCLOSE:", "ERROR", 10000);
} }
command_result tcp_send(CommandableIf *term, uint8_t *data, size_t len) command_result tcp_send(CommandableIf *term, uint8_t *data, size_t len)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
std::string send = "AT+CIPSEND=0," + std::to_string(len) + "\r"; std::string send = "AT+CIPSEND=0," + std::to_string(len) + "\r";
auto ret = term->command(send, [&](uint8_t *data, size_t len) { auto ret = term->command(send, [&](uint8_t *data, size_t len) {
std::string_view response((char *)data, len); std::string_view response((char *)data, len);
@@ -86,10 +86,10 @@ command_result tcp_send(CommandableIf *term, uint8_t *data, size_t len)
return ret; return ret;
} }
ret = command_result::TIMEOUT; ret = command_result::TIMEOUT;
ESP_LOGW(TAG, "Before setting..."); ESP_LOGD(TAG, "Before setting...");
term->on_read([&ret](uint8_t *cmd_data, size_t cmd_len) { term->on_read([&ret](uint8_t *cmd_data, size_t cmd_len) {
std::string_view response((char *)cmd_data, cmd_len); std::string_view response((char *)cmd_data, cmd_len);
ESP_LOGW(TAG, "CIPSEND response %.*s", static_cast<int>(response.size()), response.data()); ESP_LOGD(TAG, "CIPSEND response %.*s", static_cast<int>(response.size()), response.data());
if (response.find("+CIPSEND:") != std::string::npos) { if (response.find("+CIPSEND:") != std::string::npos) {
ret = command_result::OK; ret = command_result::OK;
@@ -98,7 +98,7 @@ command_result tcp_send(CommandableIf *term, uint8_t *data, size_t len)
} }
return ret; return ret;
}); });
ESP_LOGW(TAG, "Before writing..."); ESP_LOGD(TAG, "Before writing...");
auto written = term->write(data, len); auto written = term->write(data, len);
if (written != len) { if (written != len) {
ESP_LOGE(TAG, "written %d (%d)...", written, len); ESP_LOGE(TAG, "written %d (%d)...", written, len);
@@ -107,7 +107,7 @@ command_result tcp_send(CommandableIf *term, uint8_t *data, size_t len)
uint8_t ctrl_z = '\x1A'; uint8_t ctrl_z = '\x1A';
term->write(&ctrl_z, 1); term->write(&ctrl_z, 1);
int count = 0; int count = 0;
while (ret == command_result::TIMEOUT && count++ < 1000 ) { while (ret == command_result::TIMEOUT && count++ < 1000) {
vTaskDelay(pdMS_TO_TICKS(1000)); vTaskDelay(pdMS_TO_TICKS(1000));
} }
term->on_read(nullptr); term->on_read(nullptr);
@@ -116,7 +116,7 @@ command_result tcp_send(CommandableIf *term, uint8_t *data, size_t len)
command_result tcp_recv(CommandableIf *term, uint8_t *data, size_t len, size_t &out_len) command_result tcp_recv(CommandableIf *term, uint8_t *data, size_t len, size_t &out_len)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__);
std::string out; std::string out;
auto ret = dce_commands::generic_get_string(term, "AT+CIPRXGET=4,0\r", out); auto ret = dce_commands::generic_get_string(term, "AT+CIPRXGET=4,0\r", out);
if (ret != command_result::OK) { if (ret != command_result::OK) {
@@ -195,17 +195,17 @@ void Responder::start_sending(size_t len)
{ {
data_to_send = len; data_to_send = len;
send_stat = 0; send_stat = 0;
send_cmd("AT+CIPSEND=0," + std::to_string(len) + "\r"); send_cmd("AT+CIPSEND=" + std::to_string(link_id) + "," + std::to_string(len) + "\r");
} }
void Responder::start_receiving(size_t len) void Responder::start_receiving(size_t len)
{ {
send_cmd("AT+CIPRXGET=2,0," + std::to_string(len) + "\r"); send_cmd("AT+CIPRXGET=2," + std::to_string(link_id) + "," + std::to_string(len) + "\r");
} }
bool Responder::start_connecting(std::string host, int port) bool Responder::start_connecting(std::string host, int port)
{ {
send_cmd(R"(AT+CIPOPEN=0,"TCP",")" + host + "\"," + std::to_string(port) + "\r"); send_cmd(std::string("AT+CIPOPEN=") + std::to_string(link_id) + R"(,"TCP",")" + host + "\"," + std::to_string(port) + "\r");
return true; return true;
} }
@@ -215,7 +215,7 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
size_t actual_len = 0; size_t actual_len = 0;
auto *recv_data = (char *)data; auto *recv_data = (char *)data;
if (data_to_recv == 0) { if (data_to_recv == 0) {
static constexpr std::string_view head = "+CIPRXGET: 2,0,"; const std::string head = std::string("+CIPRXGET: 2,") + std::to_string(link_id) + ",";
auto head_pos = std::search(recv_data, recv_data + len, head.begin(), head.end()); auto head_pos = std::search(recv_data, recv_data + len, head.begin(), head.end());
if (head_pos == recv_data + len) { if (head_pos == recv_data + len) {
return ret::FAIL; return ret::FAIL;
@@ -329,7 +329,8 @@ Responder::ret Responder::send(std::string_view response)
Responder::ret Responder::connect(std::string_view response) Responder::ret Responder::connect(std::string_view response)
{ {
if (response.find("+CIPOPEN: 0,0") != std::string::npos) { std::string open_response = "+CIPOPEN: " + std::to_string(link_id) + ",0";
if (response.find(open_response) != std::string::npos) {
ESP_LOGI(TAG, "Connected!"); ESP_LOGI(TAG, "Connected!");
return ret::OK; return ret::OK;
} }
@@ -340,14 +341,22 @@ Responder::ret Responder::connect(std::string_view response)
return Responder::ret::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
Responder::ret Responder::check_async_replies(status state, std::string_view &response) Responder::ret Responder::check_urc(status state, std::string_view &response)
{ {
ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data()); // 1. When <mode> is set to 1 and the 2-4 mode will take effect.
if (response.find("+CIPRXGET: 1") != std::string::npos) { // 2. If AT+CIPRXGET=1, it will report +CIPRXGET: 1,<cid>(multi client) when
const std::string expected = std::string("+CIPRXGET: 1,") + std::to_string(link_id);
if (response.find(expected) != std::string::npos) {
uint64_t data_ready = 1; uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready)); write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Got data on modem!"); ESP_LOGD(TAG, "Got data on modem!");
} }
return ret::IN_PROGRESS;
}
Responder::ret Responder::check_async_replies(status state, std::string_view &response)
{
ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data());
if (state == status::SENDING) { if (state == status::SENDING) {
return send(response); return send(response);
} else if (state == status::CONNECTING) { } else if (state == status::CONNECTING) {

View File

@@ -1,12 +1,16 @@
## IDF Component Manager Manifest File ## IDF Component Manager Manifest File
dependencies: dependencies:
## Required IDF version ## Required IDF version
idf: ">=4.1.0" idf: '>=4.1.0'
espressif/esp_modem: espressif/esp_modem:
version: "^1.0.0" version: ^1.0.0
override_path: "../../../" override_path: ../../../
espressif/esp_modem_usb_dte: espressif/esp_modem_usb_dte:
version: "^1.2.0" version: ^1.2.0
rules: rules:
- if: "idf_version >=4.4" - if: idf_version >=4.4
- if: "target in [esp32s2, esp32s3, esp32p4]" - if: target in [esp32s2, esp32s3, esp32p4]
espressif/mqtt:
rules:
- if: idf_version >=6.0
version: ^1.0.0

View File

@@ -2,3 +2,7 @@ dependencies:
espressif/esp_modem: espressif/esp_modem:
version: "^1.0.1" version: "^1.0.1"
override_path: "../../../" override_path: "../../../"
espressif/mqtt:
rules:
- if: idf_version >=6.0
version: ^1.0.0

View File

@@ -1,10 +1,14 @@
## IDF Component Manager Manifest File ## IDF Component Manager Manifest File
dependencies: dependencies:
espressif/esp_modem: espressif/esp_modem:
version: "*" version: '*'
override_path: ../../.. override_path: ../../..
espressif/mbedtls_cxx: espressif/mbedtls_cxx:
version: "*" version: '*'
override_path: ../../../../mbedtls_cxx override_path: ../../../../mbedtls_cxx
idf: idf:
version: ">=4.1.0" version: '>=4.1.0'
espressif/mqtt:
rules:
- if: idf_version >=6.0
version: ^1.0.0