// File: mqtt5/include/async_mqtt5/impl/autoconnect_stream.hpp
// Repository listing metadata: 199 lines, 4.9 KiB, C++

#ifndef ASYNC_MQTT5_AUTOCONNECT_STREAM_HPP
#define ASYNC_MQTT5_AUTOCONNECT_STREAM_HPP
#include <utility>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5/detail/async_mutex.hpp>
#include <async_mqtt5/detail/async_traits.hpp>
#include <async_mqtt5/impl/endpoints.hpp>
#include <async_mqtt5/impl/read_op.hpp>
#include <async_mqtt5/impl/reconnect_op.hpp>
#include <async_mqtt5/impl/write_op.hpp>
namespace async_mqtt5::detail {
// Shorthand for the Boost.Asio namespace used by the declarations below.
namespace asio = boost::asio;
// Error type used by the socket calls and completion signatures below.
using error_code = boost::system::error_code;
template <
typename StreamType,
typename StreamContext = std::monostate
>
class autoconnect_stream {
public:
using self_type = autoconnect_stream<StreamType, StreamContext>;
using stream_type = StreamType;
using stream_context_type = StreamContext;
using executor_type = typename stream_type::executor_type;
private:
using stream_ptr = std::shared_ptr<stream_type>;
executor_type _stream_executor;
async_mutex _conn_mtx;
asio::steady_timer _read_timer, _connect_timer;
endpoints _endpoints;
stream_ptr _stream_ptr;
stream_context_type& _stream_context;
template <typename Owner, typename Handler>
friend class read_op;
template <typename Owner, typename Handler>
friend class write_op;
template <typename Stream>
friend class reconnect_op;
template <typename Owner, typename DisconnectContext>
friend class disconnect_op;
public:
autoconnect_stream(
const executor_type& ex, stream_context_type& context
) :
_stream_executor(ex),
_conn_mtx(_stream_executor),
_read_timer(_stream_executor), _connect_timer(_stream_executor),
_endpoints(_stream_executor, _connect_timer),
_stream_context(context)
{
replace_next_layer(construct_next_layer());
}
using next_layer_type = stream_type;
next_layer_type& next_layer() {
return *_stream_ptr;
}
const next_layer_type& next_layer() const {
return *_stream_ptr;
}
executor_type get_executor() const noexcept {
return _stream_executor;
}
void brokers(std::string hosts, uint16_t default_port) {
_endpoints.brokers(std::move(hosts), default_port);
}
bool is_open() const noexcept {
return lowest_layer(*_stream_ptr).is_open();
}
void open() {
error_code ec;
lowest_layer(*_stream_ptr).open(asio::ip::tcp::v4(), ec);
}
void cancel() {
error_code ec;
lowest_layer(*_stream_ptr).cancel(ec);
}
void close() {
error_code ec;
shutdown(asio::ip::tcp::socket::shutdown_both);
lowest_layer(*_stream_ptr).close(ec);
_connect_timer.cancel();
}
void shutdown(asio::ip::tcp::socket::shutdown_type what) {
error_code ec;
lowest_layer(*_stream_ptr).shutdown(what, ec);
}
bool was_connected() const {
error_code ec;
lowest_layer(*_stream_ptr).remote_endpoint(ec);
return ec == boost::system::errc::success;
}
template <typename BufferType, typename CompletionToken>
decltype(auto) async_read_some(
const BufferType& buffer, duration wait_for, CompletionToken&& token
) {
using Signature = void (error_code, size_t);
auto initiation = [](
auto handler, self_type& self,
const BufferType& buffer, duration wait_for
) {
read_op { self, std::move(handler) }.perform(buffer, wait_for);
};
return asio::async_initiate<CompletionToken, Signature>(
initiation, token, std::ref(*this), buffer, wait_for
);
}
template <typename BufferType, typename CompletionToken>
decltype(auto) async_write(
const BufferType& buffer, CompletionToken&& token
) {
using Signature = void (error_code, size_t);
auto initiation = [](
auto handler, self_type& self, const BufferType& buffer
) {
write_op { self, std::move(handler) }.perform(buffer);
};
return asio::async_initiate<CompletionToken, Signature>(
initiation, token, std::ref(*this), buffer
);
}
private:
stream_ptr construct_next_layer() const {
stream_ptr sptr;
if constexpr (has_tls_context<StreamContext>)
sptr = std::make_shared<stream_type>(
_stream_executor, _stream_context.tls_context()
);
else
sptr = std::make_shared<stream_type>(_stream_executor);
error_code ec;
lowest_layer(*sptr).set_option(
asio::socket_base::reuse_address(true), ec
);
return sptr;
}
void replace_next_layer(stream_ptr sptr) {
// close() will cancel all outstanding async operations on
// _stream_ptr; cancelling posts operation_aborted to handlers
// but handlers will be executed after std::exchange below;
// handlers should therefore treat (operation_aborted && is_open())
// equivalent to try_again.
if (_stream_ptr)
close();
std::exchange(_stream_ptr, std::move(sptr));
}
template <typename CompletionToken>
decltype(auto) async_reconnect(stream_ptr s, CompletionToken&& token) {
using Signature = void (error_code);
auto initiation = [](auto handler, self_type& self, stream_ptr s) {
reconnect_op { self, std::move(handler) }.perform(s);
};
return asio::async_initiate<CompletionToken, Signature>(
initiation, token, std::ref(*this), s
);
}
};
} // end namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_AUTOCONNECT_STREAM_HPP