fix slave tcp skips response from multiple masters

This commit is contained in:
aleks
2025-07-23 12:12:04 +02:00
parent 079b51e23b
commit 57169d5174
18 changed files with 404 additions and 162 deletions

25
Kconfig
View File

@@ -33,7 +33,20 @@ menu "Modbus configuration"
help
Modbus TCP connection timeout in seconds.
Once expired the current connection with the client will be closed
and Modbus slave will be waiting for new connection to accept.
and Modbus TCP slave will be waiting for new connection to accept.
This timeout is applicable for Modbus TCP master when it tries to connect to the Modbus TCP slaves.
config FMB_TCP_KEEP_ALIVE_TOUT_SEC
int "Modbus TCP keep alive timeout"
default 5
range 1 7200
depends on FMB_COMM_MODE_TCP_EN
help
This option is used to set keep-alive time for Modbus TCP connections.
This timeout is the interval during which the device sends keep-alive messages
to maintain a connection with a server before register the error.
The keep-alive time can be configured and should be set based on the actual needs of the application,
considering the trade-off between keeping the connection alive and minimizing system load.
config FMB_TCP_UID_ENABLED
bool "Modbus TCP enable UID (Unit Identifier) support"
@@ -72,12 +85,12 @@ menu "Modbus configuration"
then master can send next frame.
config FMB_QUEUE_LENGTH
int "Modbus serial task queue length"
range 0 200
default 20
int "Modbus event task queue length"
range 10 500
default 50
help
Modbus serial driver queue length. It is used by event queue task.
See the serial driver API for more information.
Modbus event queue length. It is used by event queue tasks
for corresponding communication mode.
config FMB_PORT_TASK_STACK_SIZE
int "Modbus port task stack size"

View File

@@ -78,11 +78,7 @@ typedef enum mb_event_enum {
EV_EXECUTE = 0x0008, /*!< Execute function. */
EV_FRAME_TRANSMIT = 0x0010, /*!< Transmission started . */
EV_FRAME_SENT = 0x0020, /*!< Frame sent. */
EV_ERROR_PROCESS = 0x0040, /*!< Error process state. */
EV_MASTER_ERROR_RESPOND_TIMEOUT = 0x0080, /*!< Request respond timeout. */
EV_MASTER_ERROR_RECEIVE_DATA = 0x0100, /*!< Request receive data error. */
EV_MASTER_ERROR_EXECUTE_FUNCTION = 0x0200, /*!< Request execute function error. */
EV_MASTER_PROCESS_SUCCESS = 0x0400 /*!< Master error process. */
EV_ERROR_PROCESS = 0x0040 /*!< Error process state. */
} mb_event_enum_t;
/*! \ingroup modbus
@@ -108,12 +104,12 @@ typedef mb_exception_t (*mb_fn_handler_fp)(void *, uint8_t *frame_ptr, uint16_t
/*! \ingroup modbus
* \brief Error event type
*/
typedef enum mb_err_event_enum {
EV_ERROR_INIT, /*!< No error, initial state. */
EV_ERROR_RESPOND_TIMEOUT, /*!< Slave respond timeout. */
EV_ERROR_RECEIVE_DATA, /*!< Receive frame data error. */
EV_ERROR_EXECUTE_FUNCTION, /*!< Execute function error. */
EV_ERROR_OK /*!< No error, processing completed. */
typedef enum _mb_err_event_enum {
EV_ERROR_INIT, /*!< No error, initial state. */
EV_ERROR_RESPOND_TIMEOUT, /*!< Slave respond timeout. */
EV_ERROR_RECEIVE_DATA, /*!< Receive frame data error. */
EV_ERROR_EXECUTE_FUNCTION, /*!< Execute function error. */
EV_ERROR_OK /*!< No error, processing completed. */
} mb_err_event_t;
typedef struct mb_event_s {
@@ -150,9 +146,9 @@ typedef enum
*/
typedef enum
{
MB_TMODE_T35, /*!< Master receive frame T3.5 timeout. */
MB_TMODE_RESPOND_TIMEOUT, /*!< Master wait respond for slave. */
MB_TMODE_CONVERT_DELAY /*!< Master sent broadcast , then delay sometime.*/
MB_TMODE_T35, /*!< Master receive frame T3.5 timeout. */
MB_TMODE_RESPOND_TIMEOUT, /*!< Master wait respond for slave. */
MB_TMODE_CONVERT_DELAY /*!< Master sent broadcast , then delay sometime.*/
} mb_timer_mode_enum_t;
#ifdef __cplusplus

View File

@@ -497,25 +497,25 @@ static uint8_t mbm_get_dest_addr(mb_base_t *inst)
void mbm_error_cb_respond_timeout(mb_base_t *inst, uint8_t dest_addr, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_MASTER_ERROR_RESPOND_TIMEOUT);
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_RESPOND_TIMEOUT);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
void mbm_error_cb_receive_data(mb_base_t *inst, uint8_t dest_addr, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_MASTER_ERROR_RECEIVE_DATA);
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_RECEIVE_DATA);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
void mbm_error_cb_execute_function(mb_base_t *inst, uint8_t dest_address, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_MASTER_ERROR_EXECUTE_FUNCTION);
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_EXECUTE_FUNCTION);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
void mbm_error_cb_request_success(mb_base_t *inst, uint8_t dest_address, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_MASTER_PROCESS_SUCCESS);
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_OK);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
@@ -553,6 +553,8 @@ mb_err_enum_t mbm_poll(mb_base_t *inst)
mb_port_event_set_err_type(MB_OBJ(inst->port_obj), EV_ERROR_RESPOND_TIMEOUT);
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_ERROR_PROCESS));
ESP_LOGE(TAG, MB_OBJ_FMT", frame send error. %d", MB_OBJ_PARENT(inst), (int)status);
} else {
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_FRAME_SENT));
}
// Initialize modbus transaction
mbm_obj->curr_trans_id = event.trans_id;

View File

@@ -423,6 +423,30 @@ mb_err_enum_t mbs_disable(mb_base_t *inst)
return status;
}
void mbs_error_cb_respond_timeout(mb_base_t *inst, uint8_t dest_addr, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_RESPOND_TIMEOUT);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
void mbs_error_cb_receive_data(mb_base_t *inst, uint8_t dest_addr, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_RECEIVE_DATA);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
void mbs_error_cb_execute_function(mb_base_t *inst, uint8_t dest_address, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_EXECUTE_FUNCTION);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
void mbs_error_cb_request_success(mb_base_t *inst, uint8_t dest_address, const uint8_t *pdu_data, uint16_t pdu_length)
{
mb_port_event_set_resp_flag(MB_BASE2PORT(inst), EV_ERROR_OK);
ESP_LOG_BUFFER_HEX_LEVEL(__func__, (void *)pdu_data, pdu_length, ESP_LOG_DEBUG);
}
mb_err_enum_t mbs_poll(mb_base_t *inst)
{
mbs_object_t *mbs_obj = MB_GET_OBJ_CTX(inst, mbs_object_t, base);;
@@ -430,6 +454,8 @@ mb_err_enum_t mbs_poll(mb_base_t *inst)
mb_exception_t exception;
mb_err_enum_t status = MB_ENOERR;
mb_event_t event;
mb_err_event_t error_type = EV_ERROR_INIT;
uint64_t time_div_us = 0;
/* Check if the protocol stack is ready. */
if (mbs_obj->cur_state != STATE_ENABLED) {
@@ -451,13 +477,19 @@ mb_err_enum_t mbs_poll(mb_base_t *inst)
// Check if the frame is for us. If not ,send an error process event.
if (status == MB_ENOERR) {
// Check if the frame is for us. If not ignore the frame.
if((mbs_obj->rcv_addr == mbs_obj->mb_address) || (mbs_obj->rcv_addr == MB_ADDRESS_BROADCAST)
if((mbs_obj->rcv_addr == mbs_obj->mb_address) || (mbs_obj->rcv_addr == MB_ADDRESS_BROADCAST)
|| (mbs_obj->rcv_addr == MB_TCP_PSEUDO_ADDRESS)) {
mbs_obj->curr_trans_id = event.get_ts;
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_EXECUTE | EV_TRANS_START));
MB_PRT_BUF(inst->descr.parent_name, ":MB_RECV",
&mbs_obj->frame[MB_PDU_FUNC_OFF], mbs_obj->length, ESP_LOG_DEBUG);
}
} else {
ESP_LOGE(TAG, MB_OBJ_FMT":frame receive error. %d", MB_OBJ_PARENT(inst), (int)status);
// If the frame was not received correctly, post an error event.
mb_port_event_set_err_type(MB_OBJ(inst->port_obj), EV_ERROR_RECEIVE_DATA);
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_ERROR_PROCESS));
mbs_obj->length = 0; // Reset length to avoid processing junk data.
}
break;
@@ -481,7 +513,11 @@ mb_err_enum_t mbs_poll(mb_base_t *inst)
(uint16_t)mbs_obj->length, ESP_LOG_DEBUG);
status = MB_OBJ(inst->transp_obj)->frm_send(inst->transp_obj, mbs_obj->rcv_addr, mbs_obj->frame, mbs_obj->length);
if (status != MB_ENOERR) {
ESP_LOGE(TAG, MB_OBJ_FMT":frame send error. %d", MB_OBJ_PARENT(inst), (int)status);
ESP_LOGE(TAG, MB_OBJ_FMT": frame send error: %d.", MB_OBJ_PARENT(inst), (int)status);
mb_port_event_set_err_type(MB_OBJ(inst->port_obj), EV_ERROR_RESPOND_TIMEOUT);
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_ERROR_PROCESS));
} else {
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_FRAME_SENT));
}
}
break;
@@ -492,9 +528,49 @@ mb_err_enum_t mbs_poll(mb_base_t *inst)
case EV_FRAME_SENT:
ESP_LOGD(TAG, MB_OBJ_FMT":EV_MASTER_FRAME_SENT", MB_OBJ_PARENT(inst));
uint64_t time_div_us = mbs_obj->curr_trans_id ? (event.get_ts - mbs_obj->curr_trans_id) : 0;
time_div_us = mbs_obj->curr_trans_id ? (event.get_ts - mbs_obj->curr_trans_id) : 0;
mbs_obj->curr_trans_id = 0;
ESP_LOGD(TAG, MB_OBJ_FMT", transaction processing time(us) = %" PRId64, MB_OBJ_PARENT(inst), time_div_us);
error_type = mb_port_event_get_err_type(MB_OBJ(inst->port_obj));
if (error_type == EV_ERROR_INIT) {
ESP_LOGD(TAG, MB_OBJ_FMT", set event EV_ERROR_OK", MB_OBJ_PARENT(inst));
mb_port_event_set_err_type(MB_OBJ(inst->port_obj), EV_ERROR_OK);
(void)mb_port_event_post(MB_OBJ(inst->port_obj), EVENT(EV_ERROR_PROCESS));
}
break;
case EV_ERROR_PROCESS:
ESP_LOGD(TAG, MB_OBJ_FMT":EV_ERROR_PROCESS", MB_OBJ_PARENT(inst));
// stop timer and execute specified error process callback function.
mb_port_timer_disable(MB_OBJ(inst->port_obj));
error_type = mb_port_event_get_err_type(MB_OBJ(inst->port_obj));
switch (error_type)
{
case EV_ERROR_RESPOND_TIMEOUT:
mbs_error_cb_respond_timeout(inst, mbs_obj->rcv_addr,
mbs_obj->frame, mbs_obj->length);
break;
case EV_ERROR_RECEIVE_DATA:
mbs_error_cb_receive_data(inst, mbs_obj->rcv_addr,
mbs_obj->frame, mbs_obj->length);
break;
case EV_ERROR_EXECUTE_FUNCTION:
mbs_error_cb_execute_function(inst, mbs_obj->rcv_addr,
mbs_obj->frame, mbs_obj->length);
break;
case EV_ERROR_OK:
mbs_error_cb_request_success(inst, mbs_obj->rcv_addr,
mbs_obj->frame, mbs_obj->length);
break;
default:
ESP_LOGE(TAG, MB_OBJ_FMT", incorrect error type = %d.", MB_OBJ_PARENT(inst), (int)error_type);
break;
}
mb_port_event_set_err_type(MB_OBJ(inst->port_obj), EV_ERROR_INIT);
time_div_us = mbs_obj->curr_trans_id ? (event.get_ts - mbs_obj->curr_trans_id) : 0;
mbs_obj->curr_trans_id = 0;
ESP_LOGD(TAG, MB_OBJ_FMT", transaction processing time(us) = %" PRId64, MB_OBJ_PARENT(inst), time_div_us);
mb_port_event_res_release(MB_OBJ(inst->port_obj));
break;
default:

View File

@@ -16,7 +16,7 @@ typedef struct transaction_item {
uint8_t *buffer;
uint16_t len;
int node_id;
int msg_id;
uint16_t msg_id;
void *pnode;
transaction_tick_t tick;
_Atomic(int) state;
@@ -74,7 +74,7 @@ transaction_item_handle_t transaction_enqueue(transaction_handle_t transaction,
return item;
}
transaction_item_handle_t transaction_get(transaction_handle_t transaction, int msg_id)
transaction_item_handle_t transaction_get(transaction_handle_t transaction, uint16_t msg_id)
{
transaction_item_handle_t item;
CRITICAL_SECTION_LOCK(transaction->lock);
@@ -138,6 +138,14 @@ esp_err_t transaction_delete_item(transaction_handle_t transaction, transaction_
return ESP_FAIL;
}
uint16_t transaction_item_get_id(transaction_item_handle_t item)
{
if (item) {
return atomic_load(&(item->msg_id));
}
return 0xFFFF;
}
uint8_t *transaction_item_get_data(transaction_item_handle_t item, size_t *len, uint16_t *msg_id, int *node_id)
{
if (item) {
@@ -155,7 +163,7 @@ uint8_t *transaction_item_get_data(transaction_item_handle_t item, size_t *len,
return NULL;
}
esp_err_t transaction_delete(transaction_handle_t transaction, int msg_id)
esp_err_t transaction_delete(transaction_handle_t transaction, uint16_t msg_id)
{
transaction_item_handle_t item, tmp;
CRITICAL_SECTION_LOCK(transaction->lock);
@@ -174,7 +182,7 @@ esp_err_t transaction_delete(transaction_handle_t transaction, int msg_id)
return ESP_FAIL;
}
esp_err_t transaction_set_state(transaction_handle_t transaction, int msg_id, pending_state_t state)
esp_err_t transaction_set_state(transaction_handle_t transaction, uint16_t msg_id, pending_state_t state)
{
transaction_item_handle_t item = transaction_get(transaction, msg_id);
if (item) {
@@ -201,7 +209,15 @@ esp_err_t transaction_item_set_state(transaction_item_handle_t item, pending_sta
return ESP_FAIL;
}
esp_err_t transaction_set_tick(transaction_handle_t transaction, int msg_id, transaction_tick_t tick)
transaction_tick_t transaction_item_get_tick(transaction_item_handle_t item)
{
if (item) {
return atomic_load(&(item->tick));
}
return 0;
}
esp_err_t transaction_set_tick(transaction_handle_t transaction, uint16_t msg_id, transaction_tick_t tick)
{
transaction_item_handle_t item = transaction_get(transaction, msg_id);
if (item) {
@@ -211,9 +227,9 @@ esp_err_t transaction_set_tick(transaction_handle_t transaction, int msg_id, tra
return ESP_FAIL;
}
int transaction_delete_single_expired(transaction_handle_t transaction, transaction_tick_t current_tick, transaction_tick_t timeout)
uint16_t transaction_delete_single_expired(transaction_handle_t transaction, transaction_tick_t current_tick, transaction_tick_t timeout)
{
int msg_id = -1;
uint16_t msg_id = 0xFFFF;
transaction_item_handle_t item;
CRITICAL_SECTION_LOCK(transaction->lock);
STAILQ_FOREACH(item, transaction->list, next) {
@@ -232,6 +248,24 @@ int transaction_delete_single_expired(transaction_handle_t transaction, transact
return msg_id;
}
int transaction_delete_by_node_id(transaction_handle_t transaction, int node_id)
{
int deleted_items = 0;
transaction_item_handle_t item, tmp;
CRITICAL_SECTION_LOCK(transaction->lock);
STAILQ_FOREACH_SAFE(item, transaction->list, next, tmp) {
if (item->node_id == node_id) {
STAILQ_REMOVE(transaction->list, item, transaction_item, next);
free(item->buffer);
transaction->size -= item->len;
free(item);
deleted_items ++;
}
}
CRITICAL_SECTION_UNLOCK(transaction->lock);
return deleted_items;
}
int transaction_delete_expired(transaction_handle_t transaction, transaction_tick_t current_tick, transaction_tick_t timeout)
{
int deleted_items = 0;

View File

@@ -27,13 +27,13 @@ typedef struct transaction_item *transaction_item_handle_t;
typedef struct transaction_message {
uint8_t *buffer;
uint16_t len;
int msg_id;
uint16_t msg_id;
int node_id;
void *pnode;
} transaction_message_t;
typedef struct transaction_message *transaction_message_handle_t;
typedef long long transaction_tick_t;
typedef uint64_t transaction_tick_t;
typedef enum pending_state {
INIT,
@@ -48,12 +48,14 @@ typedef enum pending_state {
transaction_handle_t transaction_init(void);
transaction_item_handle_t transaction_enqueue(transaction_handle_t transaction, transaction_message_handle_t message, transaction_tick_t tick);
transaction_item_handle_t transaction_dequeue(transaction_handle_t transaction, pending_state_t state, transaction_tick_t *tick);
transaction_item_handle_t transaction_get(transaction_handle_t transaction, int msg_id);
transaction_item_handle_t transaction_dequeue(transaction_handle_t transaction, pending_state_t pending, transaction_tick_t *tick);
transaction_item_handle_t transaction_get(transaction_handle_t transaction, uint16_t msg_id);
transaction_item_handle_t transaction_get_first(transaction_handle_t transaction);
uint16_t transaction_item_get_id(transaction_item_handle_t item);
uint8_t *transaction_item_get_data(transaction_item_handle_t item, size_t *len, uint16_t *msg_id, int *node_id);
esp_err_t transaction_delete(transaction_handle_t transaction, int msg_id);
esp_err_t transaction_delete(transaction_handle_t transaction, uint16_t msg_id);
esp_err_t transaction_delete_item(transaction_handle_t transaction, transaction_item_handle_t item);
int transaction_delete_by_node_id(transaction_handle_t transaction, int node_id);
int transaction_delete_expired(transaction_handle_t transaction, transaction_tick_t current_tick, transaction_tick_t timeout);
/**
@@ -61,11 +63,12 @@ int transaction_delete_expired(transaction_handle_t transaction, transaction_tic
*
* @return msg id of the deleted message, -1 if no expired message in the transaction
*/
int transaction_delete_single_expired(transaction_handle_t transaction, transaction_tick_t current_tick, transaction_tick_t timeout);
esp_err_t transaction_set_state(transaction_handle_t transaction, int msg_id, pending_state_t state);
uint16_t transaction_delete_single_expired(transaction_handle_t transaction, transaction_tick_t current_tick, transaction_tick_t timeout);
esp_err_t transaction_set_state(transaction_handle_t transaction, uint16_t msg_id, pending_state_t pending);
pending_state_t transaction_item_get_state(transaction_item_handle_t item);
esp_err_t transaction_item_set_state(transaction_item_handle_t item, pending_state_t state);
esp_err_t transaction_set_tick(transaction_handle_t transaction, int msg_id, transaction_tick_t tick);
esp_err_t transaction_set_tick(transaction_handle_t transaction, uint16_t msg_id, transaction_tick_t tick);
transaction_tick_t transaction_item_get_tick(transaction_item_handle_t item);
uint64_t transaction_get_size(transaction_handle_t transaction);
void transaction_destroy(transaction_handle_t transaction);
void transaction_delete_all_items(transaction_handle_t transaction);

View File

@@ -77,10 +77,10 @@ void unlock_obj(_lock_t *lock_ptr);
spinlock_release(&lock); \
} while (0)
#define MB_EVENT_REQ_MASK (EventBits_t)(EV_MASTER_PROCESS_SUCCESS | \
EV_MASTER_ERROR_RESPOND_TIMEOUT | \
EV_MASTER_ERROR_RECEIVE_DATA | \
EV_MASTER_ERROR_EXECUTE_FUNCTION)
#define MB_EVENT_REQ_MASK (EventBits_t)(EV_ERROR_OK | \
EV_ERROR_RESPOND_TIMEOUT | \
EV_ERROR_RECEIVE_DATA | \
EV_ERROR_EXECUTE_FUNCTION)
#define MB_PORT_CHECK_EVENT(event, mask) (event & mask)
#define MB_PORT_CLEAR_EVENT(event, mask) \

View File

@@ -144,7 +144,7 @@ bool mb_port_event_res_take(mb_port_base_t *inst, uint32_t timeout)
{
MB_RETURN_ON_FALSE((inst && inst->event_obj && inst->event_obj->resource_hdl), false, TAG,
"incorrect object handle.");
BaseType_t status = pdTRUE;
BaseType_t status = pdFALSE;
status = xSemaphoreTake(inst->event_obj->resource_hdl, timeout);
ESP_LOGD(TAG, "%s, mb take resource, (%" PRIu32 " ticks).", inst->descr.parent_name, timeout);
return (bool)status;
@@ -185,13 +185,13 @@ mb_err_enum_t mb_port_event_wait_req_finish(mb_port_base_t *inst)
// if we wait for certain event bits but get from poll subset
ESP_LOGE(TAG, "%s, %s: incorrect event set = 0x%x", inst->descr.parent_name, __func__, (int)rcv_event);
}
if (MB_PORT_CHECK_EVENT(rcv_event, EV_MASTER_PROCESS_SUCCESS)) {
if (MB_PORT_CHECK_EVENT(rcv_event, EV_ERROR_OK)) {
err_status = MB_ENOERR;
} else if (MB_PORT_CHECK_EVENT(rcv_event, EV_MASTER_ERROR_RESPOND_TIMEOUT)) {
} else if (MB_PORT_CHECK_EVENT(rcv_event, EV_ERROR_RESPOND_TIMEOUT)) {
err_status = MB_ETIMEDOUT;
} else if (MB_PORT_CHECK_EVENT(rcv_event, EV_MASTER_ERROR_RECEIVE_DATA)) {
} else if (MB_PORT_CHECK_EVENT(rcv_event, EV_ERROR_RECEIVE_DATA)) {
err_status = MB_ERECVDATA;
} else if (MB_PORT_CHECK_EVENT(rcv_event, EV_MASTER_ERROR_EXECUTE_FUNCTION)) {
} else if (MB_PORT_CHECK_EVENT(rcv_event, EV_ERROR_EXECUTE_FUNCTION)) {
err_status = MB_EILLFUNC;
}
} else {

View File

@@ -62,8 +62,16 @@ esp_err_t queue_push(QueueHandle_t queue, void *buf, size_t len, frame_entry_t *
{
frame_entry_t frame_info = {0};
if (!queue) { // || !buf || (len <= 0)
return -1;
if (!queue) { // || !pbuf || (len <= 0)
return ESP_ERR_INVALID_ARG;
}
if (!uxQueueSpacesAvailable(queue)) {
return ESP_ERR_NO_MEM;
}
if (frame) {
frame_info = *frame;
}
if (frame) {
@@ -114,7 +122,7 @@ ssize_t queue_pop(QueueHandle_t queue, void *buf, size_t len, frame_entry_t *fra
}
return len;
err:
return -1;
return ESP_ERR_INVALID_STATE;
}
bool queue_is_empty(QueueHandle_t queue)

View File

@@ -117,7 +117,6 @@ void mb_port_timer_us(mb_port_base_t *inst, uint64_t timeout_us)
atomic_store(&(inst->timer_obj->timer_state), false);
}
inline void mb_port_set_cur_timer_mode(mb_port_base_t *inst, mb_timer_mode_enum_t tmr_mode)
{
atomic_store(&(inst->timer_obj->timer_mode), tmr_mode);

View File

@@ -331,7 +331,7 @@ bool mb_port_ser_send_data(mb_port_base_t *inst, uint8_t *p_ser_frame, uint16_t
count = uart_write_bytes(port_obj->ser_opts.port, p_ser_frame, ser_length);
// Waits while UART sending the packet
esp_err_t status = uart_wait_tx_done(port_obj->ser_opts.port, MB_SERIAL_TX_TOUT_TICKS);
(void)mb_port_event_post(inst, EVENT(EV_FRAME_SENT));
// (void)mb_port_event_post(inst, EVENT(EV_FRAME_SENT)); // the event is sent event after return
ESP_LOGD(TAG, "%s, tx buffer sent: (%d) bytes.", inst->descr.parent_name, (int)count);
MB_RETURN_ON_FALSE((status == ESP_OK), false, TAG, "%s, mb serial sent buffer failure.",
inst->descr.parent_name);

View File

@@ -22,6 +22,7 @@ extern "C" {
#define MB_FRAME_QUEUE_SZ (20)
#define MB_TCP_CHECK_ALIVE_TOUT_MS (20) // check alive timeout in mS
#define MB_RECONNECT_TIME_MS (CONFIG_FMB_TCP_CONNECTION_TOUT_SEC * 1000UL)
#define MB_TCP_KEEP_ALIVE_TOUT_MS (CONFIG_FMB_TCP_KEEP_ALIVE_TOUT_SEC * 1000UL)
#define MB_EVENT_SEND_RCV_TOUT_MS (500)
#define MB_TCP_MBAP_GET_FIELD(buffer, field) ((uint16_t)((buffer[field] << 8U) | buffer[field + 1]))

View File

@@ -302,7 +302,7 @@ int mb_drv_open(void *ctx, mb_uid_info_t addr_info, int flags)
goto err;
}
if (drv_obj->mb_node_open_count > MB_MAX_FDS) {
ESP_LOGD(TAG, "Exceeded maximum node count: %d", drv_obj->mb_node_open_count);
ESP_LOGE(TAG, "Exceeded maximum node count: %d", drv_obj->mb_node_open_count);
goto err;
}
drv_obj->mb_node_open_count++;
@@ -576,7 +576,7 @@ err_t mb_drv_check_node_state(void *ctx, int *fd_ptr, uint32_t timeout_ms)
pnode = mb_drv_get_next_node_from_set(ctx, fd_ptr, &drv_obj->conn_set);
if (pnode && FD_ISSET(pnode->sock_id, &drv_obj->conn_set)) {
uint64_t last_read_div_us = (esp_timer_get_time() - pnode->recv_time);
ESP_LOGD(TAG, "%p, node: %d, sock: %d, IP:%s, check connection timeout = %" PRId64 ", rcv_time: %" PRId64 " %" PRId32,
ESP_LOGD(TAG, "%p, node: %d, sock: %d, IP:%s, check connection timeout = %" PRId64 ", rcv_time: %" PRId64 " %" PRIu32,
ctx, (int)pnode->index, (int)pnode->sock_id, pnode->addr_info.ip_addr_str,
(esp_timer_get_time() / 1000), pnode->recv_time / 1000, timeout_ms);
if (last_read_div_us >= (uint64_t)(timeout_ms * 1000)) {
@@ -633,11 +633,20 @@ void mb_drv_tcp_task(void *ctx)
mb_uid_info_t node_info;
int sock_id = port_accept_connection(drv_obj->listen_sock_fd, &node_info);
if (sock_id) {
int fd = mb_drv_open(drv_obj, node_info, 0);
if (fd < 0) {
ESP_LOGE(TAG, "%p, unable to open node: %s", drv_obj, node_info.ip_addr_str);
if (drv_obj->mb_node_open_count >= MB_MAX_FDS) {
ESP_LOGE(TAG, "%p, unable to accept node, maximum is %u connections.", drv_obj, MB_MAX_FDS);
struct linger sl;
sl.l_onoff = 1; // non-zero value enables linger option in kernel
sl.l_linger = 0; // timeout interval in seconds
setsockopt(sock_id, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
close(sock_id);
} else {
DRIVER_SEND_EVENT(ctx, MB_EVENT_CONNECT, fd);
int fd = mb_drv_open(drv_obj, node_info, 0);
if (fd < 0) {
ESP_LOGE(TAG, "%p, unable to open node: %s", drv_obj, node_info.ip_addr_str);
} else {
DRIVER_SEND_EVENT(ctx, MB_EVENT_CONNECT, fd);
}
}
}
} else {

View File

@@ -5,7 +5,6 @@
*/
#pragma once
//#include <sys/queue.h>
#include <stdatomic.h>
#include "esp_err.h"
@@ -49,6 +48,12 @@ typedef void (*mb_event_handler_fp)(void *ctx, esp_event_base_t base, int32_t id
#define MB_TX_QUEUE_MAX_SIZE (CONFIG_FMB_QUEUE_LENGTH)
#define MB_EVENT_QUEUE_SZ (CONFIG_FMB_QUEUE_LENGTH * MB_TCP_PORT_MAX_CONN)
#if MB_MASTER_TIMEOUT_MS_RESPOND > 1000UL
#define MB_DROP_TRANSACTION_TIME_US (1000UL * (MB_MASTER_TIMEOUT_MS_RESPOND * 2UL))
#else
#define MB_DROP_TRANSACTION_TIME_US (1000UL * 2000UL)
#endif
#define MB_WAIT_DONE_MS (5000)
#define MB_SELECT_WAIT_MS (200)
#define MB_TCP_SEND_TIMEOUT_MS (500)
@@ -90,15 +95,15 @@ typedef struct _port_driver port_driver_t;
#define MB_EVENT_BASE(context) (__extension__( \
{ \
port_driver_t *drv_obj = MB_GET_DRV_PTR(context); \
(drv_obj->loop_name) ? (esp_event_base_t)(drv_obj->loop_name) : "UNK_BASE"; \
port_driver_t *drv_obj = MB_GET_DRV_PTR(context); \
(drv_obj->loop_name) ? (esp_event_base_t)(drv_obj->loop_name) : "UNK_BASE"; \
} \
))
#define MB_ADD_FD(fd, max_fd, fdset) do { \
#define MB_ADD_FD(fd, max_fd, fdset) do { \
if (fd) { \
(max_fd = (fd > max_fd) ? fd : max_fd); \
FD_SET(fd, fdset); \
FD_SET(fd, fdset); \
} \
} while(0)
@@ -106,15 +111,15 @@ typedef struct _port_driver port_driver_t;
// Macro for atomic operations
#define MB_ATOMIC_LOAD(ctx, addr) (__extension__( \
{ \
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx); \
(CRITICAL_LOAD(drv_obj->lock, addr)); \
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx); \
(CRITICAL_LOAD(drv_obj->lock, addr)); \
} \
))
#define MB_ATOMIC_STORE(ctx, addr, val) (__extension__( \
{ \
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx); \
CRITICAL_STORE(drv_obj->lock, addr, val); \
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx); \
CRITICAL_STORE(drv_obj->lock, addr, val); \
} \
))
@@ -122,11 +127,11 @@ typedef struct _port_driver port_driver_t;
// So, the eventfd value keeps last event and its fd.
#define DRIVER_SEND_EVENT(ctx, event, fd) (__extension__( \
{ \
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx); \
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx); \
mb_event_info_t (event_info##__FUNCTION__##__LINE__); \
(event_info##__FUNCTION__##__LINE__).event_id = (int32_t)event; \
(event_info##__FUNCTION__##__LINE__).opt_fd = fd; \
((write_event((void *)drv_obj, &(event_info##__FUNCTION__##__LINE__)) > 0) \
((write_event((void *)drv_obj, &(event_info##__FUNCTION__##__LINE__)) > 0) \
? ((event_info##__FUNCTION__##__LINE__)).event_id : UNDEF_FD); \
} \
))
@@ -196,14 +201,16 @@ typedef struct mb_node_info_s {
typedef enum _mb_sync_event {
MB_SYNC_EVENT_RECV_OK = 0x0001,
MB_SYNC_EVENT_RECV_FAIL = 0x0002,
MB_SYNC_EVENT_SEND_OK = 0x0003,
MB_SYNC_EVENT_READY = 0x002,
MB_SYNC_EVENT_RECV_FAIL = 0x0003,
MB_SYNC_EVENT_SEND_OK = 0x0004,
MB_SYNC_EVENT_SEND_ERR = 0x0005,
MB_SYNC_EVENT_TOUT
} mb_sync_event_t;
typedef enum _mb_status_flags {
MB_FLAG_BLANK = 0x0000,
MB_FLAG_TRANSACTION_DONE = 0x0001,
MB_FLAG_TRANSACTION_READY = 0x0001,
MB_FLAG_DISCONNECTED = 0x0002,
MB_FLAG_CONNECTED = 0x0004,
MB_FLAG_SUSPEND = 0x0008,

View File

@@ -595,7 +595,7 @@ MB_EVENT_HANDLER(mbm_on_send_data)
info_ptr->tid_counter = (uint16_t)(info_ptr->index << 8U);
}
}
drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_SEND_OK);
//pdrv_ctx->event_cbs.mb_sync_event_cb(pdrv_ctx->event_cbs.port_arg, MB_SYNC_EVENT_SEND_OK);
mb_drv_lock(ctx);
drv_obj->mb_node_curr = info_ptr;
drv_obj->curr_node_index = info_ptr->index;

View File

@@ -223,7 +223,7 @@ bool mbs_port_tcp_recv_data(mb_port_base_t *inst, uint8_t **frame, uint16_t *len
memcpy(*frame, buf, len);
*length = (uint16_t)len;
status = true;
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", get packet TID: 0x%04" PRIx16 ", %p."),
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", read packet, TID: 0x%04" PRIx16 ", %p."),
port_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, (unsigned)pnode->tid_counter, *frame);
if (ESP_OK != transaction_item_set_state(item, CONFIRMED)) {
@@ -232,9 +232,7 @@ bool mbs_port_tcp_recv_data(mb_port_base_t *inst, uint8_t **frame, uint16_t *len
}
} else {
// Delete expired frames
int frame_cnt = transaction_delete_expired(port_obj->transaction,
port_get_timestamp(),
(1000 * MB_MASTER_TIMEOUT_MS_RESPOND));
int frame_cnt = transaction_delete_expired(port_obj->transaction, port_get_timestamp(), MB_DROP_TRANSACTION_TIME_US);
if (frame_cnt) {
ESP_LOGE(TAG, "Deleted %d expired frames.", frame_cnt);
}
@@ -256,13 +254,16 @@ bool mbs_port_tcp_send_data(mb_port_base_t *inst, uint8_t *frame, uint16_t lengt
transaction_item_handle_t item;
mb_drv_lock(drv_obj);
item = transaction_dequeue(port_obj->transaction, CONFIRMED, NULL);
if (item) {
//item = transaction_dequeue(port_obj->transaction, CONFIRMED, NULL);
//item = transaction_get(port_obj->transaction, tid);
item = transaction_get_first(port_obj->transaction);
if (item && transaction_item_get_state(item) == CONFIRMED) {
uint16_t msg_id = 0;
int node_id = 0;
mb_node_info_t *pnode = NULL;
uint8_t *buf = transaction_item_get_data(item, NULL, &msg_id, &node_id);
if (buf && (tid == msg_id)) {
mb_node_info_t *pnode = mb_drv_get_node(drv_obj, node_id);
pnode = mb_drv_get_node(drv_obj, node_id);
if (pnode && buf && (tid == msg_id)) {
int write_length = mb_drv_write(drv_obj, node_id, frame, length);
if (pnode && write_length) {
frame_sent = true;
@@ -270,22 +271,30 @@ bool mbs_port_tcp_send_data(mb_port_base_t *inst, uint8_t *frame, uint16_t lengt
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.node_name_str, (unsigned)tid, (unsigned)msg_id, frame, length);
} else {
ESP_LOGE(TAG, "%p, node: #%d, socket(#%d)[%s], mbs_write fail, TID: 0x%04" PRIx16 ":0x%04" PRIx16 ", %p, len: %d, ",
ESP_LOGE(TAG, "%p, node: #%d, socket(#%d)[%s], modbus write fail, TID: 0x%04" PRIx16 ":0x%04" PRIx16 ", %p, len: %d, ",
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.node_name_str, (unsigned)tid, (unsigned)msg_id, frame, length);
}
if (ESP_OK != transaction_item_set_state(item, REPLIED)) {
ESP_LOGE(TAG, "transaction queue set state fail.");
ESP_LOGE(TAG, "transaction queue set reply state fail.");
}
} else {
ESP_LOGE(TAG, "%p, node: #%d, socket(#%d)[%s], could not write transaction, TID: 0x%04" PRIx16 ":0x%04" PRIx16 ", %p, len: %d, ",
drv_obj, pnode->index, pnode->sock_id, pnode->addr_info.node_name_str,
(unsigned)tid, (unsigned)msg_id, frame, length);
// Todo: remove mb_drv_lock(drv_obj);
if (item && transaction_delete_item(port_obj->transaction, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued TID:0x%04" PRIx16, tid);
} else {
ESP_LOGD(TAG, "Remove the message TID:0x%04" PRIx16, tid);
}
(void)mb_drv_set_status_flag(drv_obj, MB_FLAG_TRANSACTION_READY);
}
} else {
ESP_LOGE(TAG, "queue can not find the item to send.");
ESP_LOGE(TAG, "can not find the confirmed transaction TID: 0x%04" PRIx16 ", drop the frame", tid);
}
mb_drv_unlock(drv_obj);
if (!frame_sent) {
ESP_LOGE(TAG, "incorrect frame to send.");
}
return frame_sent;
}
@@ -299,6 +308,10 @@ static uint64_t mbs_port_tcp_sync_event(void *inst, mb_sync_event_t sync_event)
mb_port_event_post(inst, EVENT(EV_FRAME_RECEIVED));
break;
case MB_SYNC_EVENT_READY:
mb_port_event_post(inst, EVENT(EV_READY));
break;
case MB_SYNC_EVENT_RECV_FAIL:
mb_port_timer_disable(inst);
mb_port_event_set_err_type(inst, EV_ERROR_RECEIVE_DATA);
@@ -308,6 +321,13 @@ static uint64_t mbs_port_tcp_sync_event(void *inst, mb_sync_event_t sync_event)
case MB_SYNC_EVENT_SEND_OK:
mb_port_event_post(inst, EVENT(EV_FRAME_SENT));
break;
case MB_SYNC_EVENT_SEND_ERR:
mb_port_timer_disable(inst);
mb_port_event_set_err_type(inst, EV_ERROR_RESPOND_TIMEOUT);
mb_port_event_post(inst, EVENT(EV_ERROR_PROCESS));
break;
default:
break;
}
@@ -349,7 +369,9 @@ MB_EVENT_HANDLER(mbs_on_ready)
drv_obj->listen_sock_fd = listen_sock;
// so, all accepted sockets will inherit the keep-alive feature
(void)port_keep_alive(drv_obj->listen_sock_fd);
(void)mb_drv_set_status_flag(drv_obj, MB_FLAG_TRANSACTION_READY);
mb_drv_unlock(ctx);
drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_READY);
ESP_LOGI(TAG, "%s %s: fd: %d, bind is done", (char *)base, __func__, (int)event_info->opt_fd);
}
}
@@ -390,15 +412,15 @@ MB_EVENT_HANDLER(mbs_on_recv_data)
transaction_item_handle_t item = NULL;
if (pnode) {
if (!queue_is_empty(pnode->rx_queue)) {
ESP_LOGD(TAG, "%p, node #%d(%d) [%s], receive data ready.", ctx, (int)event_info->opt_fd,
ESP_LOGD(TAG, "%p, node #%d, socket(#%d) [%s], receive data ready.", ctx, (int)event_info->opt_fd,
(int)pnode->sock_id, pnode->addr_info.ip_addr_str);
frame_entry_t frame_entry;
size_t sz = queue_pop(pnode->rx_queue, NULL, MB_BUFFER_SIZE, &frame_entry);
if (sz > MB_TCP_FUNC) {
uint16_t tid_counter = MB_TCP_MBAP_GET_FIELD(frame_entry.buf, MB_TCP_TID);
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", received packet TID: 0x%04" PRIx16 ", %p."),
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", received packet TID: 0x%04" PRIx16 ", frame: 0x%p, %u"),
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, (unsigned)tid_counter, frame_entry.buf);
pnode->addr_info.ip_addr_str, (unsigned)tid_counter, frame_entry.buf, frame_entry.len);
mb_drv_lock(drv_obj);
transaction_message_t msg;
msg.buffer = frame_entry.buf;
@@ -406,44 +428,59 @@ MB_EVENT_HANDLER(mbs_on_recv_data)
msg.msg_id = frame_entry.tid;
msg.node_id = pnode->index;
msg.pnode = pnode;
// Enqueue the transaction, keep time of receiving.
item = transaction_enqueue(port_obj->transaction, &msg, port_get_timestamp());
pnode->tid_counter = tid_counter; // assign the TID from frame to use it on send
mb_drv_unlock(drv_obj);
}
}
mb_drv_lock(drv_obj);
item = transaction_get_first(port_obj->transaction);
if (item) {
//mb_status_flags_t flags = mb_drv_wait_status_flag(port_obj->pdriver, MB_FLAG_TRANSACTION_READY, TRANSACTION_TICKS);
if (transaction_item_get_state(item) == QUEUED) {
if (mb_port_event_res_take(&port_obj->base, TRANSACTION_TICKS)) {
(void)mb_drv_clear_status_flag(drv_obj, MB_FLAG_TRANSACTION_READY);
} else {
if (port_get_timestamp() - transaction_item_get_tick(item) > MB_DROP_TRANSACTION_TIME_US) {
ESP_LOGD(TAG, "Transaction TID:0x%04" PRIx16 " is expired.", transaction_item_get_id(item));
} else {
// postpone the packet processing to next cycle
DRIVER_SEND_EVENT(ctx, MB_EVENT_RECV_DATA, pnode->index);
}
mb_drv_lock(drv_obj);
transaction_delete_expired(port_obj->transaction, port_get_timestamp(), MB_DROP_TRANSACTION_TIME_US);
mb_drv_unlock(drv_obj);
mb_drv_check_suspend_shutdown(ctx);
return;
}
// send receive event to modbus object to get the new data
uint16_t msg_id = 0;
uint64_t tick = 0;
(void)transaction_item_get_data(item, NULL, &msg_id, NULL);
tick = port_get_timestamp();
drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_RECV_OK);
transaction_set_tick(port_obj->transaction, msg_id, (transaction_tick_t)tick);
mb_drv_lock(drv_obj);
uint16_t msg_id = 0;
int node_id = 0;
(void)transaction_item_get_data(item, NULL, &msg_id, &node_id);
pnode = mb_drv_get_node(drv_obj, node_id);
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", acknoledged packet TID: 0x%04" PRIx16 ", start transaction."),
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, (unsigned)msg_id);
// transaction_set_tick(port_obj->transaction, msg_id, (transaction_tick_t)port_get_timestamp());
if (ESP_OK == transaction_item_set_state(item, ACKNOWLEDGED)) {
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", acknoledged packet TID: 0x%04" PRIx16 "."),
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, (unsigned)msg_id);
}
mb_drv_unlock(drv_obj);
} else {
if (transaction_item_get_state(item) != TRANSMITTED) {
// Todo: for test removing expired item
transaction_delete_expired(port_obj->transaction, port_get_timestamp(), 1000 * 1000);
// Transaction procesing is ongoing, just delete expired transactions
mb_drv_lock(drv_obj);
transaction_delete_expired(port_obj->transaction, port_get_timestamp(), MB_DROP_TRANSACTION_TIME_US);
mb_drv_unlock(drv_obj);
}
if (MB_FLAG_TRANSACTION_DONE == mb_drv_wait_status_flag(port_obj->drv_obj,
MB_FLAG_TRANSACTION_DONE,
TRANSACTION_TICKS)) {
(void)mb_drv_clear_status_flag(drv_obj, MB_FLAG_TRANSACTION_DONE);
}
// postpone the packet processing
DRIVER_SEND_EVENT(ctx, MB_EVENT_RECV_DATA, pnode->index);
}
} else {
ESP_LOGE(TAG, "%p, no queued items found", ctx);
ESP_LOGD(TAG, "%p, no queued items found", ctx);
}
mb_drv_unlock(drv_obj);
}
mb_drv_check_suspend_shutdown(ctx);
}
@@ -453,59 +490,101 @@ MB_EVENT_HANDLER(mbs_on_send_data)
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx);
mb_event_info_t *event_info = (mb_event_info_t *)data;
mbs_tcp_port_t *port_obj = (mbs_tcp_port_t *)drv_obj->parent;
transaction_item_handle_t item = NULL;
frame_entry_t frame_entry = {0};
ESP_LOGD(TAG, "%s %s: fd: %d", (char *)base, __func__, (int)event_info->opt_fd);
mb_node_info_t *pnode = mb_drv_get_node(drv_obj, event_info->opt_fd);
if (pnode && !queue_is_empty(pnode->tx_queue)) {
frame_entry_t frame_entry;
// pop the frame entry, keep the buffer
// Pop the frame entry, keep the buffer
size_t sz = queue_pop(pnode->tx_queue, NULL, MB_BUFFER_SIZE, &frame_entry);
if (!sz || (MB_GET_NODE_STATE(pnode) < MB_SOCK_STATE_CONNECTED)) {
ESP_LOGE(TAG, "%p, "MB_NODE_FMT(", is invalid, drop data."),
ctx, (int)pnode->index, (int)pnode->sock_id, pnode->addr_info.ip_addr_str);
return;
}
uint16_t tid = MB_TCP_MBAP_GET_FIELD(frame_entry.buf, MB_TCP_TID);
pnode->error = 0;
int ret = port_write_poll(pnode, frame_entry.buf, sz, MB_TCP_SEND_TIMEOUT_MS);
if (ret < 0) {
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", send data failure, err(errno) = %d(%u)."),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, (int)ret, (unsigned)errno);
DRIVER_SEND_EVENT(ctx, MB_EVENT_ERROR, pnode->index);
pnode->error = ret;
} else {
pnode->error = 0;
if (tid != pnode->tid_counter) {
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", send incorrect frame TID:0x%04" PRIx16 "!= 0x%04" PRIx16 ", %d (bytes), errno %d"),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, pnode->tid_counter, tid, (int)ret, (unsigned)errno);
if (sz) {
uint16_t tid = MB_TCP_MBAP_GET_FIELD(frame_entry.buf, MB_TCP_TID);
// Try to find actual transaction for current TID,
// if not found just ignore the frame as expired
//item = transaction_get(port_obj->transaction, tid);
item = transaction_get_first(port_obj->transaction);
if (item && pnode) {
uint16_t msg_id = 0;
int node_id = 0;
(void)transaction_item_get_data(item, NULL, &msg_id, &node_id);
// Check if the TID is equal to the current received TID for this node.
// If not, means the slave was not able to process the previous transaction on time.
if ((node_id != pnode->index) || (tid != msg_id) || (tid != pnode->tid_counter) || (MB_GET_NODE_STATE(pnode) < MB_SOCK_STATE_CONNECTED)) {
mb_drv_lock(drv_obj);
// drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_SEND_ERR);
if (transaction_delete(port_obj->transaction, tid) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued TID:0x%04" PRIx16, (int)tid);
} else {
ESP_LOGD(TAG, "Remove the message TID:0x%04" PRIx16, (int)tid);
}
(void)mb_drv_set_status_flag(drv_obj, MB_FLAG_TRANSACTION_READY);
mb_drv_unlock(drv_obj);
uint64_t tick = (transaction_tick_t)transaction_item_get_tick(item);
uint64_t time_div_us = (esp_timer_get_time() - tick);
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", frame TID:0x%04" PRIx16 "!=0x%04" PRIx16 ", slave is busy."),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, pnode->tid_counter, tid);
ESP_LOGW(TAG, "%p, " MB_NODE_FMT(", handling time [ms]: %" PRIu64 ", exceeds slave response time in master."),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, (time_div_us / 1000));
} else {
mb_drv_lock(drv_obj);
int ret = port_write_poll(pnode, frame_entry.buf, sz, MB_TCP_SEND_TIMEOUT_MS);
if (ret < 0) {
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", send data failure, err(errno) = %d(%u)."),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, (int)ret, (unsigned)errno);
DRIVER_SEND_EVENT(ctx, MB_EVENT_ERROR, pnode->index);
pnode->error = ret;
} else {
pnode->error = 0;
ESP_LOG_BUFFER_HEX_LEVEL("SENT", frame_entry.buf, ret, ESP_LOG_DEBUG);
}
(void)mb_drv_set_status_flag(drv_obj, MB_FLAG_TRANSACTION_READY);
//drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_SEND_OK);
esp_err_t err = transaction_set_state(port_obj->transaction, tid, TRANSMITTED);
if (err == ESP_OK) {
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", sent packet TID: 0x%04" PRIx16 ", %p."),
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, tid, frame_entry.buf);
} else {
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", transaction set state fail for TID: 0x%04" PRIx16 ", %p."),
drv_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, tid, frame_entry.buf);
}
if (transaction_delete_item(port_obj->transaction, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued TID:0x%04" PRIx16, tid);
} else {
ESP_LOGD(TAG, "Remove the message TID:0x%04" PRIx16, tid);
}
pnode->send_time = esp_timer_get_time();
pnode->send_counter = (pnode->send_counter < (USHRT_MAX - 1)) ? (pnode->send_counter + 1) : 0;
mb_drv_unlock(drv_obj);
}
} else {
ESP_LOGD(TAG, "%p, " MB_NODE_FMT(", send data successful: TID:0x%04" PRIx16 ":0x%04" PRIx16 ", %d (bytes), errno %d"),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, pnode->tid_counter, tid, (int)ret, (unsigned)errno);
// Note: the transaction processing time is increased proportional to number of connected Masters.
// If it is still needed to connect several number of Masters simultaneously,
// then the slave response time option needs to be increased in Masters.
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", transaction not found for TID: 0x%04" PRIx16 ", drop data %p."),
ctx, (int)pnode->index, (int)pnode->sock_id,
pnode->addr_info.ip_addr_str, tid, pnode);
//drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_SEND_ERR);
(void)mb_drv_set_status_flag(drv_obj, MB_FLAG_TRANSACTION_READY);
}
ESP_LOG_BUFFER_HEX_LEVEL("SENT", frame_entry.buf, ret, ESP_LOG_DEBUG);
}
(void)mb_drv_set_status_flag(drv_obj, MB_FLAG_TRANSACTION_DONE);
drv_obj->event_cbs.mb_sync_event_cb(drv_obj->event_cbs.port_arg, MB_SYNC_EVENT_SEND_OK);
mb_drv_lock(drv_obj);
transaction_set_state(port_obj->transaction, tid, TRANSMITTED);
if (transaction_delete(port_obj->transaction, tid) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued TID:0x%04" PRIx16, tid);
} else {
ESP_LOGD(TAG, "Remove the message TID:0x%04" PRIx16, tid);
ESP_LOGE(TAG, "%p, "MB_NODE_FMT(", frame is invalid, drop data."),
ctx, (int)pnode->index, (int)pnode->sock_id, pnode->addr_info.ip_addr_str);
}
free(frame_entry.buf);
pnode->send_time = esp_timer_get_time();
pnode->send_counter = (pnode->send_counter < (USHRT_MAX - 1)) ? (pnode->send_counter + 1) : 0;
mb_drv_unlock(drv_obj);
}
mb_drv_check_suspend_shutdown(ctx);
}
MB_EVENT_HANDLER(mbs_on_error)
{
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx);
mb_event_info_t *event_info = (mb_event_info_t *)data;
mbs_tcp_port_t *port_obj = __containerof(drv_obj->parent, mbs_tcp_port_t, base);
ESP_LOGD(TAG, "%s %s: fd: %d", (char *)base, __func__, (int)event_info->opt_fd);
mb_node_info_t *pnode = mb_drv_get_node(drv_obj, event_info->opt_fd);
if (!pnode) {
@@ -513,11 +592,18 @@ MB_EVENT_HANDLER(mbs_on_error)
return;
}
// Check if the node is not alive for timeout
int ret = mb_drv_check_node_state(drv_obj, (int *)&event_info->opt_fd, MB_TCP_EVENT_LOOP_TICK_MS);
int ret = mb_drv_check_node_state(drv_obj, (int *)&event_info->opt_fd, MB_EVENT_SEND_RCV_TOUT_MS);
if ((ret != ERR_OK) && (ret != ERR_TIMEOUT)) {
ESP_LOGE(TAG, "Node: #%d is not alive, err= %d", (int)event_info->opt_fd, ret);
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", communication fail, err= %d"),
port_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, (int)ret);
mb_drv_lock(drv_obj);
// delete all queued transactions for the node to be closed.
(void)transaction_delete_by_node_id(port_obj->transaction, event_info->opt_fd);
mb_drv_unlock(drv_obj);
mb_drv_close(drv_obj, event_info->opt_fd);
}
mb_drv_check_suspend_shutdown(ctx);
}
MB_EVENT_HANDLER(mbs_on_close)
@@ -525,6 +611,7 @@ MB_EVENT_HANDLER(mbs_on_close)
mb_event_info_t *event_info = (mb_event_info_t *)data;
ESP_LOGD(TAG, "%s %s, fd: %d", (char *)base, __func__, (int)event_info->opt_fd);
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx);
mbs_tcp_port_t *port_obj = __containerof(drv_obj->parent, mbs_tcp_port_t, base);
mb_node_info_t *pnode =NULL;
// if close all sockets event is received
if (event_info->opt_fd < 0)
@@ -545,6 +632,9 @@ MB_EVENT_HANDLER(mbs_on_close)
pnode = mb_drv_get_node(drv_obj, event_info->opt_fd);
if (pnode && (MB_GET_NODE_STATE(pnode) >= MB_SOCK_STATE_OPENED)) {
if ((pnode->sock_id < 0) && FD_ISSET(pnode->sock_id, &drv_obj->open_set)) {
mb_drv_lock(drv_obj);
(void)transaction_delete_by_node_id(port_obj->transaction, event_info->opt_fd);
mb_drv_unlock(drv_obj);
mb_drv_close(ctx, event_info->opt_fd);
}
}
@@ -555,14 +645,21 @@ MB_EVENT_HANDLER(mbs_on_close)
MB_EVENT_HANDLER(mbs_on_timeout)
{
// Slave timeout triggered
//mb_event_info_t *event_info = (mb_event_info_t *)data;
port_driver_t *drv_obj = MB_GET_DRV_PTR(ctx);
mbs_tcp_port_t *port_obj = __containerof(drv_obj->parent, mbs_tcp_port_t, base);
static int curr_fd = 0;
ESP_LOGD(TAG, "%s %s: fd: %d, %d", (char *)base, __func__, (int)curr_fd, drv_obj->node_conn_count);
mb_node_info_t *pnode = mb_drv_get_node(drv_obj, curr_fd);
ESP_LOGD(TAG, "%s %s: fd: %d, count: %d", (char *)base, __func__, (int)curr_fd, drv_obj->node_conn_count);
mb_drv_check_suspend_shutdown(ctx);
int ret = mb_drv_check_node_state(drv_obj, &curr_fd, MB_RECONNECT_TIME_MS);
int ret = mb_drv_check_node_state(drv_obj, &curr_fd, MB_TCP_KEEP_ALIVE_TOUT_MS);
if ((ret != ERR_OK) && (ret != ERR_TIMEOUT)) {
ESP_LOGE(TAG, "Node: %d, connection lost, err= %d", curr_fd, ret);
ESP_LOGE(TAG, "%p, " MB_NODE_FMT(", connection lost, err=%d, drop connection."),
port_obj, pnode->index, pnode->sock_id,
pnode->addr_info.ip_addr_str, (int)ret);
mb_drv_lock(drv_obj);
(void)transaction_delete_by_node_id(port_obj->transaction, curr_fd);
mb_drv_unlock(drv_obj);
mb_drv_close(drv_obj, curr_fd);
}
if ((curr_fd + 1) >= (drv_obj->node_conn_count)) {

View File

@@ -198,11 +198,9 @@ int port_read_packet(mb_node_info_t *info_ptr)
if (ret < 0) {
info_ptr->recv_err = ret;
return ret;
}
if (ret != MB_TCP_UID) {
ESP_LOGD(TAG, "Socket (#%d)(%s), fail to read modbus header. ret=%d",
info_ptr->sock_id, info_ptr->addr_info.ip_addr_str, ret);
} else if (ret != MB_TCP_UID) {
ESP_LOGD(TAG, "node #%d, Socket (#%d)(%s), fail to read modbus header, err=%d",
info_ptr->fd, info_ptr->sock_id, info_ptr->addr_info.ip_addr_str, ret);
info_ptr->recv_err = ERR_VAL;
return ERR_VAL;
}
@@ -294,7 +292,7 @@ int port_keep_alive(int sock)
return -1;
}
// Set count of probes before timing out
optval = CONFIG_FMB_TCP_CONNECTION_TOUT_SEC;
optval = CONFIG_FMB_TCP_KEEP_ALIVE_TOUT_SEC;
ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval));
if (ret != 0) {
ESP_LOGD(TAG, "Sock %d, set keep alive probes count fail., err = (%d).", sock, ret);
@@ -917,7 +915,7 @@ int port_accept_connection(int listen_sock_id, mb_uid_info_t *info_ptr)
// Make sure ss_family is valid
abort();
}
ESP_LOGI(TAG, "Socket (#%d), accept client connection from address[port]: %s[%d]", (int)sock_id, addr_str, info_ptr->port);
ESP_LOGI(TAG, "Socket (#%d), accept client connection from address[port]: %s[%u]", (int)sock_id, addr_str, info_ptr->port);
paddr = strdup(addr_str);
if (paddr) {
info_ptr->fd = sock_id;

View File

@@ -616,7 +616,6 @@ bool mb_port_adapter_send_data(mb_port_base_t *inst, uint8_t addrets, uint8_t *f
(void)mb_port_event_post(inst, EVENT(EV_FRAME_SENT));
ESP_LOGD(TAG, "%s, tx completed, flags = 0x%04x.", inst->descr.parent_name, (int)flags);
ret = true;
}
else
{