forked from espressif/esp-mqtt
add msg_len
This commit is contained in:
29
mqtt.c
29
mqtt.c
@@ -2,7 +2,7 @@
|
|||||||
* @Author: Tuan PM
|
* @Author: Tuan PM
|
||||||
* @Date: 2016-09-10 09:33:06
|
* @Date: 2016-09-10 09:33:06
|
||||||
* @Last Modified by: Tuan PM
|
* @Last Modified by: Tuan PM
|
||||||
* @Last Modified time: 2017-02-15 13:02:53
|
* @Last Modified time: 2017-02-15 13:11:53
|
||||||
*/
|
*/
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include "freertos/FreeRTOS.h"
|
#include "freertos/FreeRTOS.h"
|
||||||
@@ -33,6 +33,7 @@ static int resolve_dns(const char *host, struct sockaddr_in *ip) {
|
|||||||
}
|
}
|
||||||
static void mqtt_queue(mqtt_client *client)
|
static void mqtt_queue(mqtt_client *client)
|
||||||
{
|
{
|
||||||
|
int msg_len;
|
||||||
while (rb_available(&client->send_rb) < client->mqtt_state.outbound_message->length) {
|
while (rb_available(&client->send_rb) < client->mqtt_state.outbound_message->length) {
|
||||||
xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS);
|
xQueueReceive(client->xSendingQueue, &msg_len, 1000 / portTICK_RATE_MS);
|
||||||
rb_read(&client->send_rb, client->mqtt_state.out_buffer, msg_len);
|
rb_read(&client->send_rb, client->mqtt_state.out_buffer, msg_len);
|
||||||
@@ -47,8 +48,8 @@ static bool client_connect(mqtt_client *client)
|
|||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
struct sockaddr_in remote_ip;
|
struct sockaddr_in remote_ip;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
||||||
bzero(&remote_ip, sizeof(struct sockaddr_in));
|
bzero(&remote_ip, sizeof(struct sockaddr_in));
|
||||||
remote_ip.sin_family = AF_INET;
|
remote_ip.sin_family = AF_INET;
|
||||||
@@ -77,7 +78,7 @@ static bool client_connect(mqtt_client *client)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
client->socket = socket(PF_INET, SOCK_STREAM, 0);
|
client->socket = socket(PF_INET, SOCK_STREAM, 0);
|
||||||
if (client->socket == -1) {
|
if (client->socket == -1) {
|
||||||
mqtt_error("Failed to create socket");
|
mqtt_error("Failed to create socket");
|
||||||
goto failed2;
|
goto failed2;
|
||||||
@@ -90,7 +91,7 @@ static bool client_connect(mqtt_client *client)
|
|||||||
client->settings->port,
|
client->settings->port,
|
||||||
remote_ip.sin_port);
|
remote_ip.sin_port);
|
||||||
|
|
||||||
|
|
||||||
if (connect(client->socket, (struct sockaddr *)(&remote_ip), sizeof(struct sockaddr)) != 00) {
|
if (connect(client->socket, (struct sockaddr *)(&remote_ip), sizeof(struct sockaddr)) != 00) {
|
||||||
mqtt_error("Connect failed");
|
mqtt_error("Connect failed");
|
||||||
goto failed3;
|
goto failed3;
|
||||||
@@ -119,11 +120,11 @@ static bool client_connect(mqtt_client *client)
|
|||||||
mqtt_info("Connected!");
|
mqtt_info("Connected!");
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
//failed5:
|
//failed5:
|
||||||
// SSL_shutdown(client->ssl);
|
// SSL_shutdown(client->ssl);
|
||||||
|
|
||||||
#if defined(CONFIG_MQTT_SECURITY_ON)
|
#if defined(CONFIG_MQTT_SECURITY_ON)
|
||||||
failed4:
|
failed4:
|
||||||
SSL_free(client->ssl);
|
SSL_free(client->ssl);
|
||||||
client->ssl = NULL;
|
client->ssl = NULL;
|
||||||
@@ -134,12 +135,12 @@ static bool client_connect(mqtt_client *client)
|
|||||||
client->socket = -1;
|
client->socket = -1;
|
||||||
|
|
||||||
failed2:
|
failed2:
|
||||||
#if defined(CONFIG_MQTT_SECURITY_ON)
|
#if defined(CONFIG_MQTT_SECURITY_ON)
|
||||||
SSL_CTX_free(client->ctx);
|
SSL_CTX_free(client->ctx);
|
||||||
|
|
||||||
failed1:
|
failed1:
|
||||||
client->ctx = NULL;
|
client->ctx = NULL;
|
||||||
#endif
|
#endif
|
||||||
vTaskDelay(1000 / portTICK_RATE_MS);
|
vTaskDelay(1000 / portTICK_RATE_MS);
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -151,7 +152,7 @@ static bool client_connect(mqtt_client *client)
|
|||||||
void closeclient(mqtt_client *client)
|
void closeclient(mqtt_client *client)
|
||||||
{
|
{
|
||||||
|
|
||||||
#if defined(CONFIG_MQTT_SECURITY_ON)
|
#if defined(CONFIG_MQTT_SECURITY_ON)
|
||||||
if (client->ssl != NULL)
|
if (client->ssl != NULL)
|
||||||
{
|
{
|
||||||
SSL_shutdown(client->ssl);
|
SSL_shutdown(client->ssl);
|
||||||
@@ -166,13 +167,13 @@ void closeclient(mqtt_client *client)
|
|||||||
client->socket = -1;
|
client->socket = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined(CONFIG_MQTT_SECURITY_ON)
|
#if defined(CONFIG_MQTT_SECURITY_ON)
|
||||||
if (client->ctx != NULL)
|
if (client->ctx != NULL)
|
||||||
{
|
{
|
||||||
SSL_CTX_free(client->ctx);
|
SSL_CTX_free(client->ctx);
|
||||||
client->ctx = NULL;
|
client->ctx = NULL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
@@ -419,7 +420,7 @@ void mqtt_task(void *pvParameters)
|
|||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
client_connect(client);
|
client_connect(client);
|
||||||
|
|
||||||
mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port);
|
mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port);
|
||||||
if (!mqtt_connect(client)) {
|
if (!mqtt_connect(client)) {
|
||||||
closeclient(client);
|
closeclient(client);
|
||||||
@@ -434,7 +435,7 @@ void mqtt_task(void *pvParameters)
|
|||||||
|
|
||||||
mqtt_info("mqtt_start_receive_schedule");
|
mqtt_info("mqtt_start_receive_schedule");
|
||||||
mqtt_start_receive_schedule(client);
|
mqtt_start_receive_schedule(client);
|
||||||
|
|
||||||
closeclient(client);
|
closeclient(client);
|
||||||
vTaskDelete(xMqttSendingTask);
|
vTaskDelete(xMqttSendingTask);
|
||||||
vTaskDelay(1000 / portTICK_RATE_MS);
|
vTaskDelay(1000 / portTICK_RATE_MS);
|
||||||
|
Reference in New Issue
Block a user