mirror of
https://github.com/espressif/esp-protocols.git
synced 2025-06-25 09:21:32 +02:00
fix(modem): Added support for inflatable buffer
As a configurable option, if disabled we report an error. Closes https://github.com/espressif/esp-protocols/issues/272
This commit is contained in:
@ -16,6 +16,17 @@ menu "esp-modem"
|
||||
in command mode might come fragmented in rare cases so might need to retry
|
||||
AT commands.
|
||||
|
||||
config ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
bool "Use inflatable buffer in DCE"
|
||||
default n
|
||||
help
|
||||
If enabled we will process the ongoing AT command by growing the current
|
||||
buffer (if we've run out the preconfigured buffer).
|
||||
If disabled, we simply report a failure.
|
||||
Use this if additional allocation is not a problem and you need to reliably process
|
||||
all commands, usually with sporadically longer responses than the configured buffer.
|
||||
Could be also used to defragment AT replies in CMUX mode if CMUX_DEFRAGMENT_PAYLOAD=n
|
||||
|
||||
config ESP_MODEM_CMUX_DELAY_AFTER_DLCI_SETUP
|
||||
int "Delay in ms to wait before creating another virtual terminal"
|
||||
default 0
|
||||
|
@ -175,6 +175,9 @@ extern "C" void app_main(void)
|
||||
#endif
|
||||
assert(dce);
|
||||
|
||||
/* Try to connect to the network and publish an mqtt topic */
|
||||
StatusHandler handler;
|
||||
|
||||
if (dte_config.uart_config.flow_control == ESP_MODEM_FLOW_CONTROL_HW) {
|
||||
if (command_result::OK != dce->set_flow_control(2, 2)) {
|
||||
ESP_LOGE(TAG, "Failed to set the set_flow_control mode");
|
||||
@ -215,8 +218,6 @@ extern "C" void app_main(void)
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Try to connect to the network and publish an mqtt topic */
|
||||
StatusHandler handler;
|
||||
if (!handler.wait_for(StatusHandler::IP_Event, 60000)) {
|
||||
ESP_LOGE(TAG, "Cannot get IP within specified timeout... exiting");
|
||||
return;
|
||||
|
@ -79,6 +79,13 @@ public:
|
||||
*/
|
||||
void set_read_cb(std::function<bool(uint8_t *data, size_t len)> f);
|
||||
|
||||
/**
|
||||
* @brief Sets read callback for manual command processing
|
||||
* Note that this API also locks the command API, which can only be used
|
||||
* after you remove the callback by dte->on_read(nullptr)
|
||||
*
|
||||
* @param on_data Function to be called when a command response is available
|
||||
*/
|
||||
void on_read(got_line_cb on_data) override;
|
||||
|
||||
/**
|
||||
@ -122,7 +129,6 @@ protected:
|
||||
}
|
||||
friend class Scoped<DTE>; /*!< Declaring "Scoped<DTE> lock(dte)" locks this instance */
|
||||
private:
|
||||
static const size_t GOT_LINE = SignalGroup::bit0; /*!< Bit indicating response available */
|
||||
|
||||
[[nodiscard]] bool setup_cmux(); /*!< Internal setup of CMUX mode */
|
||||
[[nodiscard]] bool exit_cmux(); /*!< Exit of CMUX mode and cleanup */
|
||||
@ -134,9 +140,73 @@ private:
|
||||
std::shared_ptr<Terminal> primary_term; /*!< Reference to the primary terminal (mostly for sending commands) */
|
||||
std::shared_ptr<Terminal> secondary_term; /*!< Secondary terminal for this DTE */
|
||||
modem_mode mode; /*!< DTE operation mode */
|
||||
SignalGroup signal; /*!< Event group used to signal request-response operations */
|
||||
command_result result; /*!< Command result of the currently exectuted command */
|
||||
std::function<bool(uint8_t *data, size_t len)> on_data; /*!< on data callback for current terminal */
|
||||
|
||||
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
/**
|
||||
* @brief Implements an extra buffer that is used to capture partial reads from underlying terminals
|
||||
* when we run out of the standard buffer
|
||||
*/
|
||||
struct extra_buffer {
|
||||
extra_buffer(): buffer(nullptr) {}
|
||||
~extra_buffer()
|
||||
{
|
||||
delete buffer;
|
||||
}
|
||||
std::vector<uint8_t> *buffer;
|
||||
size_t consumed{0};
|
||||
void grow(size_t need_size);
|
||||
void deflate()
|
||||
{
|
||||
grow(0);
|
||||
consumed = 0;
|
||||
}
|
||||
[[nodiscard]] uint8_t *begin() const
|
||||
{
|
||||
return &buffer->at(0);
|
||||
}
|
||||
[[nodiscard]] uint8_t *current() const
|
||||
{
|
||||
return &buffer->at(0) + consumed;
|
||||
}
|
||||
} inflatable;
|
||||
#endif // CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
|
||||
/**
|
||||
* @brief Set internal command callbacks to the underlying terminal.
|
||||
* Here we capture command replies to be processed by supplied command callbacks in struct command_cb.
|
||||
*/
|
||||
void set_command_callbacks();
|
||||
|
||||
/**
|
||||
* @brief This abstracts command callback processing and implements its locking, signaling of completion and timeouts.
|
||||
*/
|
||||
struct command_cb {
|
||||
static const size_t GOT_LINE = SignalGroup::bit0; /*!< Bit indicating response available */
|
||||
got_line_cb got_line; /*!< Supplied command callback */
|
||||
Lock line_lock{}; /*!< Command callback locking mechanism */
|
||||
char separator{}; /*!< Command reply separator (end of line/processing unit) */
|
||||
command_result result{}; /*!< Command return code */
|
||||
SignalGroup signal; /*!< Event group used to signal request-response operations */
|
||||
bool process_line(uint8_t *data, size_t consumed, size_t len); /*!< Lets the processing callback handle one line (processing unit) */
|
||||
bool wait_for_line(uint32_t time_ms); /*!< Waiting for command processing */
|
||||
void set(got_line_cb l, char s = '\n') /*!< Sets the command callback atomically */
|
||||
{
|
||||
Scoped<Lock> lock(line_lock);
|
||||
if (l) {
|
||||
// if we set the line callback, we have to reset the signal and the result
|
||||
signal.clear(GOT_LINE);
|
||||
result = command_result::TIMEOUT;
|
||||
}
|
||||
got_line = std::move(l);
|
||||
separator = s;
|
||||
}
|
||||
void give_up() /*!< Reports other than timeout error when processing replies (out of buffer) */
|
||||
{
|
||||
result = command_result::FAIL;
|
||||
signal.set(GOT_LINE);
|
||||
}
|
||||
} command_cb; /*!< Command callback utility class */
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -39,6 +39,7 @@ static bool exit_data(DTE &dte, ModuleIf &device, Netif &netif)
|
||||
});
|
||||
netif.wait_until_ppp_exits();
|
||||
if (!signal->wait(1, 2000)) {
|
||||
dte.set_read_cb(nullptr);
|
||||
if (!device.set_mode(modem_mode::COMMAND_MODE)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -17,53 +17,118 @@ static const size_t dte_default_buffer_size = 1000;
|
||||
DTE::DTE(const esp_modem_dte_config *config, std::unique_ptr<Terminal> terminal):
|
||||
buffer(config->dte_buffer_size),
|
||||
cmux_term(nullptr), primary_term(std::move(terminal)), secondary_term(primary_term),
|
||||
mode(modem_mode::UNDEF) {}
|
||||
mode(modem_mode::UNDEF)
|
||||
{
|
||||
set_command_callbacks();
|
||||
}
|
||||
|
||||
DTE::DTE(std::unique_ptr<Terminal> terminal):
|
||||
buffer(dte_default_buffer_size),
|
||||
cmux_term(nullptr), primary_term(std::move(terminal)), secondary_term(primary_term),
|
||||
mode(modem_mode::UNDEF) {}
|
||||
mode(modem_mode::UNDEF)
|
||||
{
|
||||
set_command_callbacks();
|
||||
}
|
||||
|
||||
DTE::DTE(const esp_modem_dte_config *config, std::unique_ptr<Terminal> t, std::unique_ptr<Terminal> s):
|
||||
buffer(config->dte_buffer_size),
|
||||
cmux_term(nullptr), primary_term(std::move(t)), secondary_term(std::move(s)),
|
||||
mode(modem_mode::DUAL_MODE) {}
|
||||
mode(modem_mode::UNDEF)
|
||||
{
|
||||
set_command_callbacks();
|
||||
}
|
||||
|
||||
DTE::DTE(std::unique_ptr<Terminal> t, std::unique_ptr<Terminal> s):
|
||||
buffer(dte_default_buffer_size),
|
||||
cmux_term(nullptr), primary_term(std::move(t)), secondary_term(std::move(s)),
|
||||
mode(modem_mode::DUAL_MODE) {}
|
||||
mode(modem_mode::UNDEF)
|
||||
{
|
||||
set_command_callbacks();
|
||||
}
|
||||
|
||||
void DTE::set_command_callbacks()
|
||||
{
|
||||
primary_term->set_read_cb([this](uint8_t *data, size_t len) {
|
||||
Scoped<Lock> l(command_cb.line_lock);
|
||||
if (command_cb.got_line == nullptr) {
|
||||
return false;
|
||||
}
|
||||
if (data) {
|
||||
// For terminals which post data directly with the callback (CMUX)
|
||||
// we cannot defragment unless we allocate, but
|
||||
// we'll try to process the data on the actual buffer
|
||||
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
if (inflatable.consumed != 0) {
|
||||
inflatable.grow(inflatable.consumed + len);
|
||||
std::memcpy(inflatable.current(), data, len);
|
||||
data = inflatable.begin();
|
||||
}
|
||||
if (command_cb.process_line(data, inflatable.consumed, len)) {
|
||||
return true;
|
||||
}
|
||||
// at this point we're sure that the data processing hasn't finished,
|
||||
// and we have to grow the inflatable buffer (if enabled) or give up
|
||||
if (inflatable.consumed == 0) {
|
||||
inflatable.grow(len);
|
||||
std::memcpy(inflatable.begin(), data, len);
|
||||
}
|
||||
inflatable.consumed += len;
|
||||
return false;
|
||||
#else
|
||||
if (command_cb.process_line(data, 0, len)) {
|
||||
return true;
|
||||
}
|
||||
// cannot inflate and the processing hasn't finishes in the first iteration -> report a failure
|
||||
command_cb.give_up();
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
// data == nullptr: Terminals which request users to read current data
|
||||
// we're able to use DTE's buffer to defragment it; as long as we consume less that the buffer size
|
||||
if (buffer.size > buffer.consumed) {
|
||||
data = buffer.get();
|
||||
len = primary_term->read(data + buffer.consumed, buffer.size - buffer.consumed);
|
||||
if (command_cb.process_line(data, buffer.consumed, len)) {
|
||||
return true;
|
||||
}
|
||||
buffer.consumed += len;
|
||||
return false;
|
||||
}
|
||||
// we have used the entire DTE's buffer, need to use the inflatable buffer to continue
|
||||
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
if (inflatable.consumed == 0) {
|
||||
inflatable.grow(buffer.size + len);
|
||||
std::memcpy(inflatable.begin(), buffer.get(), buffer.size);
|
||||
inflatable.consumed = buffer.size;
|
||||
} else {
|
||||
inflatable.grow(inflatable.consumed + len);
|
||||
}
|
||||
len = primary_term->read(inflatable.current(), len);
|
||||
if (command_cb.process_line(inflatable.begin(), inflatable.consumed, len)) {
|
||||
return true;
|
||||
}
|
||||
inflatable.consumed += len;
|
||||
return false;
|
||||
#else
|
||||
// cannot inflate -> report a failure
|
||||
command_cb.give_up();
|
||||
return true;
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
command_result DTE::command(const std::string &command, got_line_cb got_line, uint32_t time_ms, const char separator)
|
||||
{
|
||||
Scoped<Lock> l(internal_lock);
|
||||
result = command_result::TIMEOUT;
|
||||
signal.clear(GOT_LINE);
|
||||
primary_term->set_read_cb([this, got_line, separator](uint8_t *data, size_t len) {
|
||||
if (!data) {
|
||||
data = buffer.get();
|
||||
len = primary_term->read(data + buffer.consumed, buffer.size - buffer.consumed);
|
||||
} else {
|
||||
buffer.consumed = 0; // if the underlying terminal contains data, we cannot fragment
|
||||
}
|
||||
if (memchr(data + buffer.consumed, separator, len)) {
|
||||
result = got_line(data, buffer.consumed + len);
|
||||
if (result == command_result::OK || result == command_result::FAIL) {
|
||||
signal.set(GOT_LINE);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
buffer.consumed += len;
|
||||
return false;
|
||||
});
|
||||
Scoped<Lock> l1(internal_lock);
|
||||
command_cb.set(got_line, separator);
|
||||
primary_term->write((uint8_t *)command.c_str(), command.length());
|
||||
auto got_lf = signal.wait(GOT_LINE, time_ms);
|
||||
if (got_lf && result == command_result::TIMEOUT) {
|
||||
ESP_MODEM_THROW_IF_ERROR(ESP_ERR_INVALID_STATE);
|
||||
}
|
||||
command_cb.wait_for_line(time_ms);
|
||||
command_cb.set(nullptr);
|
||||
buffer.consumed = 0;
|
||||
primary_term->set_read_cb(nullptr);
|
||||
return result;
|
||||
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
inflatable.deflate();
|
||||
#endif
|
||||
return command_cb.result;
|
||||
}
|
||||
|
||||
command_result DTE::command(const std::string &cmd, got_line_cb got_line, uint32_t time_ms)
|
||||
@ -91,6 +156,7 @@ void DTE::exit_cmux_internal()
|
||||
primary_term = std::move(ejected.first);
|
||||
buffer = std::move(ejected.second);
|
||||
secondary_term = primary_term;
|
||||
set_command_callbacks();
|
||||
}
|
||||
|
||||
bool DTE::setup_cmux()
|
||||
@ -113,7 +179,7 @@ bool DTE::setup_cmux()
|
||||
cmux_term = nullptr;
|
||||
return false;
|
||||
}
|
||||
|
||||
set_command_callbacks();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -135,6 +201,7 @@ bool DTE::set_mode(modem_mode m)
|
||||
if (mode == modem_mode::CMUX_MODE || mode == modem_mode::CMUX_MANUAL_MODE || mode == modem_mode::DUAL_MODE) {
|
||||
// mode stays the same, but need to swap terminals (as command has been switched)
|
||||
secondary_term.swap(primary_term);
|
||||
set_command_callbacks();
|
||||
} else {
|
||||
mode = m;
|
||||
}
|
||||
@ -177,6 +244,7 @@ bool DTE::set_mode(modem_mode m)
|
||||
// manual CMUX transitions: Swap terminals
|
||||
if (m == modem_mode::CMUX_MANUAL_SWAP && mode == modem_mode::CMUX_MANUAL_MODE) {
|
||||
secondary_term.swap(primary_term);
|
||||
set_command_callbacks();
|
||||
return true;
|
||||
}
|
||||
mode = modem_mode::UNDEF;
|
||||
@ -185,6 +253,10 @@ bool DTE::set_mode(modem_mode m)
|
||||
|
||||
void DTE::set_read_cb(std::function<bool(uint8_t *, size_t)> f)
|
||||
{
|
||||
if (f == nullptr) {
|
||||
set_command_callbacks();
|
||||
return;
|
||||
}
|
||||
on_data = std::move(f);
|
||||
secondary_term->set_read_cb([this](uint8_t *data, size_t len) {
|
||||
if (!data) { // if no data available from terminal callback -> need to explicitly read some
|
||||
@ -246,6 +318,41 @@ void DTE::on_read(got_line_cb on_read_cb)
|
||||
});
|
||||
}
|
||||
|
||||
bool DTE::command_cb::process_line(uint8_t *data, size_t consumed, size_t len)
|
||||
{
|
||||
if (memchr(data + consumed, separator, len)) {
|
||||
result = got_line(data, consumed + len);
|
||||
if (result == command_result::OK || result == command_result::FAIL) {
|
||||
signal.set(GOT_LINE);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DTE::command_cb::wait_for_line(uint32_t time_ms)
|
||||
{
|
||||
auto got_lf = signal.wait(command_cb::GOT_LINE, time_ms);
|
||||
if (got_lf && result == command_result::TIMEOUT) {
|
||||
ESP_MODEM_THROW_IF_ERROR(ESP_ERR_INVALID_STATE);
|
||||
}
|
||||
return got_lf;
|
||||
}
|
||||
|
||||
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
|
||||
void DTE::extra_buffer::grow(size_t need_size)
|
||||
{
|
||||
if (need_size == 0) {
|
||||
delete buffer;
|
||||
buffer = nullptr;
|
||||
} else if (buffer == nullptr) {
|
||||
buffer = new std::vector<uint8_t>(need_size);
|
||||
} else {
|
||||
buffer->resize(need_size);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Implemented here to keep all headers C++11 compliant
|
||||
*/
|
||||
|
@ -64,7 +64,6 @@ public:
|
||||
|
||||
void set_read_cb(std::function<bool(uint8_t *data, size_t len)> f) override
|
||||
{
|
||||
ESP_MODEM_THROW_IF_FALSE(signal.wait(TASK_PARAMS, 1000), "Failed to set UART task params");
|
||||
on_read = std::move(f);
|
||||
}
|
||||
|
||||
@ -91,7 +90,6 @@ private:
|
||||
static const size_t TASK_INIT = BIT0;
|
||||
static const size_t TASK_START = BIT1;
|
||||
static const size_t TASK_STOP = BIT2;
|
||||
static const size_t TASK_PARAMS = BIT3;
|
||||
|
||||
QueueHandle_t event_queue;
|
||||
uart_resource uart;
|
||||
@ -118,9 +116,7 @@ void UartTerminal::task()
|
||||
return; // exits to the static method where the task gets deleted
|
||||
}
|
||||
while (signal.is_any(TASK_START)) {
|
||||
signal.set(TASK_PARAMS);
|
||||
if (get_event(event, 100)) {
|
||||
signal.clear(TASK_PARAMS);
|
||||
switch (event.type) {
|
||||
case UART_DATA:
|
||||
uart_get_buffered_data_len(uart.port, &len);
|
||||
|
Reference in New Issue
Block a user