mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
Merge branch 'bugfix/fix_mqtt5_flow_control' into 'master'
mqtt5: Fix flow control will regard the DUP packet and not consider PUBCOMP packet See merge request espressif/esp-mqtt!158
This commit is contained in:
@ -36,7 +36,8 @@ typedef struct {
|
||||
mqtt5_topic_alias_handle_t peer_topic_alias;
|
||||
} mqtt5_config_storage_t;
|
||||
|
||||
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client);
|
||||
|
@ -16,17 +16,21 @@ static char *esp_mqtt5_client_get_topic_alias(mqtt5_topic_alias_handle_t topic_a
|
||||
static void esp_mqtt5_client_delete_topic_alias(mqtt5_topic_alias_handle_t topic_alias_handle);
|
||||
static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_property_new, const mqtt5_user_property_handle_t user_property_old);
|
||||
|
||||
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client)
|
||||
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
int msg_type = mqtt5_get_type(client->mqtt_state.outbound_message->data);
|
||||
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
||||
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
|
||||
if (msg_qos > 0) {
|
||||
client->send_publish_packet_count ++;
|
||||
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
|
||||
}
|
||||
}
|
||||
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
|
||||
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
|
||||
if ((msg_dup == false) && (msg_qos > 0)) {
|
||||
client->send_publish_packet_count ++;
|
||||
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
|
||||
}
|
||||
}
|
||||
|
||||
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->send_publish_packet_count > 0) {
|
||||
client->send_publish_packet_count --;
|
||||
ESP_LOGD(TAG, "Receive (%d) qos > 0 publish packet with ack", client->send_publish_packet_count);
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,7 +55,6 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
|
||||
client->event.data_len = msg_data_len;
|
||||
client->event.total_data_len = msg_data_len;
|
||||
client->event.current_data_offset = 0;
|
||||
client->send_publish_packet_count --;
|
||||
}
|
||||
}
|
||||
|
||||
@ -291,7 +294,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
|
||||
}
|
||||
|
||||
/* Flow control to check PUBLISH(No PUBACK or PUBCOMP received) packet sent count(Only record QoS1 and QoS2)*/
|
||||
if (client->send_publish_packet_count >= client->mqtt5_config->server_resp_property_info.receive_maximum) {
|
||||
if (client->send_publish_packet_count > client->mqtt5_config->server_resp_property_info.receive_maximum) {
|
||||
ESP_LOGE(TAG, "Client send more than %d QoS1 and QoS2 PUBLISH packet without no ack", client->mqtt5_config->server_resp_property_info.receive_maximum);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
@ -709,6 +709,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
|
||||
client->send_publish_packet_count = 0;
|
||||
return ESP_OK;
|
||||
}
|
||||
#endif
|
||||
@ -943,7 +944,9 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_flow_control(client);
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_increment_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
return ESP_OK;
|
||||
}
|
||||
@ -1367,6 +1370,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_decrement_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
@ -1413,6 +1421,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_decrement_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
|
Reference in New Issue
Block a user