#ifndef ASYNC_MQTT5_AUTOCONNECT_STREAM_HPP #define ASYNC_MQTT5_AUTOCONNECT_STREAM_HPP #include #include #include #include #include #include #include #include namespace async_mqtt5::detail { namespace asio = boost::asio; using error_code = boost::system::error_code; template < typename StreamType, typename StreamContext = std::monostate > class autoconnect_stream { public: using stream_type = StreamType; using stream_context_type = StreamContext; using executor_type = typename stream_type::executor_type; private: using stream_ptr = std::shared_ptr; executor_type _stream_executor; async_mutex _conn_mtx; asio::steady_timer _read_timer, _connect_timer; endpoints _endpoints; stream_ptr _stream_ptr; stream_context_type& _stream_context; template friend class reconnect_op; template friend class read_op; template friend class write_op; template friend class disconnect_op; public: autoconnect_stream( const executor_type& ex, stream_context_type& context ) : _stream_executor(ex), _conn_mtx(_stream_executor), _read_timer(_stream_executor), _connect_timer(_stream_executor), _endpoints(_stream_executor, _connect_timer), _stream_context(context) { replace_next_layer(construct_next_layer()); } using next_layer_type = stream_type; next_layer_type& next_layer() { return *_stream_ptr; } const next_layer_type& next_layer() const { return *_stream_ptr; } executor_type get_executor() const noexcept { return _stream_executor; } void brokers(std::string hosts, uint16_t default_port) { _endpoints.brokers(std::move(hosts), default_port); } bool is_open() const noexcept { return lowest_layer(*_stream_ptr).is_open(); } void open() { error_code ec; lowest_layer(*_stream_ptr).open(asio::ip::tcp::v4(), ec); } void cancel() { error_code ec; lowest_layer(*_stream_ptr).cancel(ec); } void close() { error_code ec; shutdown(asio::ip::tcp::socket::shutdown_both); lowest_layer(*_stream_ptr).close(ec); _connect_timer.cancel(); } void shutdown(asio::ip::tcp::socket::shutdown_type what) { error_code ec; lowest_layer(*_stream_ptr).shutdown(what, ec); } bool was_connected() const { error_code ec; lowest_layer(*_stream_ptr).remote_endpoint(ec); return ec == boost::system::errc::success; } template decltype(auto) async_read_some( const BufferType& buffer, duration wait_for, CompletionToken&& token ) { auto initiation = [this]( auto handler, const BufferType& buffer, duration wait_for ) { read_op { *this, std::move(handler) } .perform(buffer, wait_for); }; return asio::async_initiate( std::move(initiation), token, buffer, wait_for ); } template decltype(auto) async_write( const BufferType& buffer, CompletionToken&& token ) { auto initiation = [this]( auto handler, const BufferType& buffer ) { write_op { *this, std::move(handler) }.perform(buffer); }; return asio::async_initiate( std::move(initiation), token, buffer ); } private: stream_ptr construct_next_layer() const { stream_ptr sptr; if constexpr (has_tls_context) sptr = std::make_shared( _stream_executor, _stream_context.tls_context() ); else sptr = std::make_shared(_stream_executor); error_code ec; lowest_layer(*sptr).set_option( asio::socket_base::reuse_address(true), ec ); return sptr; } void replace_next_layer(stream_ptr sptr) { // close() will cancel all outstanding async operations on // _stream_ptr; cancelling posts operation_aborted to handlers // but handlers will be executed after std::exchange below; // handlers should therefore treat (operation_aborted && is_open()) // equivalent to try_again. if (_stream_ptr) close(); std::exchange(_stream_ptr, std::move(sptr)); } template decltype(auto) async_reconnect(stream_ptr s, CompletionToken&& token) { auto initiation = [this](auto handler, stream_ptr s) { reconnect_op { *this, std::move(handler) }.perform(s); }; return asio::async_initiate( std::move(initiation), token, s ); } }; } // end namespace async_mqtt5::detail #endif // !ASYNC_MQTT5_AUTOCONNECT_STREAM_HPP