fix(modem): AT-only example: support MQTT over TLS on BG96

This commit is contained in:
David Cermak
2023-02-12 22:14:39 +01:00
parent 97831c62b1
commit 31d4323f53
5 changed files with 294 additions and 210 deletions

View File

@ -97,7 +97,7 @@ extern "C" void app_main(void)
assert(dte); assert(dte);
/* Configure the DCE */ /* Configure the DCE */
esp_modem_dce_config_t dce_config = ESP_MODEM_DCE_DEFAULT_CONFIG("lpwa.vodafone.com"); esp_modem_dce_config_t dce_config = ESP_MODEM_DCE_DEFAULT_CONFIG(CONFIG_EXAMPLE_MODEM_APN);
/* create the DCE and initialize network manually (using AT commands) */ /* create the DCE and initialize network manually (using AT commands) */
auto dce = sock_dce::create(&dce_config, std::move(dte)); auto dce = sock_dce::create(&dce_config, std::move(dte));
@ -106,10 +106,10 @@ extern "C" void app_main(void)
return; return;
} }
dce->init(1883); dce->init_sock(8883);
esp_mqtt_client_config_t mqtt_config = {}; esp_mqtt_client_config_t mqtt_config = {};
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0) #if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
mqtt_config.broker.address.uri = "mqtt://127.0.0.1"; mqtt_config.broker.address.uri = "mqtts://127.0.0.1";
mqtt_config.session.message_retransmit_timeout = 10000; mqtt_config.session.message_retransmit_timeout = 10000;
#else #else
mqtt_config.uri = "mqtt://127.0.0.1"; mqtt_config.uri = "mqtt://127.0.0.1";
@ -118,13 +118,13 @@ extern "C" void app_main(void)
esp_mqtt_client_handle_t mqtt_client = esp_mqtt_client_init(&mqtt_config); esp_mqtt_client_handle_t mqtt_client = esp_mqtt_client_init(&mqtt_config);
esp_mqtt_client_register_event(mqtt_client, static_cast<esp_mqtt_event_id_t>(ESP_EVENT_ANY_ID), mqtt_event_handler, NULL); esp_mqtt_client_register_event(mqtt_client, static_cast<esp_mqtt_event_id_t>(ESP_EVENT_ANY_ID), mqtt_event_handler, NULL);
esp_mqtt_client_start(mqtt_client); esp_mqtt_client_start(mqtt_client);
if (!dce->start(BROKER_URL, 1883)) { if (!dce->start(BROKER_URL, 8883)) {
ESP_LOGE(TAG, "Failed to start DCE"); ESP_LOGE(TAG, "Failed to start DCE");
return; return;
} }
while (1) { while (1) {
while (dce->perform()) { while (dce->perform_sock()) {
ESP_LOGD(TAG, "...performing"); ESP_LOGV(TAG, "...performing");
} }
ESP_LOGE(TAG, "Loop exit.. retrying"); ESP_LOGE(TAG, "Loop exit.. retrying");
// handle disconnections errors // handle disconnections errors

View File

@ -28,7 +28,7 @@ command_result net_open(CommandableIf *t)
if (out.find("+QISTATE: 0") != std::string::npos) { if (out.find("+QISTATE: 0") != std::string::npos) {
ESP_LOGV(TAG, "%s", out.data() ); ESP_LOGV(TAG, "%s", out.data() );
ESP_LOGD(TAG, "Already there"); ESP_LOGD(TAG, "Already there");
return command_result::OK; return command_result::FAIL;
} else if (out.empty()) { } else if (out.empty()) {
return dce_commands::generic_command(t, "AT+QIACT=1\r", "OK", "ERROR", 150000); return dce_commands::generic_command(t, "AT+QIACT=1\r", "OK", "ERROR", 150000);
} }
@ -38,6 +38,8 @@ command_result net_open(CommandableIf *t)
command_result net_close(CommandableIf *t) command_result net_close(CommandableIf *t)
{ {
ESP_LOGV(TAG, "%s", __func__ ); ESP_LOGV(TAG, "%s", __func__ );
dce_commands::generic_command(t, "AT+QICLOSE=0\r", "OK", "ERROR", 10000);
esp_modem::Task::Delay(1000);
return dce_commands::generic_command(t, "AT+QIDEACT=1\r", "OK", "ERROR", 40000); return dce_commands::generic_command(t, "AT+QIDEACT=1\r", "OK", "ERROR", 40000);
} }
@ -104,103 +106,131 @@ command_result get_ip(CommandableIf *t, std::string &ip)
namespace sock_dce { namespace sock_dce {
void Listener::start_sending(size_t len) void Responder::start_sending(size_t len)
{ {
data_to_send = len; data_to_send = len;
send_stat = 0; send_stat = 0;
send_cmd("AT+QISEND=0," + std::to_string(len) + "\r"); send_cmd("AT+QISEND=0," + std::to_string(len) + "\r");
} }
void Listener::start_receiving(size_t len) void Responder::start_receiving(size_t len)
{ {
send_cmd("AT+QIRD=0," + std::to_string(size) + "\r"); send_cmd("AT+QIRD=0," + std::to_string(len) + "\r");
} }
bool Listener::start_connecting(std::string host, int port) bool Responder::start_connecting(std::string host, int port)
{ {
send_cmd(R"(AT+QIOPEN=1,0,"TCP",")" + host + "\"," + std::to_string(port) + "\r"); send_cmd(R"(AT+QIOPEN=1,0,"TCP",")" + host + "\"," + std::to_string(port) + "\r");
return true; return true;
} }
Listener::state Listener::recv(uint8_t *data, size_t len) Responder::ret Responder::recv(uint8_t *data, size_t len)
{ {
const size_t MIN_MESSAGE = 6; const int MIN_MESSAGE = 6;
const std::string_view head = "+QIRD: "; size_t actual_len = 0;
auto head_pos = (char *)std::search(data, data + len, head.begin(), head.end()); auto *recv_data = (char *)data;
if (head_pos == nullptr) { if (data_to_recv == 0) {
return state::FAIL; const std::string_view head = "+QIRD: ";
} auto head_pos = std::search(recv_data, recv_data + len, head.begin(), head.end());
if (head_pos == nullptr) {
return ret::FAIL;
}
auto next_nl = (char *)memchr(head_pos + head.size(), '\n', MIN_MESSAGE); auto next_nl = (char *)memchr(head_pos + head.size(), '\n', MIN_MESSAGE);
if (next_nl == nullptr) { if (next_nl == nullptr) {
return state::FAIL; return ret::FAIL;
} }
size_t actual_len; if (std::from_chars(head_pos + head.size(), next_nl, actual_len).ec == std::errc::invalid_argument) {
if (std::from_chars(head_pos + head.size(), next_nl, actual_len).ec == std::errc::invalid_argument) { ESP_LOGE(TAG, "cannot convert");
ESP_LOGE(TAG, "cannot convert"); return ret::FAIL;
return state::FAIL; }
}
ESP_LOGD(TAG, "Received: actual len=%d", actual_len); ESP_LOGD(TAG, "Received: actual len=%d", actual_len);
if (actual_len == 0) { if (actual_len == 0) {
ESP_LOGD(TAG, "no data received"); ESP_LOGD(TAG, "no data received");
return state::FAIL; return ret::FAIL;
} }
// TODO improve : compare *actual_len* & data size (to be sure that received data is equal to *actual_len*) if (actual_len > buffer_size) {
if (actual_len > size) { ESP_LOGE(TAG, "TOO BIG");
ESP_LOGE(TAG, "TOO BIG"); return ret::FAIL;
return state::FAIL; }
recv_data = next_nl + 1;
auto first_data_len = len - (recv_data - (char *)data) /* minus size of the command marker */;
if (actual_len > first_data_len) {
::send(sock, recv_data, first_data_len, 0);
data_to_recv = actual_len - first_data_len;
return ret::NEED_MORE_DATA;
}
::send(sock, recv_data, actual_len, 0);
} else if (data_to_recv > len) { // continue sending
::send(sock, recv_data, len, 0);
data_to_recv -= len;
return ret::NEED_MORE_DATA;
} else if (data_to_recv <= len) { // last read -> looking for "OK" marker
::send(sock, recv_data, data_to_recv, 0);
actual_len = data_to_recv;
} }
::send(sock, next_nl + 1, actual_len, 0);
// "OK" after the data // "OK" after the data
auto last_pos = (char *)memchr(next_nl + 1 + actual_len, 'O', MIN_MESSAGE); char *last_pos = nullptr;
if (last_pos == nullptr || last_pos[1] != 'K') { if (actual_len + 1 + 2 /* OK */ > len) {
return state::FAIL; last_pos = (char *)memchr(recv_data + 1 + actual_len, 'O', MIN_MESSAGE);
if (last_pos == nullptr || last_pos[1] != 'K') {
data_to_recv = 0;
return ret::FAIL;
}
} }
if ((char *)data + len - last_pos > MIN_MESSAGE) { if (last_pos != nullptr && (char *)data + len - last_pos - 2 > MIN_MESSAGE) {
// check for async replies after the Recv header // check for async replies after the Recv header
std::string_view response((char *)last_pos + 2 /* OK */, (char *)data + len - last_pos); std::string_view response((char *)last_pos + 2 /* OK */, (char *)data + len - last_pos);
check_async_replies(response); check_async_replies(status::RECEIVING, response);
} }
return state::OK; // check if some other data?
start_receiving(0);
data_to_recv = 0;
return ret::OK;
} }
Listener::state Listener::send(uint8_t *data, size_t len) Responder::ret Responder::send(uint8_t *data, size_t len)
{ {
if (send_stat == 0) { if (send_stat < 3) {
if (memchr(data, '>', len) == NULL) { if (memchr(data, '>', len) == NULL) {
if (send_stat++ < 2) {
return Responder::ret::NEED_MORE_DATA;
}
ESP_LOGE(TAG, "Missed >"); ESP_LOGE(TAG, "Missed >");
return state::FAIL; return ret::FAIL;
} }
auto written = dte->write(&buffer[0], data_to_send); auto written = dte->write(&buffer[0], data_to_send);
if (written != data_to_send) { if (written != data_to_send) {
ESP_LOGE(TAG, "written %d (%d)...", written, len); ESP_LOGE(TAG, "written %d (%d)...", written, len);
return state::FAIL; return ret::FAIL;
} }
data_to_send = 0; data_to_send = 0;
send_stat++; send_stat = 3;
} }
return Listener::state::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
Listener::state Listener::send(std::string_view response) Responder::ret Responder::send(std::string_view response)
{ {
if (send_stat == 1) { if (send_stat == 3) {
if (response.find("SEND OK") != std::string::npos) { if (response.find("SEND OK") != std::string::npos) {
send_cmd("AT+QISEND=0,0\r"); send_cmd("AT+QISEND=0,0\r");
send_stat++; send_stat++;
return ret::IN_PROGRESS;
} else if (response.find("SEND FAIL") != std::string::npos) { } else if (response.find("SEND FAIL") != std::string::npos) {
ESP_LOGE(TAG, "Sending buffer full"); ESP_LOGE(TAG, "Sending buffer full");
return state::FAIL; return ret::FAIL;
} else if (response.find("ERROR") != std::string::npos) { } else if (response.find("ERROR") != std::string::npos) {
ESP_LOGE(TAG, "Failed to sent"); ESP_LOGE(TAG, "Failed to sent");
return state::FAIL; return ret::FAIL;
} }
} else if (send_stat == 2) { } else if (send_stat == 4) {
constexpr std::string_view head = "+QISEND: "; constexpr std::string_view head = "+QISEND: ";
if (response.find(head) != std::string::npos) { if (response.find(head) != std::string::npos) {
// Parsing +QISEND: <total_send_length>,<ackedbytes>,<unackedbytes> // Parsing +QISEND: <total_send_length>,<ackedbytes>,<unackedbytes>
@ -215,7 +245,7 @@ Listener::state Listener::send(std::string_view response)
size_t value; size_t value;
if (std::from_chars(response.data(), next_comma, value).ec == std::errc::invalid_argument) { if (std::from_chars(response.data(), next_comma, value).ec == std::errc::invalid_argument) {
ESP_LOGE(TAG, "cannot convert"); ESP_LOGE(TAG, "cannot convert");
return state::FAIL; return ret::FAIL;
} }
switch (property++) { switch (property++) {
@ -224,49 +254,94 @@ Listener::state Listener::send(std::string_view response)
case 1: ack = value; case 1: ack = value;
break; break;
default: default:
return state::FAIL; return ret::FAIL;
} }
response = response.substr(pos + 1); response = response.substr(pos + 1);
} }
if (std::from_chars(response.data(), response.data() + pos, unack).ec == std::errc::invalid_argument) { if (std::from_chars(response.data(), response.data() + pos, unack).ec == std::errc::invalid_argument) {
return state::FAIL; return ret::FAIL;
} }
// TODO improve : need check *total* & *ack* values, or loop (every 5 sec) with 90s or 120s timeout
if (ack < total) { if (ack < total) {
ESP_LOGE(TAG, "all sending data are not ack (missing %d bytes acked)", (total - ack)); ESP_LOGD(TAG, "all sending data are not ack (missing %d bytes acked)", (total - ack));
if (total - ack > 64) {
ESP_LOGW(TAG, "Need a pause: missing %d bytes acked", (total - ack));
return ret::NEED_MORE_TIME;
}
} }
return state::OK; send_stat = 0;
return ret::OK;
} else if (response.find("ERROR") != std::string::npos) { } else if (response.find("ERROR") != std::string::npos) {
ESP_LOGE(TAG, "Failed to check sending"); ESP_LOGE(TAG, "Failed to check sending");
return state::FAIL; return ret::FAIL;
} }
} }
return Listener::state::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
Listener::state Listener::connect(std::string_view response) Responder::ret Responder::connect(std::string_view response)
{ {
if (response.find("+QIOPEN: 0,0") != std::string::npos) { if (response.find("+QIOPEN: 0,0") != std::string::npos) {
ESP_LOGI(TAG, "Connected!"); ESP_LOGI(TAG, "Connected!");
return state::OK; return ret::OK;
} }
if (response.find("ERROR") != std::string::npos) { if (response.find("ERROR") != std::string::npos) {
ESP_LOGE(TAG, "Failed to open"); ESP_LOGE(TAG, "Failed to open");
return state::FAIL; return ret::FAIL;
} }
return Listener::state::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
void Listener::check_async_replies(std::string_view &response) const Responder::ret Responder::check_async_replies(status state, std::string_view &response)
{ {
ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data()); ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data());
if (response.find("+QIURC: \"recv\",0") != std::string::npos) { if (response.find("+QIURC: \"recv\",0") != std::string::npos) {
uint64_t data_ready = 1; uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready)); write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Got data on modem!"); ESP_LOGD(TAG, "Got data on modem!");
} else if (response.find("+QIRD: ") != std::string::npos) {
static constexpr std::string_view head = "+QIRD: ";
size_t head_pos = response.find(head);
// Parsing +QIURC: <total_receive_length>,<have_read_length>,<unread_length>
response = response.substr(head_pos + head.size());
int next_cr = response.find('\r');
if (next_cr != std::string::npos) {
response = response.substr(next_cr - 2, next_cr);
if (response.find(",0") != std::string::npos) {
ESP_LOGV(TAG, "Receiving done");
} else {
uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Got data on modem!");
}
}
} else if (response.find("+QIURC: \"closed\",0") != std::string::npos) {
return ret::FAIL;
} }
if (state == status::SENDING) {
return send(response);
} else if (state == status::CONNECTING) {
return connect(response);
}
return ret::IN_PROGRESS;
}
Responder::ret Responder::process_data(status state, uint8_t *data, size_t len)
{
if (state == status::SENDING) {
return send(data, len);
}
if (state == status::RECEIVING) {
return recv(data, len);
}
return Responder::ret::IN_PROGRESS;
}
status Responder::pending()
{
send_cmd("AT+QISEND=0,0\r");
return status::SENDING;
} }

View File

@ -6,14 +6,15 @@
#include <charconv> #include <charconv>
#include <cstring> #include <cstring>
#include <sys/socket.h>
#include "sock_commands.hpp" #include "sock_commands.hpp"
#include "cxx_include/esp_modem_command_library_utils.hpp" #include "cxx_include/esp_modem_command_library_utils.hpp"
#include "sock_dce.hpp" #include "sock_dce.hpp"
namespace sock_commands {
static const char *TAG = "sock_commands"; static const char *TAG = "sock_commands";
namespace sock_commands {
using namespace esp_modem; using namespace esp_modem;
command_result net_open(CommandableIf *term) command_result net_open(CommandableIf *term)
@ -27,12 +28,17 @@ command_result net_open(CommandableIf *term)
ESP_LOGV(TAG, "%s", response.data() ); ESP_LOGV(TAG, "%s", response.data() );
if (response.find("+NETOPEN: 1") != std::string::npos) { if (response.find("+NETOPEN: 1") != std::string::npos) {
ESP_LOGD(TAG, "Already there"); ESP_LOGD(TAG, "Already there");
return command_result::OK; ret = command_result::OK;
} else if (response.find("+NETOPEN: 0") != std::string::npos) { } else if (response.find("+NETOPEN: 0") != std::string::npos) {
ESP_LOGD(TAG, "Need to setup"); ESP_LOGD(TAG, "Need to setup");
return dce_commands::generic_command(term, "AT+NETOPEN\r", "+NETOPEN: 1", "+NETOPEN: 0", 10000); ret = dce_commands::generic_command(term, "AT+NETOPEN\r", "+NETOPEN: 1", "+NETOPEN: 0", 10000);
} else {
return command_result::FAIL;
} }
return command_result::FAIL; if (ret != command_result::OK) {
return ret;
}
return dce_commands::generic_command(term, "AT+CIPRXGET=1\r", "OK", "ERROR", 5000);
} }
command_result net_close(CommandableIf *term) command_result net_close(CommandableIf *term)
@ -186,28 +192,25 @@ command_result set_rx_mode(CommandableIf *term, int mode)
namespace sock_dce { namespace sock_dce {
void Listener::start_sending(size_t len) void Responder::start_sending(size_t len)
{ {
data_to_send = len; data_to_send = len;
send_stat = 0; send_stat = 0;
send_cmd("AT+CIPSEND=0," + std::to_string(len) + "\r"); send_cmd("AT+CIPSEND=0," + std::to_string(len) + "\r");
} }
void Listener::start_receiving(size_t len) void Responder::start_receiving(size_t len)
{ {
send_cmd("AT+CIPRXGET=2,0," + std::to_string(size) + "\r"); send_cmd("AT+CIPRXGET=2,0," + std::to_string(len) + "\r");
} }
bool Listener::start_connecting(std::string host, int port) bool Responder::start_connecting(std::string host, int port)
{ {
if (esp_modem::dce_commands::generic_command(dte.get(), "AT+CIPRXGET=1\r", "OK", "ERROR", 5000) != esp_modem::command_result::OK) {
return false;
}
send_cmd(R"(AT+CIPOPEN=0,"TCP",")" + host + "\"," + std::to_string(port) + "\r"); send_cmd(R"(AT+CIPOPEN=0,"TCP",")" + host + "\"," + std::to_string(port) + "\r");
return true; return true;
} }
Listener::state Listener::recv(uint8_t *data, size_t len) Responder::ret Responder::recv(uint8_t *data, size_t len)
{ {
const int MIN_MESSAGE = 6; const int MIN_MESSAGE = 6;
size_t actual_len = 0; size_t actual_len = 0;
@ -216,40 +219,37 @@ Listener::state Listener::recv(uint8_t *data, size_t len)
static constexpr std::string_view head = "+CIPRXGET: 2,0,"; static constexpr std::string_view head = "+CIPRXGET: 2,0,";
auto head_pos = std::search(recv_data, recv_data + len, head.begin(), head.end()); auto head_pos = std::search(recv_data, recv_data + len, head.begin(), head.end());
if (head_pos == nullptr) { if (head_pos == nullptr) {
return state::FAIL; return ret::FAIL;
} }
// state = status::RECEIVING_FAILED;
// signal.set(IDLE);
// return;
// }
if (head_pos - (char *)data > MIN_MESSAGE) { if (head_pos - (char *)data > MIN_MESSAGE) {
// check for async replies before the Recv header // check for async replies before the Recv header
std::string_view response((char *)data, head_pos - (char *)data); std::string_view response((char *)data, head_pos - (char *)data);
check_async_replies(response); check_async_replies(status::RECEIVING, response);
} }
auto next_comma = (char *)memchr(head_pos + head.size(), ',', MIN_MESSAGE); auto next_comma = (char *)memchr(head_pos + head.size(), ',', MIN_MESSAGE);
if (next_comma == nullptr) { if (next_comma == nullptr) {
return state::FAIL; return ret::FAIL;
} }
if (std::from_chars(head_pos + head.size(), next_comma, actual_len).ec == std::errc::invalid_argument) { if (std::from_chars(head_pos + head.size(), next_comma, actual_len).ec == std::errc::invalid_argument) {
ESP_LOGE(TAG, "cannot convert"); ESP_LOGE(TAG, "cannot convert");
return state::FAIL; return ret::FAIL;
} }
auto next_nl = (char *)memchr(next_comma, '\n', 8 /* total_len size (~4) + markers */); auto next_nl = (char *)memchr(next_comma, '\n', 8 /* total_len size (~4) + markers */);
if (next_nl == nullptr) { if (next_nl == nullptr) {
ESP_LOGE(TAG, "not found"); ESP_LOGE(TAG, "not found");
return state::FAIL; return ret::FAIL;
} }
if (actual_len > size) { if (actual_len > buffer_size) {
ESP_LOGE(TAG, "TOO BIG"); ESP_LOGE(TAG, "TOO BIG");
return state::FAIL; return ret::FAIL;
} }
size_t total_len = 0; size_t total_len = 0;
if (std::from_chars(next_comma + 1, next_nl - 1, total_len).ec == std::errc::invalid_argument) { if (std::from_chars(next_comma + 1, next_nl - 1, total_len).ec == std::errc::invalid_argument) {
ESP_LOGE(TAG, "cannot convert"); ESP_LOGE(TAG, "cannot convert");
return state::FAIL; return ret::FAIL;
} }
read_again = (total_len > 0); read_again = (total_len > 0);
recv_data = next_nl + 1; recv_data = next_nl + 1;
@ -257,13 +257,13 @@ Listener::state Listener::recv(uint8_t *data, size_t len)
if (actual_len > first_data_len) { if (actual_len > first_data_len) {
::send(sock, recv_data, first_data_len, 0); ::send(sock, recv_data, first_data_len, 0);
data_to_recv = actual_len - first_data_len; data_to_recv = actual_len - first_data_len;
return state::IN_PROGRESS; return ret::NEED_MORE_DATA;
} }
::send(sock, recv_data, actual_len, 0); ::send(sock, recv_data, actual_len, 0);
} else if (data_to_recv > len) { // continue sending } else if (data_to_recv > len) { // continue sending
::send(sock, recv_data, len, 0); ::send(sock, recv_data, len, 0);
data_to_recv -= len; data_to_recv -= len;
return state::IN_PROGRESS; return ret::NEED_MORE_DATA;
} else if (data_to_recv <= len) { // last read -> looking for "OK" marker } else if (data_to_recv <= len) { // last read -> looking for "OK" marker
::send(sock, recv_data, data_to_recv, 0); ::send(sock, recv_data, data_to_recv, 0);
actual_len = data_to_recv; actual_len = data_to_recv;
@ -275,73 +275,73 @@ Listener::state Listener::recv(uint8_t *data, size_t len)
last_pos = (char *)memchr(recv_data + 1 + actual_len, 'O', MIN_MESSAGE); last_pos = (char *)memchr(recv_data + 1 + actual_len, 'O', MIN_MESSAGE);
if (last_pos == nullptr || last_pos[1] != 'K') { if (last_pos == nullptr || last_pos[1] != 'K') {
data_to_recv = 0; data_to_recv = 0;
return state::FAIL; return ret::FAIL;
} }
} }
if (last_pos != nullptr && (char *)data + len - last_pos > MIN_MESSAGE) { if (last_pos != nullptr && (char *)data + len - last_pos - 2 > MIN_MESSAGE) {
// check for async replies after the Recv header // check for async replies after the Recv header
std::string_view response((char *)last_pos + 2 /* OK */, (char *)data + len - last_pos - 2); std::string_view response((char *)last_pos + 2 /* OK */, (char *)data + len - last_pos - 2);
check_async_replies(response); check_async_replies(status::RECEIVING, response);
} }
data_to_recv = 0; data_to_recv = 0;
if (read_again) { if (read_again) {
uint64_t data_ready = 1; uint64_t data_ready = 1;
write(data_ready_fd, &data_ready, sizeof(data_ready)); write(data_ready_fd, &data_ready, sizeof(data_ready));
} }
return state::OK; return ret::OK;
} }
Listener::state Listener::send(uint8_t *data, size_t len) Responder::ret Responder::send(uint8_t *data, size_t len)
{ {
if (send_stat == 0) { if (send_stat == 0) {
if (memchr(data, '>', len) == NULL) { if (memchr(data, '>', len) == NULL) {
ESP_LOGE(TAG, "Missed >"); ESP_LOGE(TAG, "Missed >");
return state::FAIL; return ret::FAIL;
} }
auto written = dte->write(&buffer[0], data_to_send); auto written = dte->write(&buffer[0], data_to_send);
if (written != data_to_send) { if (written != data_to_send) {
ESP_LOGE(TAG, "written %d (%d)...", written, len); ESP_LOGE(TAG, "written %d (%d)...", written, len);
return state::FAIL; return ret::FAIL;
} }
data_to_send = 0; data_to_send = 0;
uint8_t ctrl_z = '\x1A'; uint8_t ctrl_z = '\x1A';
dte->write(&ctrl_z, 1); dte->write(&ctrl_z, 1);
send_stat++; send_stat++;
return state::IN_PROGRESS; return ret::IN_PROGRESS;
} }
return Listener::state::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
Listener::state Listener::send(std::string_view response) Responder::ret Responder::send(std::string_view response)
{ {
if (send_stat == 1) { if (send_stat == 1) {
if (response.find("+CIPSEND:") != std::string::npos) { if (response.find("+CIPSEND:") != std::string::npos) {
send_stat = 0; send_stat = 0;
return state::OK; return ret::OK;
} }
if (response.find("ERROR") != std::string::npos) { if (response.find("ERROR") != std::string::npos) {
ESP_LOGE(TAG, "Failed to sent"); ESP_LOGE(TAG, "Failed to sent");
send_stat = 0; send_stat = 0;
return state::FAIL; return ret::FAIL;
} }
} }
return Listener::state::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
Listener::state Listener::connect(std::string_view response) Responder::ret Responder::connect(std::string_view response)
{ {
if (response.find("+CIPOPEN: 0,0") != std::string::npos) { if (response.find("+CIPOPEN: 0,0") != std::string::npos) {
ESP_LOGI(TAG, "Connected!"); ESP_LOGI(TAG, "Connected!");
return state::OK; return ret::OK;
} }
if (response.find("ERROR") != std::string::npos) { if (response.find("ERROR") != std::string::npos) {
ESP_LOGE(TAG, "Failed to open"); ESP_LOGE(TAG, "Failed to open");
return state::FAIL; return ret::FAIL;
} }
return Listener::state::IN_PROGRESS; return Responder::ret::IN_PROGRESS;
} }
void Listener::check_async_replies(std::string_view &response) const Responder::ret Responder::check_async_replies(status state, std::string_view &response)
{ {
ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data()); ESP_LOGD(TAG, "response %.*s", static_cast<int>(response.size()), response.data());
if (response.find("+CIPRXGET: 1") != std::string::npos) { if (response.find("+CIPRXGET: 1") != std::string::npos) {
@ -349,8 +349,30 @@ void Listener::check_async_replies(std::string_view &response) const
write(data_ready_fd, &data_ready, sizeof(data_ready)); write(data_ready_fd, &data_ready, sizeof(data_ready));
ESP_LOGD(TAG, "Got data on modem!"); ESP_LOGD(TAG, "Got data on modem!");
} }
if (state == status::SENDING) {
return send(response);
} else if (state == status::CONNECTING) {
return connect(response);
}
return ret::IN_PROGRESS;
} }
Responder::ret Responder::process_data(status state, uint8_t *data, size_t len)
{
if (state == status::SENDING) {
return send(data, len);
}
if (state == status::RECEIVING) {
return recv(data, len);
}
return Responder::ret::IN_PROGRESS;
}
status Responder::pending()
{
return status::PENDING;
}
} // sock_dce } // sock_dce

View File

@ -16,7 +16,7 @@ namespace sock_dce {
constexpr auto const *TAG = "sock_dce"; constexpr auto const *TAG = "sock_dce";
bool DCE::perform() bool DCE::perform_sock()
{ {
if (listen_sock == -1) { if (listen_sock == -1) {
ESP_LOGE(TAG, "Listening socket not ready"); ESP_LOGE(TAG, "Listening socket not ready");
@ -32,13 +32,18 @@ bool DCE::perform()
.tv_sec = 0, .tv_sec = 0,
.tv_usec = 500000, .tv_usec = 500000,
}; };
if (state == status::PENDING) {
vTaskDelay(pdMS_TO_TICKS(500));
state = at.pending();
return true;
}
fd_set fdset; fd_set fdset;
FD_ZERO(&fdset); FD_ZERO(&fdset);
FD_SET(sock, &fdset); FD_SET(sock, &fdset);
FD_SET(data_ready_fd, &fdset); FD_SET(data_ready_fd, &fdset);
int s = select(std::max(sock, data_ready_fd) + 1, &fdset, nullptr, nullptr, &tv); int s = select(std::max(sock, data_ready_fd) + 1, &fdset, nullptr, nullptr, &tv);
if (s == 0) { if (s == 0) {
ESP_LOGD(TAG, "perform select timeout..."); ESP_LOGV(TAG, "perform select timeout...");
return true; return true;
} else if (s < 0) { } else if (s < 0) {
ESP_LOGE(TAG, "select error %d", errno); ESP_LOGE(TAG, "select error %d", errno);
@ -54,68 +59,42 @@ bool DCE::perform()
return true; return true;
} }
void DCE::forwarding(uint8_t *data, size_t len) void DCE::perform_at(uint8_t *data, size_t len)
{ {
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_DEBUG); ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_VERBOSE);
if (state == status::SENDING) { switch (at.process_data(state, data, len)) {
switch (at.send(data, len)) { case Responder::ret::OK:
case Listener::state::OK: state = status::IDLE;
state = status::IDLE; signal.set(IDLE);
signal.set(IDLE); return;
return; case Responder::ret::FAIL:
case Listener::state::FAIL: state = status::FAILED;
state = status::SENDING_FAILED; signal.set(IDLE);
signal.set(IDLE); return;
return; case Responder::ret::NEED_MORE_DATA:
case Listener::state::IN_PROGRESS: return;
break; case Responder::ret::IN_PROGRESS:
// return; break;
} case Responder::ret::NEED_MORE_TIME:
} else if (state == status::RECEIVING) { state = status::PENDING;
switch (at.recv(data, len)) { return;
case Listener::state::OK:
state = status::IDLE;
signal.set(IDLE);
return;
case Listener::state::FAIL:
state = status::RECEIVING_FAILED;
signal.set(IDLE);
return;
case Listener::state::IN_PROGRESS:
// break;
return;
}
} }
std::string_view response((char *)data, len); std::string_view response((char *)data, len);
at.check_async_replies(response); switch (at.check_async_replies(state, response)) {
// Notification about Data Ready could come any time case Responder::ret::OK:
if (state == status::SENDING) { state = status::IDLE;
switch (at.send(response)) { signal.set(IDLE);
case Listener::state::OK: return;
state = status::IDLE; case Responder::ret::FAIL:
signal.set(IDLE); state = status::FAILED;
return; signal.set(IDLE);
case Listener::state::FAIL: return;
state = status::SENDING_FAILED; case Responder::ret::NEED_MORE_TIME:
signal.set(IDLE); state = status::PENDING;
return; return;
case Listener::state::IN_PROGRESS: case Responder::ret::NEED_MORE_DATA:
break; case Responder::ret::IN_PROGRESS:
} break;
}
if (state == status::CONNECTING) {
switch (at.connect(response)) {
case Listener::state::OK:
state = status::IDLE;
signal.set(IDLE);
return;
case Listener::state::FAIL:
state = status::CONNECTION_FAILED;
signal.set(IDLE);
return;
case Listener::state::IN_PROGRESS:
break;
}
} }
} }
@ -125,6 +104,7 @@ void DCE::close_sock()
close(sock); close(sock);
sock = -1; sock = -1;
} }
dte->on_read(nullptr);
const int retries = 5; const int retries = 5;
int i = 0; int i = 0;
while (net_close() != esp_modem::command_result::OK) { while (net_close() != esp_modem::command_result::OK) {
@ -152,7 +132,7 @@ bool DCE::at_to_sock()
return false; return false;
} }
state = status::RECEIVING; state = status::RECEIVING;
at.start_receiving(size); at.start_receiving(at.get_buf_len());
return true; return true;
} }
@ -170,7 +150,7 @@ bool DCE::sock_to_at()
return false; return false;
} }
state = status::SENDING; state = status::SENDING;
int len = ::recv(sock, &buffer[0], size, 0); int len = ::recv(sock, at.get_buf(), at.get_buf_len(), 0);
if (len < 0) { if (len < 0) {
ESP_LOGE(TAG, "read error %d", errno); ESP_LOGE(TAG, "read error %d", errno);
close_sock(); close_sock();
@ -180,7 +160,7 @@ bool DCE::sock_to_at()
close_sock(); close_sock();
return false; return false;
} }
ESP_LOG_BUFFER_HEXDUMP(TAG, &buffer[0], len, ESP_LOG_VERBOSE); ESP_LOG_BUFFER_HEXDUMP(TAG, at.get_buf(), len, ESP_LOG_VERBOSE);
at.start_sending(len); at.start_sending(len);
return true; return true;
} }
@ -212,7 +192,7 @@ bool DCE::accept_sock()
return false; return false;
} }
void DCE::init(int port) void DCE::init_sock(int port)
{ {
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT(); esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
esp_vfs_eventfd_register(&config); esp_vfs_eventfd_register(&config);
@ -228,7 +208,7 @@ void DCE::init(int port)
int opt = 1; int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
ESP_LOGI(TAG, "Socket created"); ESP_LOGI(TAG, "Socket created");
struct sockaddr_in addr = { }; struct sockaddr_in addr = { };
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(port); addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
@ -253,7 +233,7 @@ bool DCE::start(std::string host, int port)
dte->on_read(nullptr); dte->on_read(nullptr);
tcp_close(); tcp_close();
dte->on_read([this](uint8_t *data, size_t len) { dte->on_read([this](uint8_t *data, size_t len) {
this->forwarding(data, len); this->perform_at(data, len);
return esp_modem::command_result::TIMEOUT; return esp_modem::command_result::TIMEOUT;
}); });
if (!at.start_connecting(host, port)) { if (!at.start_connecting(host, port)) {
@ -267,6 +247,7 @@ bool DCE::start(std::string host, int port)
bool DCE::init_network() bool DCE::init_network()
{ {
dte->on_read(nullptr);
const int retries = 5; const int retries = 5;
int i = 0; int i = 0;
while (sync() != esp_modem::command_result::OK) { while (sync() != esp_modem::command_result::OK) {
@ -292,6 +273,7 @@ bool DCE::init_network()
ESP_LOGE(TAG, "Failed to open network"); ESP_LOGE(TAG, "Failed to open network");
return false; return false;
} }
net_close();
esp_modem::Task::Delay(1000); esp_modem::Task::Delay(1000);
} }
ESP_LOGD(TAG, "Network opened"); ESP_LOGD(TAG, "Network opened");
@ -341,9 +323,4 @@ DECLARE_SOCK_COMMANDS(return_type name(...) )
#undef ESP_MODEM_DECLARE_DCE_COMMAND #undef ESP_MODEM_DECLARE_DCE_COMMAND
} // namespace sock_dce } // namespace sock_dce

View File

@ -14,30 +14,50 @@
namespace sock_dce { namespace sock_dce {
static constexpr size_t size = 512;
class Listener { enum class status {
IDLE,
CONNECTING,
SENDING,
RECEIVING,
FAILED,
PENDING
};
class Responder {
public: public:
enum class state { enum class ret {
OK, FAIL, IN_PROGRESS OK, FAIL, IN_PROGRESS, NEED_MORE_DATA, NEED_MORE_TIME
}; };
Listener(std::array<uint8_t, size> &b, int &s, int &ready_fd, std::shared_ptr<esp_modem::DTE> &dte_arg): Responder(int &s, int &ready_fd, std::shared_ptr<esp_modem::DTE> &dte_arg):
buffer(b), sock(s), data_ready_fd(ready_fd), dte(dte_arg) {} sock(s), data_ready_fd(ready_fd), dte(dte_arg) {}
state recv(uint8_t *data, size_t len); ret process_data(status state, uint8_t *data, size_t len);
state send(uint8_t *data, size_t len); ret check_async_replies(status state, std::string_view &response);
state send(std::string_view response);
state connect(std::string_view response);
void check_async_replies(std::string_view &response) const;
void start_sending(size_t len); void start_sending(size_t len);
void start_receiving(size_t len); void start_receiving(size_t len);
bool start_connecting(std::string host, int port); bool start_connecting(std::string host, int port);
status pending();
uint8_t *get_buf()
{
return &buffer[0];
}
size_t get_buf_len()
{
return buffer_size;
}
private: private:
static constexpr size_t buffer_size = 512;
ret recv(uint8_t *data, size_t len);
ret send(uint8_t *data, size_t len);
ret send(std::string_view response);
ret connect(std::string_view response);
void send_cmd(std::string_view command) void send_cmd(std::string_view command)
{ {
dte->write((uint8_t *) command.begin(), command.size()); dte->write((uint8_t *) command.begin(), command.size());
} }
std::array<uint8_t, size> &buffer; std::array<uint8_t, buffer_size> buffer;
size_t data_to_recv = 0; size_t data_to_recv = 0;
bool read_again = false; bool read_again = false;
int &sock; int &sock;
@ -61,9 +81,9 @@ esp_modem::return_type name(__VA_ARGS__);
bool init_network(); bool init_network();
bool start(std::string host, int port); bool start(std::string host, int port);
void init(int port); void init_sock(int port);
bool perform(); bool perform_sock();
private: private:
esp_modem::SignalGroup signal; esp_modem::SignalGroup signal;
@ -73,21 +93,11 @@ private:
bool sock_to_at(); bool sock_to_at();
bool at_to_sock(); bool at_to_sock();
void forwarding(uint8_t *data, size_t len); void perform_at(uint8_t *data, size_t len);
enum class status {
IDLE,
CONNECTING,
CONNECTION_FAILED,
SENDING,
SENDING_FAILED,
RECEIVING,
RECEIVING_FAILED
};
status state{status::IDLE}; status state{status::IDLE};
static constexpr uint8_t IDLE = 1; static constexpr uint8_t IDLE = 1;
std::array<uint8_t, size> buffer; Responder at{sock, data_ready_fd, dte};
Listener at{buffer, sock, data_ready_fd, dte};
int sock {-1}; int sock {-1};
int listen_sock {-1}; int listen_sock {-1};
int data_ready_fd {-1}; int data_ready_fd {-1};