Merge pull request #8 from jedi7/master

Add const in funtions arguments
This commit is contained in:
Tuan PM
2017-05-19 23:32:40 +07:00
committed by GitHub
3 changed files with 144 additions and 52 deletions

View File

@@ -9,10 +9,5 @@
COMPONENT_ADD_INCLUDEDIRS := include
#COMPONENT_PRIV_INCLUDEDIRS :=
COMPONENT_SRCDIRS := .
#EXTRA_CFLAGS := -DICACHE_RODATA_ATTR
CFLAGS += -Wno-error=implicit-function-declaration -Wno-error=format= -DHAVE_CONFIG_H
include $(IDF_PATH)/make/component_common.mk

61
include/mqtt.h Executable file → Normal file
View File

@@ -7,29 +7,46 @@
#include "ringbuf.h"
#if defined(CONFIG_MQTT_SECURITY_ON) // ENABLE MQTT OVER SSL
#include "openssl/ssl.h"
typedef struct mqtt_client mqtt_client;
typedef struct mqtt_event_data_t mqtt_event_data_t;
#define ClientRead(buf,num) SSL_read(client->ssl, buf, num)
#define ClientWrite(buf,num) SSL_write(client->ssl, buf, num)
/**
* \return True on connect success, false on error
*/
typedef bool (* mqtt_connect_callback)(mqtt_client *client);
/**
*/
typedef void (* mqtt_disconnect_callback)(mqtt_client *client);
/**
* \param[out] buffer Pointer to buffer to fill
* \param[in] len Number of bytes to read
* \param[in] timeout_ms Time to wait for completion, or 0 for no timeout
* \return Number of bytes read, less than 0 on error
*/
typedef int (* mqtt_read_callback)(mqtt_client *client, void *buffer, int len, int timeout_ms);
/**
* \param[in] buffer Pointer to buffer to write
* \param[in] len Number of bytes to write
* \param[in] timeout_ms Time to wait for completion, or 0 for no timeout
* \return Number of bytes written, less than 0 on error
*/
typedef int (* mqtt_write_callback)(mqtt_client *client, const void *buffer, int len, int timeout_ms);
typedef void (* mqtt_event_callback)(mqtt_client *client, mqtt_event_data_t *event_data);
#else
typedef struct mqtt_settings {
mqtt_connect_callback connect_cb;
mqtt_disconnect_callback disconnect_cb;
#define ClientRead(buf,num) read(client->socket, buf, num)
#define ClientWrite(buf,num) write(client->socket, buf, num)
#endif
mqtt_read_callback read_cb;
mqtt_write_callback write_cb;
mqtt_event_callback connected_cb;
mqtt_event_callback disconnected_cb; // unused
mqtt_event_callback reconnect_cb; // unused
typedef void (* mqtt_callback)(void *, void *);
typedef struct {
mqtt_callback connected_cb;
mqtt_callback disconnected_cb;
mqtt_callback reconnect_cb;
mqtt_callback subscribe_cb;
mqtt_callback publish_cb;
mqtt_callback data_cb;
mqtt_event_callback subscribe_cb;
mqtt_event_callback publish_cb;
mqtt_event_callback data_cb;
char host[CONFIG_MQTT_MAX_HOST_LEN];
uint32_t port;
@@ -73,7 +90,7 @@ typedef struct mqtt_state_t
int pending_publish_qos;
} mqtt_state_t;
typedef struct {
typedef struct mqtt_client {
int socket;
#if defined(CONFIG_MQTT_SECURITY_ON) // ENABLE MQTT OVER SSL
@@ -92,7 +109,7 @@ typedef struct {
mqtt_client *mqtt_start(mqtt_settings *mqtt_info);
void mqtt_stop();
void mqtt_task(void *pvParameters);
void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos);
void mqtt_publish(mqtt_client* client, char *topic, char *data, int len, int qos, int retain);
void mqtt_detroy();
void mqtt_subscribe(mqtt_client *client, const char *topic, uint8_t qos);
void mqtt_publish(mqtt_client* client, const char *topic, const char *data, int len, int qos, int retain);
void mqtt_destroy();
#endif

130
mqtt.c Executable file → Normal file
View File

@@ -13,6 +13,9 @@
#include "lwip/sockets.h"
#include "lwip/dns.h"
#include "lwip/netdb.h"
#if defined(CONFIG_MQTT_SECURITY_ON)
#include "openssl/ssl.h"
#endif
#include "ringbuf.h"
#include "mqtt.h"
@@ -46,7 +49,6 @@ static void mqtt_queue(mqtt_client *client)
static bool client_connect(mqtt_client *client)
{
int ret;
struct sockaddr_in remote_ip;
while (1) {
@@ -111,8 +113,7 @@ static bool client_connect(mqtt_client *client)
}
mqtt_info("Start SSL connect..");
ret = SSL_connect(client->ssl);
if (!ret) {
if (!SSL_connect(client->ssl)) {
mqtt_error("SSL Connect FAILED");
goto failed4;
}
@@ -176,6 +177,65 @@ void closeclient(mqtt_client *client)
#endif
}
int mqtt_read(mqtt_client *client, void *buffer, int len, int timeout_ms)
{
int result;
struct timeval tv;
if (timeout_ms > 0) {
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
while (tv.tv_usec > 1000 * 1000) {
tv.tv_usec -= 1000 * 1000;
tv.tv_sec++;
}
setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}
#if defined(CONFIG_MQTT_SECURITY_ON)
result = SSL_read(client->ssl, buffer, len);
#else
result = read(client->socket, buffer, len);
#endif
if (timeout_ms > 0) {
tv.tv_sec = 0;
tv.tv_usec = 0;
setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}
return result;
}
int mqtt_write(mqtt_client *client, const void *buffer, int len, int timeout_ms)
{
int result;
struct timeval tv;
if (timeout_ms > 0) {
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
while (tv.tv_usec > 1000 * 1000) {
tv.tv_usec -= 1000 * 1000;
tv.tv_sec++;
}
setsockopt(client->socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
}
#if defined(CONFIG_MQTT_SECURITY_ON)
result = SSL_write(client->ssl, buffer, len)
#else
result = write(client->socket, buffer, len);
#endif
if (timeout_ms > 0) {
tv.tv_sec = 0;
tv.tv_usec = 0;
setsockopt(client->socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
}
return result;
}
/*
* mqtt_connect
* input - client
@@ -184,12 +244,7 @@ void closeclient(mqtt_client *client)
static bool mqtt_connect(mqtt_client *client)
{
int write_len, read_len, connect_rsp_code;
struct timeval tv;
tv.tv_sec = 10; /* 30 Secs Timeout */
tv.tv_usec = 0; // Not init'ing this can cause strange errors
setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
mqtt_msg_init(&client->mqtt_state.mqtt_connection,
client->mqtt_state.out_buffer,
@@ -203,16 +258,17 @@ static bool mqtt_connect(mqtt_client *client)
client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
write_len = ClientWrite(
write_len = client->settings->write_cb(client,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
client->mqtt_state.outbound_message->length, 0);
if(write_len < 0) {
mqtt_error("Writing failed: %d", errno);
return false;
}
mqtt_info("Reading MQTT CONNECT response message");
read_len = ClientRead(client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE);
tv.tv_sec = 0; /* No timeout */
setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 10 * 1000);
if (read_len < 0) {
mqtt_error("Error network response");
@@ -243,7 +299,8 @@ static bool mqtt_connect(mqtt_client *client)
void mqtt_sending_task(void *pvParameters)
{
mqtt_client *client = (mqtt_client *)pvParameters;
uint32_t msg_len, send_len;
uint32_t msg_len;
int send_len;
mqtt_info("mqtt_sending_task");
while (1) {
@@ -258,7 +315,11 @@ void mqtt_sending_task(void *pvParameters)
rb_read(&client->send_rb, client->mqtt_state.out_buffer, send_len);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.out_buffer);
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.out_buffer, send_len);
ClientWrite(client->mqtt_state.out_buffer, send_len);
send_len = client->settings->write_cb(client, client->mqtt_state.out_buffer, send_len, 0);
if(send_len < 0) {
mqtt_info("Write error: %d", errno);
break; // TODO is this right handling?
}
//TODO: Check sending type, to callback publish message
msg_len -= send_len;
@@ -275,9 +336,9 @@ void mqtt_sending_task(void *pvParameters)
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
mqtt_info("Sending pingreq");
ClientWrite(
client->settings->write_cb(client,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
client->mqtt_state.outbound_message->length, 0);
}
}
}
@@ -315,7 +376,11 @@ void deliver_publish(mqtt_client *client, uint8_t *message, int length)
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)
break;
len_read = ClientRead(client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE);
len_read = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0);
if(len_read < 0) {
mqtt_info("Read error: %d", errno);
break;
}
client->mqtt_state.message_length_read += len_read;
} while (1);
@@ -329,11 +394,17 @@ void mqtt_start_receive_schedule(mqtt_client *client)
while (1) {
read_len = ClientRead(client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE);
read_len = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0);
mqtt_info("Read len %d", read_len);
if (read_len == 0)
break;
if (read_len < 0) {
// ECONNRESET for example
mqtt_info("Read error %d", errno);
break;
}
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
@@ -419,11 +490,11 @@ void mqtt_task(void *pvParameters)
mqtt_client *client = (mqtt_client *)pvParameters;
while (1) {
client_connect(client);
client->settings->connect_cb(client);
mqtt_info("Connected to server %s:%d", client->settings->host, client->settings->port);
if (!mqtt_connect(client)) {
closeclient(client);
client->settings->disconnect_cb(client);
continue;
//return;
}
@@ -436,7 +507,7 @@ void mqtt_task(void *pvParameters)
mqtt_info("mqtt_start_receive_schedule");
mqtt_start_receive_schedule(client);
closeclient(client);
client->settings->disconnect_cb(client);
vTaskDelete(xMqttSendingTask);
vTaskDelay(1000 / portTICK_RATE_MS);
@@ -484,6 +555,15 @@ mqtt_client *mqtt_start(mqtt_settings *settings)
client->socket = -1;
if (!client->settings->connect_cb)
client->settings->connect_cb = client_connect;
if (!client->settings->disconnect_cb)
client->settings->disconnect_cb = closeclient;
if (!client->settings->read_cb)
client->settings->read_cb = mqtt_read;
if (!client->settings->write_cb)
client->settings->write_cb = mqtt_write;
#if defined(CONFIG_MQTT_SECURITY_ON) // ENABLE MQTT OVER SSL
client->ctx = NULL;
client->ssl = NULL;
@@ -509,7 +589,7 @@ mqtt_client *mqtt_start(mqtt_settings *settings)
return client;
}
void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos)
void mqtt_subscribe(mqtt_client *client, const char *topic, uint8_t qos)
{
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
@@ -518,7 +598,7 @@ void mqtt_subscribe(mqtt_client *client, char *topic, uint8_t qos)
mqtt_queue(client);
}
void mqtt_publish(mqtt_client* client, char *topic, char *data, int len, int qos, int retain)
void mqtt_publish(mqtt_client* client, const char *topic, const char *data, int len, int qos, int retain)
{
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,