forked from espressif/esp-mqtt
Merge branch 'bugfix/publish_before_connect' into 'master'
Fixed crash due to publishing before successful transport connect. See merge request espressif/esp-mqtt!48
This commit is contained in:
@@ -53,6 +53,7 @@ build_with_idf:
|
|||||||
- cit_add_ssh_key "${GITLAB_KEY}"
|
- cit_add_ssh_key "${GITLAB_KEY}"
|
||||||
- git clone "${IDF_REPO}"
|
- git clone "${IDF_REPO}"
|
||||||
- cd esp-idf
|
- cd esp-idf
|
||||||
|
- tools/idf_tools.py --non-interactive install && eval "$(tools/idf_tools.py --non-interactive export)" || exit 1
|
||||||
- ./tools/ci/mirror-submodule-update.sh
|
- ./tools/ci/mirror-submodule-update.sh
|
||||||
- export IDF_PATH=$(pwd)
|
- export IDF_PATH=$(pwd)
|
||||||
- cd $IDF_PATH/components/mqtt/esp-mqtt
|
- cd $IDF_PATH/components/mqtt/esp-mqtt
|
||||||
|
@@ -316,11 +316,13 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
|||||||
{
|
{
|
||||||
int write_len, read_len, connect_rsp_code;
|
int write_len, read_len, connect_rsp_code;
|
||||||
client->wait_for_ping_resp = false;
|
client->wait_for_ping_resp = false;
|
||||||
mqtt_msg_init(&client->mqtt_state.mqtt_connection,
|
|
||||||
client->mqtt_state.out_buffer,
|
|
||||||
client->mqtt_state.out_buffer_length);
|
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
|
||||||
client->mqtt_state.connect_info);
|
client->mqtt_state.connect_info);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Connect message cannot be created");
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
|
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
|
||||||
client->mqtt_state.outbound_message->length);
|
client->mqtt_state.outbound_message->length);
|
||||||
@@ -410,6 +412,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
|||||||
}
|
}
|
||||||
client->api_lock = xSemaphoreCreateMutex();
|
client->api_lock = xSemaphoreCreateMutex();
|
||||||
if (!client->api_lock) {
|
if (!client->api_lock) {
|
||||||
|
free(client->event.error_handle);
|
||||||
free(client);
|
free(client);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -532,6 +535,10 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
|||||||
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
||||||
client->status_bits = xEventGroupCreate();
|
client->status_bits = xEventGroupCreate();
|
||||||
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
||||||
|
|
||||||
|
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer,
|
||||||
|
client->mqtt_state.out_buffer_length);
|
||||||
|
|
||||||
return client;
|
return client;
|
||||||
_mqtt_init_failed:
|
_mqtt_init_failed:
|
||||||
esp_mqtt_client_destroy(client);
|
esp_mqtt_client_destroy(client);
|
||||||
@@ -976,6 +983,10 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
|||||||
} else if (msg_qos == 2) {
|
} else if (msg_qos == 2) {
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||||
}
|
}
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created");
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (msg_qos == 1 || msg_qos == 2) {
|
if (msg_qos == 1 || msg_qos == 2) {
|
||||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||||
@@ -997,12 +1008,22 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
|||||||
case MQTT_MSG_TYPE_PUBREC:
|
case MQTT_MSG_TYPE_PUBREC:
|
||||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
|
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
|
||||||
mqtt_write_data(client);
|
mqtt_write_data(client);
|
||||||
break;
|
break;
|
||||||
case MQTT_MSG_TYPE_PUBREL:
|
case MQTT_MSG_TYPE_PUBREL:
|
||||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created");
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
mqtt_write_data(client);
|
mqtt_write_data(client);
|
||||||
break;
|
break;
|
||||||
case MQTT_MSG_TYPE_PUBCOMP:
|
case MQTT_MSG_TYPE_PUBCOMP:
|
||||||
@@ -1247,6 +1268,10 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
|||||||
if(client->state == MQTT_STATE_CONNECTED) {
|
if(client->state == MQTT_STATE_CONNECTED) {
|
||||||
// Notify the broker we are disconnecting
|
// Notify the broker we are disconnecting
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Disconnect message cannot be created");
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error sending disconnect message");
|
ESP_LOGE(TAG, "Error sending disconnect message");
|
||||||
}
|
}
|
||||||
@@ -1267,6 +1292,10 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
|||||||
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
||||||
{
|
{
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Ping message cannot be created");
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error sending ping");
|
ESP_LOGE(TAG, "Error sending ping");
|
||||||
@@ -1287,6 +1316,10 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
|||||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic, qos,
|
topic, qos,
|
||||||
&client->mqtt_state.pending_msg_id);
|
&client->mqtt_state.pending_msg_id);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Subscribe message cannot be created");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
@@ -1315,6 +1348,10 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic,
|
topic,
|
||||||
&client->mqtt_state.pending_msg_id);
|
&client->mqtt_state.pending_msg_id);
|
||||||
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
ESP_LOGE(TAG, "Unubscribe message cannot be created");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
|
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
|
||||||
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
@@ -1336,6 +1373,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||||
{
|
{
|
||||||
uint16_t pending_msg_id = 0;
|
uint16_t pending_msg_id = 0;
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
/* Acceptable publish messages:
|
/* Acceptable publish messages:
|
||||||
data == NULL, len == 0: publish null message
|
data == NULL, len == 0: publish null message
|
||||||
@@ -1352,10 +1390,10 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
qos, retain,
|
qos, retain,
|
||||||
&pending_msg_id);
|
&pending_msg_id);
|
||||||
|
|
||||||
if (publish_msg == NULL) {
|
if (publish_msg->length == 0) {
|
||||||
ESP_LOGE(TAG, "Publish message cannot be created");
|
ESP_LOGE(TAG, "Publish message cannot be created");
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
/* We have to set as pending all the qos>0 messages */
|
/* We have to set as pending all the qos>0 messages */
|
||||||
if (qos > 0) {
|
if (qos > 0) {
|
||||||
@@ -1387,6 +1425,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
esp_mqtt_abort_connection(client);
|
esp_mqtt_abort_connection(client);
|
||||||
|
ret = -1;
|
||||||
goto cannot_publish;
|
goto cannot_publish;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1438,7 +1477,7 @@ cannot_publish:
|
|||||||
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return 0;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user