mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-11-01 15:42:19 +01:00
Fixed formatting for all files to comply with idf style formats
This commit is contained in:
409
mqtt_client.c
409
mqtt_client.c
@@ -27,8 +27,7 @@
|
||||
|
||||
static const char *TAG = "MQTT_CLIENT";
|
||||
|
||||
typedef struct mqtt_state
|
||||
{
|
||||
typedef struct mqtt_state {
|
||||
mqtt_connect_info_t *connect_info;
|
||||
uint8_t *in_buffer;
|
||||
uint8_t *out_buffer;
|
||||
@@ -268,9 +267,9 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
client->mqtt_state.pending_msg_id);
|
||||
|
||||
write_len = esp_transport_write(client->transport,
|
||||
(char *)client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length,
|
||||
client->config->network_timeout_ms);
|
||||
(char *)client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length,
|
||||
client->config->network_timeout_ms);
|
||||
if (write_len < 0) {
|
||||
ESP_LOGE(TAG, "Writing failed, errno= %d", errno);
|
||||
return ESP_FAIL;
|
||||
@@ -283,7 +282,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
|
||||
if (read_len <= 0) {
|
||||
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
|
||||
return ESP_FAIL;
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
|
||||
@@ -292,24 +291,24 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
}
|
||||
connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
|
||||
switch (connect_rsp_code) {
|
||||
case CONNECTION_ACCEPTED:
|
||||
ESP_LOGD(TAG, "Connected");
|
||||
return ESP_OK;
|
||||
case CONNECTION_REFUSE_PROTOCOL:
|
||||
ESP_LOGW(TAG, "Connection refused, bad protocol");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_REFUSE_SERVER_UNAVAILABLE:
|
||||
ESP_LOGW(TAG, "Connection refused, server unavailable");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_REFUSE_BAD_USERNAME:
|
||||
ESP_LOGW(TAG, "Connection refused, bad username or password");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_REFUSE_NOT_AUTHORIZED:
|
||||
ESP_LOGW(TAG, "Connection refused, not authorized");
|
||||
return ESP_FAIL;
|
||||
default:
|
||||
ESP_LOGW(TAG, "Connection refused, Unknow reason");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_ACCEPTED:
|
||||
ESP_LOGD(TAG, "Connected");
|
||||
return ESP_OK;
|
||||
case CONNECTION_REFUSE_PROTOCOL:
|
||||
ESP_LOGW(TAG, "Connection refused, bad protocol");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_REFUSE_SERVER_UNAVAILABLE:
|
||||
ESP_LOGW(TAG, "Connection refused, server unavailable");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_REFUSE_BAD_USERNAME:
|
||||
ESP_LOGW(TAG, "Connection refused, bad username or password");
|
||||
return ESP_FAIL;
|
||||
case CONNECTION_REFUSE_NOT_AUTHORIZED:
|
||||
ESP_LOGW(TAG, "Connection refused, not authorized");
|
||||
return ESP_FAIL;
|
||||
default:
|
||||
ESP_LOGW(TAG, "Connection refused, Unknow reason");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
return ESP_OK;
|
||||
}
|
||||
@@ -491,7 +490,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
||||
}
|
||||
|
||||
if (puri.field_data[UF_PORT].len) {
|
||||
client->config->port = strtol((const char*)(uri + puri.field_data[UF_PORT].off), NULL, 10);
|
||||
client->config->port = strtol((const char *)(uri + puri.field_data[UF_PORT].off), NULL, 10);
|
||||
}
|
||||
|
||||
char *user_info = create_string(uri + puri.field_data[UF_USERINFO].off, puri.field_data[UF_USERINFO].len);
|
||||
@@ -513,9 +512,9 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
||||
static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
int write_len = esp_transport_write(client->transport,
|
||||
(char *)client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length,
|
||||
client->config->network_timeout_ms);
|
||||
(char *)client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length,
|
||||
client->config->network_timeout_ms);
|
||||
// client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
if (write_len <= 0) {
|
||||
ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno);
|
||||
@@ -564,7 +563,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
|
||||
// get payload
|
||||
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
|
||||
if(msg_data_len > 0 && msg_data == NULL) {
|
||||
if (msg_data_len > 0 && msg_data == NULL) {
|
||||
ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@@ -575,7 +574,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
|
||||
post_data_event:
|
||||
ESP_LOGD(TAG, "Get data len= %d, topic len=%d, total_data: %d offset: %d", msg_data_len, msg_topic_len,
|
||||
client->event.total_data_len, msg_data_offset);
|
||||
client->event.total_data_len, msg_data_offset);
|
||||
client->event.event_id = MQTT_EVENT_DATA;
|
||||
client->event.data = msg_data_len > 0 ? msg_data : NULL;
|
||||
client->event.data_len = msg_data_len;
|
||||
@@ -589,7 +588,7 @@ post_data_event:
|
||||
size_t buf_len = client->mqtt_state.in_buffer_length;
|
||||
esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport);
|
||||
|
||||
msg_data = (char*)client->mqtt_state.in_buffer;
|
||||
msg_data = (char *)client->mqtt_state.in_buffer;
|
||||
msg_topic = NULL;
|
||||
msg_topic_len = 0;
|
||||
msg_data_offset += msg_data_len;
|
||||
@@ -706,7 +705,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
/* any further reading only the underlying payload */
|
||||
t = esp_transport_get_payload_transport_handle(t);
|
||||
if ((client->mqtt_state.in_buffer_read_len == 1) ||
|
||||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
|
||||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
|
||||
do {
|
||||
/*
|
||||
* Read the "remaining length" part of mqtt packet fixed header. It
|
||||
@@ -758,8 +757,8 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
}
|
||||
}
|
||||
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
|
||||
topic_len |= client->mqtt_state.in_buffer[fixed_header_len+1];
|
||||
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer)>0?2:0);
|
||||
topic_len |= client->mqtt_state.in_buffer[fixed_header_len + 1];
|
||||
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer) > 0 ? 2 : 0);
|
||||
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
|
||||
if (client->mqtt_state.in_buffer_length < total_len) {
|
||||
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
|
||||
@@ -822,75 +821,73 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
|
||||
switch (msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "Subscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "UnSubscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
if (deliver_publish(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
|
||||
switch (msg_type) {
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "Subscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "UnSubscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
if (deliver_publish(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (msg_qos == 1) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
} else if (msg_qos == 2) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (msg_qos == 1) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
else if (msg_qos == 2) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
client->mqtt_state.in_buffer_read_len = 0;
|
||||
@@ -901,9 +898,9 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
|
||||
{
|
||||
// decode queued data
|
||||
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
|
||||
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
|
||||
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
|
||||
// set duplicate flag for QoS-2 message
|
||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) {
|
||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 2) {
|
||||
mqtt_set_dup(client->mqtt_state.outbound_message->data);
|
||||
}
|
||||
|
||||
@@ -940,111 +937,111 @@ static void esp_mqtt_task(void *pv)
|
||||
while (client->run) {
|
||||
MQTT_API_LOCK(client);
|
||||
switch ((int)client->state) {
|
||||
case MQTT_STATE_INIT:
|
||||
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
|
||||
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
case MQTT_STATE_INIT:
|
||||
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
|
||||
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
|
||||
if (client->transport == NULL) {
|
||||
ESP_LOGE(TAG, "There are no transport");
|
||||
client->run = false;
|
||||
}
|
||||
if (client->transport == NULL) {
|
||||
ESP_LOGE(TAG, "There are no transport");
|
||||
client->run = false;
|
||||
}
|
||||
|
||||
if (esp_transport_connect(client->transport,
|
||||
if (esp_transport_connect(client->transport,
|
||||
client->config->host,
|
||||
client->config->port,
|
||||
client->config->network_timeout_ms) < 0) {
|
||||
ESP_LOGE(TAG, "Error transport connect");
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
}
|
||||
ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port);
|
||||
if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) {
|
||||
ESP_LOGI(TAG, "Error MQTT Connected");
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
}
|
||||
client->event.event_id = MQTT_EVENT_CONNECTED;
|
||||
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
|
||||
client->state = MQTT_STATE_CONNECTED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
client->refresh_connection_tick = platform_tick_get_ms();
|
||||
|
||||
ESP_LOGE(TAG, "Error transport connect");
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
case MQTT_STATE_CONNECTED:
|
||||
// receive and process data
|
||||
if (mqtt_process_receive(client) == ESP_FAIL) {
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
}
|
||||
}
|
||||
ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port);
|
||||
if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) {
|
||||
ESP_LOGI(TAG, "Error MQTT Connected");
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
}
|
||||
client->event.event_id = MQTT_EVENT_CONNECTED;
|
||||
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
|
||||
client->state = MQTT_STATE_CONNECTED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
client->refresh_connection_tick = platform_tick_get_ms();
|
||||
|
||||
// resend all non-transmitted messages first
|
||||
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
||||
if (item) {
|
||||
if (mqtt_resend_queued(client, item) == ESP_OK) {
|
||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
||||
}
|
||||
break;
|
||||
case MQTT_STATE_CONNECTED:
|
||||
// receive and process data
|
||||
if (mqtt_process_receive(client) == ESP_FAIL) {
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
}
|
||||
|
||||
// resend all non-transmitted messages first
|
||||
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
||||
if (item) {
|
||||
if (mqtt_resend_queued(client, item) == ESP_OK) {
|
||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
||||
}
|
||||
// resend other "transmitted" messages after 1s
|
||||
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
|
||||
last_retransmit = platform_tick_get_ms();
|
||||
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
|
||||
if (item && (last_retransmit - msg_tick > 1000)) {
|
||||
mqtt_resend_queued(client, item);
|
||||
}
|
||||
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
|
||||
last_retransmit = platform_tick_get_ms();
|
||||
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
|
||||
if (item && (last_retransmit - msg_tick > 1000)) {
|
||||
mqtt_resend_queued(client, item);
|
||||
}
|
||||
}
|
||||
|
||||
if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) {
|
||||
//No ping resp from last ping => Disconnected
|
||||
if(client->wait_for_ping_resp){
|
||||
ESP_LOGE(TAG, "No PING_RESP, disconnected");
|
||||
esp_mqtt_abort_connection(client);
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
|
||||
ESP_LOGE(TAG, "Can't send ping, disconnected");
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
} else {
|
||||
client->wait_for_ping_resp = true;
|
||||
}
|
||||
ESP_LOGD(TAG, "PING sent");
|
||||
}
|
||||
|
||||
if (client->config->refresh_connection_after_ms &&
|
||||
platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) {
|
||||
ESP_LOGD(TAG, "Refreshing the connection...");
|
||||
if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) {
|
||||
//No ping resp from last ping => Disconnected
|
||||
if (client->wait_for_ping_resp) {
|
||||
ESP_LOGE(TAG, "No PING_RESP, disconnected");
|
||||
esp_mqtt_abort_connection(client);
|
||||
client->state = MQTT_STATE_INIT;
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
|
||||
ESP_LOGE(TAG, "Can't send ping, disconnected");
|
||||
esp_mqtt_abort_connection(client);
|
||||
break;
|
||||
} else {
|
||||
client->wait_for_ping_resp = true;
|
||||
}
|
||||
ESP_LOGD(TAG, "PING sent");
|
||||
}
|
||||
|
||||
//Delete mesaage after 30 senconds
|
||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||
client->mqtt_state.pending_msg_count -= deleted;
|
||||
if (client->mqtt_state.pending_msg_count < 0) {
|
||||
client->mqtt_state.pending_msg_count = 0;
|
||||
}
|
||||
//
|
||||
outbox_cleanup(client->outbox, OUTBOX_MAX_SIZE);
|
||||
if (client->config->refresh_connection_after_ms &&
|
||||
platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) {
|
||||
ESP_LOGD(TAG, "Refreshing the connection...");
|
||||
esp_mqtt_abort_connection(client);
|
||||
client->state = MQTT_STATE_INIT;
|
||||
}
|
||||
|
||||
//Delete mesaage after 30 senconds
|
||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||
client->mqtt_state.pending_msg_count -= deleted;
|
||||
if (client->mqtt_state.pending_msg_count < 0) {
|
||||
client->mqtt_state.pending_msg_count = 0;
|
||||
}
|
||||
//
|
||||
outbox_cleanup(client->outbox, OUTBOX_MAX_SIZE);
|
||||
break;
|
||||
case MQTT_STATE_WAIT_TIMEOUT:
|
||||
|
||||
if (!client->config->auto_reconnect) {
|
||||
client->run = false;
|
||||
client->state = MQTT_STATE_UNKNOWN;
|
||||
break;
|
||||
case MQTT_STATE_WAIT_TIMEOUT:
|
||||
|
||||
if (!client->config->auto_reconnect) {
|
||||
client->run = false;
|
||||
client->state = MQTT_STATE_UNKNOWN;
|
||||
break;
|
||||
}
|
||||
if (platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) {
|
||||
client->state = MQTT_STATE_INIT;
|
||||
client->reconnect_tick = platform_tick_get_ms();
|
||||
ESP_LOGD(TAG, "Reconnecting...");
|
||||
break;
|
||||
}
|
||||
MQTT_API_UNLOCK(client);
|
||||
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
|
||||
client->wait_timeout_ms / 2 / portTICK_RATE_MS);
|
||||
// continue the while loop insted of break, as the mutex is unlocked
|
||||
continue;
|
||||
}
|
||||
if (platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) {
|
||||
client->state = MQTT_STATE_INIT;
|
||||
client->reconnect_tick = platform_tick_get_ms();
|
||||
ESP_LOGD(TAG, "Reconnecting...");
|
||||
break;
|
||||
}
|
||||
MQTT_API_UNLOCK(client);
|
||||
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
|
||||
client->wait_timeout_ms / 2 / portTICK_RATE_MS);
|
||||
// continue the while loop insted of break, as the mutex is unlocked
|
||||
continue;
|
||||
}
|
||||
MQTT_API_UNLOCK(client);
|
||||
if (MQTT_STATE_CONNECTED == client->state) {
|
||||
@@ -1072,17 +1069,17 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
#if MQTT_CORE_SELECTION_ENABLED
|
||||
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
|
||||
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
|
||||
ESP_LOGE(TAG, "Error create mqtt task");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
|
||||
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
|
||||
ESP_LOGE(TAG, "Error create mqtt task");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
#else
|
||||
ESP_LOGD(TAG, "Core selection disabled");
|
||||
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
|
||||
ESP_LOGE(TAG, "Error create mqtt task");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
ESP_LOGD(TAG, "Core selection disabled");
|
||||
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
|
||||
ESP_LOGE(TAG, "Error create mqtt task");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
#endif
|
||||
return ESP_OK;
|
||||
}
|
||||
@@ -1109,7 +1106,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
||||
ESP_LOGE(TAG, "Error sending disconnect message");
|
||||
}
|
||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||
|
||||
|
||||
client->run = false;
|
||||
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
|
||||
client->state = MQTT_STATE_UNKNOWN;
|
||||
@@ -1247,7 +1244,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
current_data += data_sent;
|
||||
|
||||
if (remaining_len > 0) {
|
||||
mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection;
|
||||
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
|
||||
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
|
||||
if (connection->message.fragmented_msg_data_offset) {
|
||||
// asked to enqueue oversized message (first time only)
|
||||
@@ -1255,7 +1252,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
if (qos > 0) {
|
||||
// internally enqueue all big messages, as they dont fit 'pending msg' structure
|
||||
mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len);
|
||||
mqtt_enqueue_oversized(client, (uint8_t *)current_data, remaining_len);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user