diff --git a/CHANGELOG.md b/CHANGELOG.md index 45ac731b..43a3eaea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,19 @@ Version 84: * bind_handler allows placeholders * Add consuming_buffers::get +WebSocket: + +* WebSocket read optimization + +API Changes: + +* websocket::stream::read_buffer_size is removed + +Actions Required: + +* Remove calls to websocket::stream::read_buffer_size +* Use read_some and write_some instead of read_frame and write_frame + -------------------------------------------------------------------------------- Version 83: diff --git a/doc/qbk/08_design/3_websocket_zaphoyd.qbk b/doc/qbk/08_design/3_websocket_zaphoyd.qbk index 80fbe793..d08ca6f6 100644 --- a/doc/qbk/08_design/3_websocket_zaphoyd.qbk +++ b/doc/qbk/08_design/3_websocket_zaphoyd.qbk @@ -328,7 +328,7 @@ without bringing it all into memory. [``` template void - write_frame(bool fin, + write_some(bool fin, ConstBufferSequence const& buffers); ```] [ diff --git a/example/server-framework/main.cpp b/example/server-framework/main.cpp index fcaffa8c..b89582b2 100644 --- a/example/server-framework/main.cpp +++ b/example/server-framework/main.cpp @@ -22,6 +22,7 @@ #include "file_service.hpp" #include "ws_upgrade_service.hpp" +#include #include #include diff --git a/example/websocket-server-async/CMakeLists.txt b/example/websocket-server-async/CMakeLists.txt index b0f5ad18..4aa0846b 100644 --- a/example/websocket-server-async/CMakeLists.txt +++ b/example/websocket-server-async/CMakeLists.txt @@ -1,10 +1,12 @@ # Part of Beast GroupSources(include/beast beast) +GroupSources(example/common common) GroupSources(example/websocket-server-async "/") add_executable (websocket-server-async ${BEAST_INCLUDES} + ${COMMON_INCLUDES} websocket_server_async.cpp ) diff --git a/include/beast/websocket/detail/frame.hpp b/include/beast/websocket/detail/frame.hpp index ccc653ac..3cc1b3e7 100644 --- a/include/beast/websocket/detail/frame.hpp +++ b/include/beast/websocket/detail/frame.hpp @@ -96,14 +96,14 
@@ enum class opcode : std::uint8_t // Contents of a WebSocket frame header struct frame_header { - opcode op; - bool fin; - bool mask; - bool rsv1; - bool rsv2; - bool rsv3; std::uint64_t len; std::uint32_t key; + opcode op; + bool fin : 1; + bool mask : 1; + bool rsv1 : 1; + bool rsv2 : 1; + bool rsv3 : 1; }; // holds the largest possible frame header @@ -226,7 +226,7 @@ write(DynamicBuffer& db, frame_header const& fh) // template void -read(ping_data& data, Buffers const& bs) +read_ping(ping_data& data, Buffers const& bs) { using boost::asio::buffer_copy; using boost::asio::buffer_size; @@ -242,7 +242,7 @@ read(ping_data& data, Buffers const& bs) // template void -read(close_reason& cr, +read_close(close_reason& cr, Buffers const& bs, close_code& code) { using boost::asio::buffer; @@ -279,7 +279,7 @@ read(close_reason& cr, { cr.reason.resize(n); buffer_copy(buffer(&cr.reason[0], n), cb); - if(! detail::check_utf8( + if(! check_utf8( cr.reason.data(), cr.reason.size())) { code = close_code::protocol_error; diff --git a/include/beast/websocket/detail/pausation.hpp b/include/beast/websocket/detail/pausation.hpp index 53a99884..8fd1d2a6 100644 --- a/include/beast/websocket/detail/pausation.hpp +++ b/include/beast/websocket/detail/pausation.hpp @@ -82,6 +82,60 @@ class pausation void operator()(); }; + template + class saved_op + { + Op* op_ = nullptr; + + public: + ~saved_op() + { + using boost::asio::asio_handler_deallocate; + if(op_) + { + auto h = std::move(op_->handler()); + op_->~Op(); + asio_handler_deallocate(op_, + sizeof(*op_), std::addressof(h)); + } + } + + saved_op(saved_op&& other) + : op_(other.op_) + { + other.op_ = nullptr; + } + + saved_op& operator=(saved_op&& other) + { + BOOST_ASSERT(! 
op_); + op_ = other.op_; + other.op_ = 0; + return *this; + } + + explicit + saved_op(Op&& op) + { + using boost::asio::asio_handler_allocate; + new(asio_handler_allocate(sizeof(Op), + std::addressof(op.handler()))) Op{ + std::move(op)}; + } + + void + operator()() + { + BOOST_ASSERT(op_); + Op op{std::move(*op_)}; + using boost::asio::asio_handler_deallocate; + asio_handler_deallocate(op_, + sizeof(*op_), std::addressof(op_->handler())); + op_ = nullptr; + op(); + } + }; + using buf_type = char[sizeof(holder)]; base* base_ = nullptr; @@ -125,6 +179,10 @@ public: void emplace(F&& f); + template + void + save(F&& f); + bool maybe_invoke() { @@ -150,6 +208,13 @@ pausation::emplace(F&& f) base_ = ::new(buf_) type{std::forward(f)}; } +template +void +pausation::save(F&& f) +{ + emplace(saved_op{std::move(f)}); +} + } // detail } // websocket } // beast diff --git a/include/beast/websocket/error.hpp b/include/beast/websocket/error.hpp index e36a37fa..6511d280 100644 --- a/include/beast/websocket/error.hpp +++ b/include/beast/websocket/error.hpp @@ -24,7 +24,10 @@ enum class error failed, /// Upgrade handshake failed - handshake_failed + handshake_failed, + + /// buffer overflow + buffer_overflow }; } // websocket diff --git a/include/beast/websocket/impl/accept.ipp b/include/beast/websocket/impl/accept.ipp index 6702364f..658df55f 100644 --- a/include/beast/websocket/impl/accept.ipp +++ b/include/beast/websocket/impl/accept.ipp @@ -28,8 +28,6 @@ namespace beast { namespace websocket { -//------------------------------------------------------------------------------ - // Respond to an upgrade HTTP request template template diff --git a/include/beast/websocket/impl/close.ipp b/include/beast/websocket/impl/close.ipp index 2d3d3187..186c323f 100644 --- a/include/beast/websocket/impl/close.ipp +++ b/include/beast/websocket/impl/close.ipp @@ -35,11 +35,13 @@ class stream::close_op close_reason cr; detail::frame_streambuf fb; int state = 0; + token tok; data(Handler&, stream& 
ws_, close_reason const& cr_) : ws(ws_) , cr(cr_) + , tok(ws.t_.unique()) { ws.template write_close< flat_static_buffer_base>(fb, cr); @@ -114,7 +116,7 @@ operator()(error_code ec, std::size_t) auto& d = *d_; if(ec) { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.ws.failed_ = true; goto upcall; } @@ -128,7 +130,7 @@ operator()(error_code ec, std::size_t) d.ws.close_op_.emplace(std::move(*this)); return; } - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; if(d.ws.failed_ || d.ws.wr_close_) { // call handler @@ -140,7 +142,7 @@ operator()(error_code ec, std::size_t) do_write: // send close frame - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.state = 3; d.ws.wr_close_ = true; boost::asio::async_write(d.ws.stream_, @@ -149,7 +151,7 @@ operator()(error_code ec, std::size_t) case 1: BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; d.state = 2; // The current context is safe but might not be // the same as the one for this operation (since @@ -161,7 +163,7 @@ operator()(error_code ec, std::size_t) return; case 2: - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); if(d.ws.failed_ || d.ws.wr_close_) { // call handler @@ -174,30 +176,15 @@ operator()(error_code ec, std::size_t) break; } upcall: - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.wr_block_.reset(); d.ws.rd_op_.maybe_invoke() || d.ws.ping_op_.maybe_invoke() || d.ws.wr_op_.maybe_invoke(); d_.invoke(ec); } -template -template -async_return_type< - CloseHandler, void(error_code)> -stream:: -async_close(close_reason const& cr, CloseHandler&& handler) -{ - static_assert(is_async_stream::value, - "AsyncStream requirements not met"); - async_completion init{handler}; - close_op>{ - init.completion_handler, *this, cr}({}); - return init.result.get(); -} 
+//------------------------------------------------------------------------------ template void @@ -219,9 +206,12 @@ close(close_reason const& cr, error_code& ec) { static_assert(is_sync_stream::value, "SyncStream requirements not met"); - BOOST_ASSERT(! wr_close_); + // If rd_close_ is set then we already sent a close + BOOST_ASSERT(! rd_close_); if(wr_close_) { + // Can't call close twice, abort operation + BOOST_ASSERT(! wr_close_); ec = boost::asio::error::operation_aborted; return; } @@ -230,9 +220,26 @@ close(close_reason const& cr, error_code& ec) write_close(fb, cr); boost::asio::write(stream_, fb.data(), ec); failed_ = !!ec; + if(failed_) + return; } -//------------------------------------------------------------------------------ +template +template +async_return_type< + CloseHandler, void(error_code)> +stream:: +async_close(close_reason const& cr, CloseHandler&& handler) +{ + static_assert(is_async_stream::value, + "AsyncStream requirements not met"); + async_completion init{handler}; + close_op>{ + init.completion_handler, *this, cr}({}); + return init.result.get(); +} } // websocket } // beast diff --git a/include/beast/websocket/impl/error.ipp b/include/beast/websocket/impl/error.ipp index 96b03585..bd6856fe 100644 --- a/include/beast/websocket/impl/error.ipp +++ b/include/beast/websocket/impl/error.ipp @@ -39,6 +39,7 @@ public: case error::closed: return "WebSocket connection closed normally"; case error::failed: return "WebSocket connection failed due to a protocol violation"; case error::handshake_failed: return "WebSocket Upgrade handshake failed"; + case error::buffer_overflow: return "buffer overflow"; default: return "beast.websocket error"; diff --git a/include/beast/websocket/impl/fail.ipp b/include/beast/websocket/impl/fail.ipp new file mode 100644 index 00000000..23ff297f --- /dev/null +++ b/include/beast/websocket/impl/fail.ipp @@ -0,0 +1,233 @@ +// +// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// 
Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BEAST_WEBSOCKET_IMPL_FAIL_IPP +#define BEAST_WEBSOCKET_IMPL_FAIL_IPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace beast { +namespace websocket { + +// _Fail the WebSocket Connection_ +// +template +template +class stream::fail_op +{ + Handler h_; + stream& ws_; + int step_ = 0; + bool dispatched_ = false; + fail_how how_; + token tok_; + +public: + fail_op(fail_op&&) = default; + fail_op(fail_op const&) = default; + + // send close code, then teardown + template + fail_op( + DeducedHandler&& h, + stream& ws, + close_code code) + : h_(std::forward(h)) + , ws_(ws) + , how_(fail_how::code) + , tok_(ws_.t_.unique()) + { + ws_.rd_.fb.consume(ws_.rd_.fb.size()); + ws_.template write_close< + flat_static_buffer_base>( + ws_.rd_.fb, code); + } + + // maybe send frame in fb, then teardown + template + fail_op( + DeducedHandler&& h, + stream& ws, + fail_how how) + : h_(std::forward(h)) + , ws_(ws) + , how_(how) + , tok_(ws_.t_.unique()) + { + } + + Handler& + handler() + { + return h_; + } + + void operator()(error_code ec = {}, + std::size_t bytes_transferred = 0); + + friend + void* asio_handler_allocate( + std::size_t size, fail_op* op) + { + using boost::asio::asio_handler_allocate; + return asio_handler_allocate( + size, std::addressof(op->h_)); + } + + friend + void asio_handler_deallocate( + void* p, std::size_t size, fail_op* op) + { + using boost::asio::asio_handler_deallocate; + asio_handler_deallocate( + p, size, std::addressof(op->h_)); + } + + friend + bool asio_handler_is_continuation(fail_op* op) + { + using boost::asio::asio_handler_is_continuation; + return op->dispatched_ || + asio_handler_is_continuation( + std::addressof(op->h_)); + } + + template + friend + void 
asio_handler_invoke(Function&& f, fail_op* op) + { + using boost::asio::asio_handler_invoke; + asio_handler_invoke(f, + std::addressof(op->h_)); + } +}; + +template +template +void +stream:: +fail_op:: +operator()(error_code ec, std::size_t) +{ + enum + { + do_start = 0, + do_resume = 20, + do_teardown = 40 + }; + switch(step_) + { + case do_start: + // Acquire write block + if(ws_.wr_block_) + { + // suspend + BOOST_ASSERT(ws_.wr_block_ != tok_); + step_ = do_resume; + ws_.rd_op_.save(std::move(*this)); + return; + } + ws_.wr_block_ = tok_; + goto go_write; + + case do_resume: + BOOST_ASSERT(! ws_.wr_block_); + ws_.wr_block_ = tok_; + step_ = do_resume + 1; + // We were invoked from a foreign context, so post + return ws_.get_io_service().post(std::move(*this)); + + case do_resume + 1: + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + go_write: + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ws_.failed_) + { + ws_.wr_block_.reset(); + ec = boost::asio::error::operation_aborted; + break; + } + if(how_ == fail_how::teardown) + goto go_teardown; + if(ws_.wr_close_) + goto go_teardown; + // send close frame + step_ = do_teardown; + ws_.wr_close_ = true; + return boost::asio::async_write( + ws_.stream_, ws_.rd_.fb.data(), + std::move(*this)); + + case do_teardown: + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + { + ws_.wr_block_.reset(); + break; + } + go_teardown: + BOOST_ASSERT(ws_.wr_block_ == tok_); + step_ = do_teardown + 1; + websocket_helpers::call_async_teardown( + ws_.next_layer(), std::move(*this)); + return; + + case do_teardown + 1: + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + ws_.failed_ = true; + ws_.wr_block_.reset(); + if(ec == boost::asio::error::eof) + { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + if(! 
ec) + { + switch(how_) + { + default: + case fail_how::code: + case fail_how::teardown: ec = error::failed; break; + case fail_how::close: ec = error::closed; break; + } + } + break; + } + // upcall + BOOST_ASSERT(ws_.wr_block_ != tok_); + ws_.close_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke() || + ws_.wr_op_.maybe_invoke(); + if(! dispatched_) + ws_.stream_.get_io_service().post( + bind_handler(std::move(h_), ec)); + else + h_(ec); +} + +} // websocket +} // beast + +#endif diff --git a/include/beast/websocket/impl/ping.ipp b/include/beast/websocket/impl/ping.ipp index 77b1c806..3b4d0426 100644 --- a/include/beast/websocket/impl/ping.ipp +++ b/include/beast/websocket/impl/ping.ipp @@ -35,10 +35,12 @@ class stream::ping_op stream& ws; detail::frame_streambuf fb; int state = 0; + token tok; data(Handler&, stream& ws_, detail::opcode op_, ping_data const& payload) : ws(ws_) + , tok(ws.t_.unique()) { using boost::asio::buffer; using boost::asio::buffer_copy; @@ -115,7 +117,7 @@ operator()(error_code ec, std::size_t) auto& d = *d_; if(ec) { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.ws.failed_ = true; goto upcall; } @@ -129,7 +131,7 @@ operator()(error_code ec, std::size_t) d.ws.ping_op_.emplace(std::move(*this)); return; } - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; if(d.ws.failed_ || d.ws.wr_close_) { // call handler @@ -140,7 +142,7 @@ operator()(error_code ec, std::size_t) do_write: // send ping frame - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.state = 3; boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); @@ -148,7 +150,7 @@ operator()(error_code ec, std::size_t) case 1: BOOST_ASSERT(! 
d.ws.wr_block_); - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; d.state = 2; // The current context is safe but might not be // the same as the one for this operation (since @@ -160,7 +162,7 @@ operator()(error_code ec, std::size_t) return; case 2: - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); if(d.ws.failed_ || d.ws.wr_close_) { // call handler @@ -173,8 +175,8 @@ operator()(error_code ec, std::size_t) break; } upcall: - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.wr_block_.reset(); d.ws.close_op_.maybe_invoke() || d.ws.rd_op_.maybe_invoke() || d.ws.wr_op_.maybe_invoke(); diff --git a/include/beast/websocket/impl/read.ipp b/include/beast/websocket/impl/read.ipp index 3a279385..8f1cec9b 100644 --- a/include/beast/websocket/impl/read.ipp +++ b/include/beast/websocket/impl/read.ipp @@ -9,8 +9,9 @@ #define BEAST_WEBSOCKET_IMPL_READ_IPP #include +#include #include -#include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include #include #include @@ -30,1006 +32,674 @@ namespace websocket { //------------------------------------------------------------------------------ -// Reads a single message frame, -// processes any received control frames. 
-// +// read a frame header, process control frames template -template -class stream::read_frame_op +template +class stream::read_fh_op { - using fb_type = - detail::frame_streambuf; - - using fmb_type = - typename fb_type::mutable_buffers_type; - - using dmb_type = - typename DynamicBuffer::mutable_buffers_type; - - struct data : op - { - bool cont; - stream& ws; - DynamicBuffer& db; - fb_type fb; - std::uint64_t remain; - detail::frame_header fh; - detail::prepared_key key; - boost::optional dmb; - boost::optional fmb; - int state = 0; - - data(Handler& handler, stream& ws_, - DynamicBuffer& sb_) - : ws(ws_) - , db(sb_) - { - using boost::asio::asio_handler_is_continuation; - cont = asio_handler_is_continuation(std::addressof(handler)); - } - }; - - handler_ptr d_; + Handler h_; + stream& ws_; + int step_ = 0; + bool dispatched_ = false; + token tok_; public: - read_frame_op(read_frame_op&&) = default; - read_frame_op(read_frame_op const&) = default; + read_fh_op(read_fh_op&&) = default; + read_fh_op(read_fh_op const&) = default; - template - read_frame_op(DeducedHandler&& h, - stream& ws, Args&&... args) - : d_(std::forward(h), - ws, std::forward(args)...) 
+ template + read_fh_op( + DeducedHandler&& h, + stream& ws) + : h_(std::forward(h)) + , ws_(ws) + , tok_(ws_.t_.unique()) { } - void operator()() + Handler& + handler() { - (*this)(error_code{}, 0, true); + return h_; } - void operator()(error_code const& ec) - { - (*this)(ec, 0, true); - } - - void operator()(error_code ec, - std::size_t bytes_transferred); - - void operator()(error_code ec, - std::size_t bytes_transferred, bool again); + void operator()(error_code ec = {}, + std::size_t bytes_transferred = 0); friend void* asio_handler_allocate( - std::size_t size, read_frame_op* op) + std::size_t size, read_fh_op* op) { using boost::asio::asio_handler_allocate; return asio_handler_allocate( - size, std::addressof(op->d_.handler())); + size, std::addressof(op->h_)); } friend void asio_handler_deallocate( - void* p, std::size_t size, read_frame_op* op) + void* p, std::size_t size, read_fh_op* op) { using boost::asio::asio_handler_deallocate; asio_handler_deallocate( - p, size, std::addressof(op->d_.handler())); + p, size, std::addressof(op->h_)); } friend - bool asio_handler_is_continuation(read_frame_op* op) + bool asio_handler_is_continuation(read_fh_op* op) { - return op->d_->cont; + using boost::asio::asio_handler_is_continuation; + return op->dispatched_ || + asio_handler_is_continuation( + std::addressof(op->h_)); } template friend - void asio_handler_invoke(Function&& f, read_frame_op* op) + void asio_handler_invoke(Function&& f, read_fh_op* op) { using boost::asio::asio_handler_invoke; - asio_handler_invoke( - f, std::addressof(op->d_.handler())); + asio_handler_invoke(f, std::addressof(op->h_)); } }; template -template +template void -stream::read_frame_op:: -operator()(error_code ec, std::size_t bytes_transferred) -{ - auto& d = *d_; - if(ec) - d.ws.failed_ = true; - (*this)(ec, bytes_transferred, true); -} - -template -template -void -stream::read_frame_op:: -operator()(error_code ec, - std::size_t bytes_transferred, bool again) +stream:: +read_fh_op:: 
+operator()( + error_code ec, + std::size_t bytes_transferred) { using beast::detail::clamp; - using boost::asio::buffer; enum { - do_start = 0, - do_read_payload = 1, - do_inflate_payload = 30, - do_frame_done = 4, - do_read_fh = 5, - do_control_payload = 8, - do_control = 9, - do_pong_resume = 10, - do_ponged = 12, - do_close_resume = 14, - do_teardown = 17, - do_fail = 19, - - do_call_handler = 99 + do_loop = 0, + do_pong = 10 }; - - auto& d = *d_; - if(d.state == do_teardown + 1 && ec == boost::asio::error::eof) + switch(step_) { - // Rationale: - // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec.assign(0, ec.category()); - } - if(! ec) + case do_loop: + go_loop: { - d.cont = d.cont || again; - close_code code = close_code::none; - do + BOOST_ASSERT( + ws_.rd_.remain == 0 && + (! ws_.rd_.fh.fin || ws_.rd_.done)); + if(ws_.failed_) { - switch(d.state) - { - case do_start: - if(d.ws.failed_) - { - d.state = do_call_handler; - d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted, 0)); - return; - } - d.state = do_read_fh; - break; - - //------------------------------------------------------------------ - - case do_read_payload: - if(d.fh.len == 0) - { - d.state = do_frame_done; - break; - } - // Enforce message size limit - if(d.ws.rd_msg_max_ && d.fh.len > - d.ws.rd_msg_max_ - d.ws.rd_.size) - { - code = close_code::too_big; - d.state = do_fail; - break; - } - d.ws.rd_.size += d.fh.len; - d.remain = d.fh.len; - if(d.fh.mask) - detail::prepare_key(d.key, d.fh.key); - BEAST_FALLTHROUGH; - - case do_read_payload + 1: - d.state = do_read_payload + 2; - d.dmb = d.db.prepare(clamp(d.remain)); - // Read frame payload data - d.ws.stream_.async_read_some( - *d.dmb, std::move(*this)); - return; - - case do_read_payload + 2: - { - d.remain -= bytes_transferred; - auto const pb = buffer_prefix( - bytes_transferred, *d.dmb); - if(d.fh.mask) - detail::mask_inplace(pb, 
d.key); - if(d.ws.rd_.op == detail::opcode::text) - { - if(! d.ws.rd_.utf8.write(pb) || - (d.remain == 0 && d.fh.fin && - ! d.ws.rd_.utf8.finish())) - { - // invalid utf8 - code = close_code::bad_payload; - d.state = do_fail; - break; - } - } - d.db.commit(bytes_transferred); - if(d.remain > 0) - { - d.state = do_read_payload + 1; - break; - } - d.state = do_frame_done; - break; - } - - //------------------------------------------------------------------ - - case do_inflate_payload: - d.remain = d.fh.len; - if(d.fh.len == 0) - { - // inflate even if fh.len == 0, otherwise we - // never emit the end-of-stream deflate block. - bytes_transferred = 0; - d.state = do_inflate_payload + 2; - break; - } - if(d.fh.mask) - detail::prepare_key(d.key, d.fh.key); - // fall through - - case do_inflate_payload + 1: - { - d.state = do_inflate_payload + 2; - // Read compressed frame payload data - d.ws.stream_.async_read_some( - buffer(d.ws.rd_.buf.get(), clamp( - d.remain, d.ws.rd_.buf_size)), - std::move(*this)); - return; - } - - case do_inflate_payload + 2: - { - d.remain -= bytes_transferred; - auto const in = buffer( - d.ws.rd_.buf.get(), bytes_transferred); - if(d.fh.mask) - detail::mask_inplace(in, d.key); - auto const prev = d.db.size(); - detail::inflate(d.ws.pmd_->zi, d.db, in, ec); - d.ws.failed_ = !!ec; - if(d.ws.failed_) - break; - if(d.remain == 0 && d.fh.fin) - { - static std::uint8_t constexpr - empty_block[4] = { - 0x00, 0x00, 0xff, 0xff }; - detail::inflate(d.ws.pmd_->zi, d.db, - buffer(&empty_block[0], 4), ec); - d.ws.failed_ = !!ec; - if(d.ws.failed_) - break; - } - if(d.ws.rd_.op == detail::opcode::text) - { - consuming_buffers cb{d.db.data()}; - cb.consume(prev); - if(! d.ws.rd_.utf8.write(cb) || - (d.remain == 0 && d.fh.fin && - ! 
d.ws.rd_.utf8.finish())) - { - // invalid utf8 - code = close_code::bad_payload; - d.state = do_fail; - break; - } - } - if(d.remain > 0) - { - d.state = do_inflate_payload + 1; - break; - } - if(d.fh.fin && ( - (d.ws.role_ == role_type::client && - d.ws.pmd_config_.server_no_context_takeover) || - (d.ws.role_ == role_type::server && - d.ws.pmd_config_.client_no_context_takeover))) - d.ws.pmd_->zi.reset(); - d.state = do_frame_done; - break; - } - - //------------------------------------------------------------------ - - case do_frame_done: - goto upcall; - - //------------------------------------------------------------------ - - case do_read_fh: - d.state = do_read_fh + 1; - boost::asio::async_read(d.ws.stream_, - d.fb.prepare(2), std::move(*this)); - return; - - case do_read_fh + 1: - { - d.fb.commit(bytes_transferred); - code = close_code::none; - auto const n = d.ws.read_fh1( - d.fh, d.fb, code); - if(code != close_code::none) - { - // protocol error - d.state = do_fail; - break; - } - d.state = do_read_fh + 2; - if(n == 0) - { - bytes_transferred = 0; - break; - } - // read variable header - boost::asio::async_read(d.ws.stream_, - d.fb.prepare(n), std::move(*this)); - return; - } - - case do_read_fh + 2: - d.fb.commit(bytes_transferred); - code = close_code::none; - d.ws.read_fh2(d.fh, d.fb, code); - if(code != close_code::none) - { - // protocol error - d.state = do_fail; - break; - } - if(detail::is_control(d.fh.op)) - { - if(d.fh.len > 0) - { - // read control payload - d.state = do_control_payload; - d.fmb = d.fb.prepare(static_cast< - std::size_t>(d.fh.len)); - boost::asio::async_read(d.ws.stream_, - *d.fmb, std::move(*this)); - return; - } - d.state = do_control; - break; - } - if(d.fh.op == detail::opcode::text || - d.fh.op == detail::opcode::binary) - d.ws.rd_begin(); - if(d.fh.len == 0 && ! d.fh.fin) - { - // Empty message frame - d.state = do_frame_done; - break; - } - if(! d.ws.pmd_ || ! 
d.ws.pmd_->rd_set) - d.state = do_read_payload; - else - d.state = do_inflate_payload; - break; - - //------------------------------------------------------------------ - - case do_control_payload: - if(d.fh.mask) - { - detail::prepare_key(d.key, d.fh.key); - detail::mask_inplace(*d.fmb, d.key); - } - d.fb.commit(bytes_transferred); - d.state = do_control; // VFALCO fall through? - break; - - //------------------------------------------------------------------ - - case do_control: - if(d.fh.op == detail::opcode::ping) - { - ping_data payload; - detail::read(payload, d.fb.data()); - d.fb.consume(d.fb.size()); - if(d.ws.ctrl_cb_) - d.ws.ctrl_cb_( - frame_type::ping, payload); - if(d.ws.wr_close_) - { - // ignore ping when closing - d.state = do_read_fh; - break; - } - d.ws.template write_ping( - d.fb, detail::opcode::pong, payload); - if(d.ws.wr_block_) - { - // suspend - BOOST_ASSERT(d.ws.wr_block_ != &d); - d.state = do_pong_resume; - d.ws.rd_op_.emplace(std::move(*this)); - return; - } - d.ws.wr_block_ = &d; - goto go_pong; - } - if(d.fh.op == detail::opcode::pong) - { - code = close_code::none; - ping_data payload; - detail::read(payload, d.fb.data()); - if(d.ws.ctrl_cb_) - d.ws.ctrl_cb_( - frame_type::pong, payload); - d.fb.consume(d.fb.size()); - d.state = do_read_fh; - break; - } - BOOST_ASSERT(d.fh.op == detail::opcode::close); - { - BOOST_ASSERT(! d.ws.rd_close_); - d.ws.rd_close_ = true; - detail::read(d.ws.cr_, d.fb.data(), code); - if(code != close_code::none) - { - // protocol error - d.state = do_fail; - break; - } - if(d.ws.ctrl_cb_) - d.ws.ctrl_cb_(frame_type::close, - d.ws.cr_.reason); - if(! 
d.ws.wr_close_) - { - auto cr = d.ws.cr_; - if(cr.code == close_code::none) - cr.code = close_code::normal; - cr.reason = ""; - d.fb.consume(d.fb.size()); - d.ws.template write_close< - flat_static_buffer_base>(d.fb, cr); - if(d.ws.wr_block_) - { - // suspend - BOOST_ASSERT(d.ws.wr_block_ != &d); - d.state = do_close_resume; - d.ws.rd_op_.emplace(std::move(*this)); - return; - } - d.ws.wr_block_ = &d; - goto go_close; - } - d.state = do_teardown; - break; - } - - //------------------------------------------------------------------ - - case do_pong_resume: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - d.state = do_pong_resume + 1; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post(bind_handler( - std::move(*this), ec, 0)); - return; - - case do_pong_resume + 1: - BOOST_ASSERT(d.ws.wr_block_ == &d); - if(d.ws.failed_) - { - // call handler - ec = boost::asio::error::operation_aborted; - goto upcall; - } - if(d.ws.wr_close_) - { - // ignore ping when closing - d.ws.wr_block_ = nullptr; - d.fb.consume(d.fb.size()); - d.state = do_read_fh; - break; - } - - //------------------------------------------------------------------ - - go_pong: - // send pong - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.state = do_ponged; - boost::asio::async_write(d.ws.stream_, - d.fb.data(), std::move(*this)); - return; - - case do_ponged: - d.ws.wr_block_ = nullptr; - d.fb.consume(d.fb.size()); - d.state = do_read_fh; - break; - - //------------------------------------------------------------------ - - case do_close_resume: - BOOST_ASSERT(! 
d.ws.wr_block_); - d.ws.wr_block_ = &d; - d.state = do_close_resume + 1; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post(bind_handler( - std::move(*this), ec, bytes_transferred)); - return; - - case do_close_resume + 1: - BOOST_ASSERT(d.ws.wr_block_ == &d); - if(d.ws.failed_) - { - // call handler - ec = boost::asio::error::operation_aborted; - goto upcall; - } - if(d.ws.wr_close_) - { - // already sent a close frame - ec = error::closed; - goto upcall; - } - - //------------------------------------------------------------------ - - go_close: - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.state = do_teardown; - d.ws.wr_close_ = true; - boost::asio::async_write(d.ws.stream_, - d.fb.data(), std::move(*this)); - return; - - //------------------------------------------------------------------ - - case do_teardown: - d.state = do_teardown + 1; - websocket_helpers::call_async_teardown( - d.ws.next_layer(), std::move(*this)); - return; - - case do_teardown + 1: - // call handler - ec = error::closed; - goto upcall; - - //------------------------------------------------------------------ - - case do_fail: - if(d.ws.wr_close_) - { - d.state = do_fail + 4; - break; - } - d.fb.consume(d.fb.size()); - d.ws.template write_close< - flat_static_buffer_base>(d.fb, code); - if(d.ws.wr_block_) - { - // suspend - BOOST_ASSERT(d.ws.wr_block_ != &d); - d.state = do_fail + 2; - d.ws.rd_op_.emplace(std::move(*this)); - return; - } - d.ws.wr_block_ = &d; - BEAST_FALLTHROUGH; - - case do_fail + 1: - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.failed_ = true; - // send close frame - d.state = do_fail + 4; - d.ws.wr_close_ = true; - boost::asio::async_write(d.ws.stream_, - d.fb.data(), std::move(*this)); - return; - - case do_fail + 2: - // resume - BOOST_ASSERT(! 
d.ws.wr_block_); - d.ws.wr_block_ = &d; - d.state = do_fail + 3; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post(bind_handler( - std::move(*this), ec, bytes_transferred)); - return; - - case do_fail + 3: - BOOST_ASSERT(d.ws.wr_block_ == &d); - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - ec = error::failed; - goto upcall; - } - d.state = do_fail + 1; - break; - - case do_fail + 4: - d.state = do_fail + 5; - websocket_helpers::call_async_teardown( - d.ws.next_layer(), std::move(*this)); - return; - - case do_fail + 5: - // call handler - ec = error::failed; - goto upcall; - - //------------------------------------------------------------------ - - case do_call_handler: - goto upcall; - } + // Reads after failure are aborted + ec = boost::asio::error::operation_aborted; + break; } - while(! ec); - } -upcall: - if(d.ws.wr_block_ == &d) - d.ws.wr_block_ = nullptr; - d.ws.close_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke() || - d.ws.wr_op_.maybe_invoke(); - bool const fin = (! ec) ? 
d.fh.fin : false; - d_.invoke(ec, fin); -} - -template -template -async_return_type< - ReadHandler, void(error_code, bool)> -stream:: -async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler) -{ - static_assert(is_async_stream::value, - "AsyncStream requirements requirements not met"); - static_assert(beast::is_dynamic_buffer::value, - "DynamicBuffer requirements not met"); - async_completion init{handler}; - read_frame_op>{ - init.completion_handler,*this, buffer}( - {}, 0, false); - return init.result.get(); -} - -template -template -bool -stream:: -read_frame(DynamicBuffer& buffer) -{ - static_assert(is_sync_stream::value, - "SyncStream requirements not met"); - static_assert(beast::is_dynamic_buffer::value, - "DynamicBuffer requirements not met"); - error_code ec; - auto const fin = read_frame(buffer, ec); - if(ec) - BOOST_THROW_EXCEPTION(system_error{ec}); - return fin; -} - -template -template -bool -stream:: -read_frame(DynamicBuffer& dynabuf, error_code& ec) -{ - static_assert(is_sync_stream::value, - "SyncStream requirements not met"); - static_assert(beast::is_dynamic_buffer::value, - "DynamicBuffer requirements not met"); - using beast::detail::clamp; - using boost::asio::buffer; - using boost::asio::buffer_cast; - using boost::asio::buffer_size; - close_code code{}; - for(;;) - { + close_code code{}; // Read frame header - detail::frame_header fh; - detail::frame_streambuf fb; + if(! 
ws_.parse_fh(ws_.rd_.fh, ws_.rd_.buf, code)) { - fb.commit(boost::asio::read( - stream_, fb.prepare(2), ec)); - failed_ = !!ec; - if(failed_) - return false; - { - auto const n = read_fh1(fh, fb, code); - if(code != close_code::none) - goto do_close; - if(n > 0) - { - fb.commit(boost::asio::read( - stream_, fb.prepare(n), ec)); - failed_ = !!ec; - if(failed_) - return false; - } - } - read_fh2(fh, fb, code); - - failed_ = !!ec; - if(failed_) - return false; if(code != close_code::none) - goto do_close; - } - if(detail::is_control(fh.op)) - { - // Read control frame payload - if(fh.len > 0) { - auto const mb = fb.prepare( - static_cast(fh.len)); - fb.commit(boost::asio::read(stream_, mb, ec)); - failed_ = !!ec; - if(failed_) - return false; - if(fh.mask) - { - detail::prepared_key key; - detail::prepare_key(key, fh.key); - detail::mask_inplace(mb, key); - } - fb.commit(static_cast(fh.len)); + // _Fail the WebSocket Connection_ + return fail_op{ + std::move(h_), ws_, code}(); } + step_ = do_loop + 1; + return ws_.stream_.async_read_some( + ws_.rd_.buf.prepare(read_size( + ws_.rd_.buf, ws_.rd_.buf.max_size())), + std::move(*this)); + } + // Immediately apply the mask to the portion + // of the buffer holding payload data. 
+ if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask) + detail::mask_inplace(buffer_prefix( + clamp(ws_.rd_.fh.len), + ws_.rd_.buf.mutable_data()), + ws_.rd_.key); + if(detail::is_control(ws_.rd_.fh.op)) + { + // Get control frame payload + auto const cb = buffer_prefix(clamp( + ws_.rd_.fh.len), ws_.rd_.buf.data()); + auto const len = buffer_size(cb); + BOOST_ASSERT(len == ws_.rd_.fh.len); // Process control frame - if(fh.op == detail::opcode::ping) + if(ws_.rd_.fh.op == detail::opcode::ping) { ping_data payload; - detail::read(payload, fb.data()); - fb.consume(fb.size()); - if(ctrl_cb_) - ctrl_cb_(frame_type::ping, payload); - write_ping(fb, - detail::opcode::pong, payload); - boost::asio::write(stream_, fb.data(), ec); - failed_ = !!ec; - if(failed_) - return false; - continue; + detail::read_ping(payload, cb); + ws_.rd_.buf.consume(len); + if(ws_.wr_close_) + { + // Ignore ping when closing + goto go_loop; + } + if(ws_.ctrl_cb_) + ws_.ctrl_cb_(frame_type::ping, payload); + ws_.rd_.fb.consume(ws_.rd_.fb.size()); + ws_.template write_ping< + flat_static_buffer_base>(ws_.rd_.fb, + detail::opcode::pong, payload); + goto go_pong; } - else if(fh.op == detail::opcode::pong) + if(ws_.rd_.fh.op == detail::opcode::pong) { + code = close_code::none; ping_data payload; - detail::read(payload, fb.data()); - if(ctrl_cb_) - ctrl_cb_(frame_type::pong, payload); - continue; + detail::read_ping(payload, cb); + ws_.rd_.buf.consume(len); + // Ignore pong when closing + if(! ws_.wr_close_ && ws_.ctrl_cb_) + ws_.ctrl_cb_(frame_type::pong, payload); + goto go_loop; } - BOOST_ASSERT(fh.op == detail::opcode::close); + BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close); { - BOOST_ASSERT(! rd_close_); - rd_close_ = true; - detail::read(cr_, fb.data(), code); + BOOST_ASSERT(! ws_.rd_close_); + ws_.rd_close_ = true; + detail::read_close(ws_.cr_, cb, code); if(code != close_code::none) - goto do_close; - if(ctrl_cb_) - ctrl_cb_(frame_type::close, cr_.reason); - if(! 
wr_close_) { - auto cr = cr_; - if(cr.code == close_code::none) - cr.code = close_code::normal; - cr.reason = ""; - fb.consume(fb.size()); - wr_close_ = true; - write_close(fb, cr); - boost::asio::write(stream_, fb.data(), ec); - failed_ = !!ec; - if(failed_) - return false; + // _Fail the WebSocket Connection_ + return fail_op{ + std::move(h_), ws_, code}(); } - goto do_close; + ws_.rd_.buf.consume(len); + if(ws_.ctrl_cb_) + ws_.ctrl_cb_(frame_type::close, + ws_.cr_.reason); + if(ws_.wr_close_) + { + // _Close the WebSocket Connection_ + return fail_op{ + std::move(h_), ws_, fail_how::close}(); + } + auto cr = ws_.cr_; + if(cr.code == close_code::none) + cr.code = close_code::normal; + cr.reason = ""; + ws_.rd_.fb.consume(ws_.rd_.fb.size()); + ws_.template write_close< + flat_static_buffer_base>( + ws_.rd_.fb, cr); + // _Start the WebSocket Closing Handshake_ + return fail_op{ + std::move(h_), ws_, fail_how::close}(); } } - if(fh.op != detail::opcode::cont) - rd_begin(); - if(fh.len == 0 && ! fh.fin) + if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin) { - // empty frame - continue; + // Empty non-final frame + goto go_loop; } - auto remain = fh.len; - detail::prepared_key key; - if(fh.mask) - detail::prepare_key(key, fh.key); - if(! pmd_ || ! pmd_->rd_set) - { - // Enforce message size limit - if(rd_msg_max_ && fh.len > - rd_msg_max_ - rd_.size) - { - code = close_code::too_big; - goto do_close; - } - rd_.size += fh.len; - // Read message frame payload - while(remain > 0) - { - auto b = - dynabuf.prepare(clamp(remain)); - auto const bytes_transferred = - stream_.read_some(b, ec); - failed_ = !!ec; - if(failed_) - return false; - BOOST_ASSERT(bytes_transferred > 0); - remain -= bytes_transferred; - auto const pb = buffer_prefix( - bytes_transferred, b); - if(fh.mask) - detail::mask_inplace(pb, key); - if(rd_.op == detail::opcode::text) - { - if(! rd_.utf8.write(pb) || - (remain == 0 && fh.fin && - ! 
rd_.utf8.finish())) - { - code = close_code::bad_payload; - goto do_close; - } - } - dynabuf.commit(bytes_transferred); - } - } - else - { - // Read compressed message frame payload: - // inflate even if fh.len == 0, otherwise we - // never emit the end-of-stream deflate block. - for(;;) - { - auto const bytes_transferred = - stream_.read_some(buffer(rd_.buf.get(), - clamp(remain, rd_.buf_size)), ec); - failed_ = !!ec; - if(failed_) - return false; - remain -= bytes_transferred; - auto const in = buffer( - rd_.buf.get(), bytes_transferred); - if(fh.mask) - detail::mask_inplace(in, key); - auto const prev = dynabuf.size(); - detail::inflate(pmd_->zi, dynabuf, in, ec); - failed_ = !!ec; - if(failed_) - return false; - if(remain == 0 && fh.fin) - { - static std::uint8_t constexpr - empty_block[4] = { - 0x00, 0x00, 0xff, 0xff }; - detail::inflate(pmd_->zi, dynabuf, - buffer(&empty_block[0], 4), ec); - failed_ = !!ec; - if(failed_) - return false; - } - if(rd_.op == detail::opcode::text) - { - consuming_buffers cb{dynabuf.data()}; - cb.consume(prev); - if(! rd_.utf8.write(cb) || ( - remain == 0 && fh.fin && - ! rd_.utf8.finish())) - { - code = close_code::bad_payload; - goto do_close; - } - } - if(remain == 0) - break; - } - if(fh.fin && ( - (role_ == role_type::client && - pmd_config_.server_no_context_takeover) || - (role_ == role_type::server && - pmd_config_.client_no_context_takeover))) - pmd_->zi.reset(); - } - return fh.fin; + ws_.rd_.done = false; + break; } -do_close: - if(code != close_code::none) - { - // Fail the connection (per rfc6455) - if(! 
wr_close_) + + case do_loop + 1: + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + ws_.rd_.buf.commit(bytes_transferred); + goto go_loop; + + go_pong: + if(ws_.wr_block_) { - wr_close_ = true; - detail::frame_streambuf fb; - write_close(fb, code); - boost::asio::write(stream_, fb.data(), ec); - failed_ = !!ec; - if(failed_) - return false; + // suspend + BOOST_ASSERT(ws_.wr_block_ != tok_); + step_ = do_pong; + ws_.rd_op_.save(std::move(*this)); + return; } - websocket_helpers::call_teardown(next_layer(), ec); - if(ec == boost::asio::error::eof) + ws_.wr_block_ = tok_; + goto go_pong_send; + + case do_pong: + BOOST_ASSERT(! ws_.wr_block_); + ws_.wr_block_ = tok_; + step_ = do_pong + 1; + // The current context is safe but might not be + // the same as the one for this operation (since + // we are being called from a write operation). + // Call post to make sure we are invoked the same + // way as the final handler for this operation. + ws_.get_io_service().post(bind_handler( + std::move(*this), ec, 0)); + return; + + case do_pong + 1: + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + if(ws_.failed_) { - // Rationale: - // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec.assign(0, ec.category()); + // call handler + ws_.wr_block_.reset(); + ec = boost::asio::error::operation_aborted; + break; } - failed_ = !!ec; - if(failed_) - return false; - ec = error::failed; - failed_ = true; - return false; + if(ws_.wr_close_) + { + // ignore ping when closing + ws_.wr_block_.reset(); + ws_.rd_.fb.consume(ws_.rd_.fb.size()); + goto go_loop; + } + go_pong_send: + // send pong + BOOST_ASSERT(ws_.wr_block_ == tok_); + step_ = do_pong + 2; + boost::asio::async_write(ws_.stream_, + ws_.rd_.fb.data(), std::move(*this)); + return; + + case do_pong + 2: + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + ws_.wr_block_.reset(); + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + 
ws_.rd_.fb.consume(ws_.rd_.fb.size()); + goto go_loop; } - if(! ec) - { - websocket_helpers::call_teardown(next_layer(), ec); - if(ec == boost::asio::error::eof) - { - // (See above) - ec.assign(0, ec.category()); - } - } - if(! ec) - ec = error::closed; - failed_ = !!ec; - if(failed_) - return false; - return true; + // upcall + BOOST_ASSERT(ws_.wr_block_ != tok_); + ws_.close_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke() || + ws_.wr_op_.maybe_invoke(); + if(! dispatched_) + ws_.stream_.get_io_service().post( + bind_handler(std::move(h_), ec)); + else + h_(ec); } //------------------------------------------------------------------------------ -// read an entire message +// Reads a single message frame, +// processes any received control frames. // template -template +template< + class MutableBufferSequence, + class Handler> +class stream::read_some_op +{ + Handler h_; + stream& ws_; + consuming_buffers cb_; + std::size_t bytes_written_ = 0; + int step_ = 0; + bool did_read_ = false; + bool dispatched_ = false; + +public: + read_some_op(read_some_op&&) = default; + read_some_op(read_some_op const&) = default; + + template + read_some_op( + DeducedHandler&& h, + stream& ws, + MutableBufferSequence const& bs) + : h_(std::forward(h)) + , ws_(ws) + , cb_(bs) + { + } + + void operator()(error_code ec = {}, + std::size_t bytes_transferred = 0); + + friend + void* asio_handler_allocate( + std::size_t size, read_some_op* op) + { + using boost::asio::asio_handler_allocate; + return asio_handler_allocate( + size, std::addressof(op->h_)); + } + + friend + void asio_handler_deallocate( + void* p, std::size_t size, read_some_op* op) + { + using boost::asio::asio_handler_deallocate; + asio_handler_deallocate(p, size, + std::addressof(op->h_)); + } + + friend + bool asio_handler_is_continuation(read_some_op* op) + { + using boost::asio::asio_handler_is_continuation; + return op->dispatched_ || + asio_handler_is_continuation( + std::addressof(op->h_)); + } + + template + 
friend + void asio_handler_invoke(Function&& f, read_some_op* op) + { + using boost::asio::asio_handler_invoke; + asio_handler_invoke( + f, std::addressof(op->h_)); + } +}; + +template +template +void +stream:: +read_some_op:: +operator()( + error_code ec, + std::size_t bytes_transferred) +{ + enum + { + do_start = 0, + do_maybe_fill = 10, + do_read = 20, + do_inflate = 30 + }; + using beast::detail::clamp; + using boost::asio::buffer; + using boost::asio::buffer_cast; + using boost::asio::buffer_size; + switch(step_) + { + case do_start: + if(ws_.failed_) + { + // Reads after failure are aborted + ec = boost::asio::error::operation_aborted; + break; + } + // See if we need to read a frame header. This + // condition is structured to give the decompressor + // a chance to emit the final empty deflate block + // + if(ws_.rd_.remain == 0 && + (! ws_.rd_.fh.fin || ws_.rd_.done)) + { + step_ = do_maybe_fill; + return read_fh_op{ + std::move(*this), ws_}({}, 0); + } + goto go_maybe_fill; + + case do_maybe_fill: + if(ec) + break; + dispatched_ = true; + + go_maybe_fill: + if(ws_.pmd_ && ws_.pmd_->rd_set) + goto go_inflate; + if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > + (std::min)(clamp(ws_.rd_.remain), + buffer_size(cb_))) + { + // Fill the read buffer first, otherwise we + // get fewer bytes at the cost of one I/O. + auto const mb = ws_.rd_.buf.prepare( + read_size(ws_.rd_.buf, + ws_.rd_.buf.max_size())); + step_ = do_maybe_fill + 1; + return ws_.stream_.async_read_some( + mb, std::move(*this)); + } + goto go_rd_buf; + + case do_maybe_fill + 1: + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + ws_.rd_.buf.commit(bytes_transferred); + if(ws_.rd_.fh.mask) + detail::mask_inplace(buffer_prefix(clamp( + ws_.rd_.remain), ws_.rd_.buf.mutable_data()), + ws_.rd_.key); + + go_rd_buf: + if(ws_.rd_.buf.size() > 0) + { + // Copy from the read buffer. + // The mask was already applied. 
+ bytes_transferred = buffer_copy(cb_, + ws_.rd_.buf.data(), clamp(ws_.rd_.remain)); + auto const mb = buffer_prefix( + bytes_transferred, cb_); + ws_.rd_.remain -= bytes_transferred; + if(ws_.rd_.op == detail::opcode::text) + { + if(! ws_.rd_.utf8.write(mb) || + (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + return fail_op{std::move(h_), + ws_, close_code::bad_payload}({}, 0); + } + } + bytes_written_ += bytes_transferred; + ws_.rd_.size += bytes_transferred; + ws_.rd_.buf.consume(bytes_transferred); + goto go_done; + } + // Read into caller's buffer + step_ = do_read; + return ws_.stream_.async_read_some(buffer_prefix( + clamp(ws_.rd_.remain), cb_), std::move(*this)); + + case do_read: + { + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + BOOST_ASSERT(bytes_transferred > 0); + auto const mb = buffer_prefix( + bytes_transferred, cb_); + ws_.rd_.remain -= bytes_transferred; + if(ws_.rd_.fh.mask) + detail::mask_inplace(mb, ws_.rd_.key); + if(ws_.rd_.op == detail::opcode::text) + { + if(! ws_.rd_.utf8.write(mb) || + (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + return fail_op{std::move(h_), + ws_, close_code::bad_payload}(); + } + } + bytes_written_ += bytes_transferred; + ws_.rd_.size += bytes_transferred; + } + + go_done: + if(ws_.rd_.remain == 0 && ws_.rd_.fh.fin) + ws_.rd_.done = true; + break; + + case do_inflate: + go_inflate: + { + // Read compressed message frame payload: + // inflate even if rd_.fh.len == 0, otherwise we + // never emit the end-of-stream deflate block. 
+ while(buffer_size(cb_) > 0) + { + zlib::z_params zs; + { + auto const out = buffer_front(cb_); + zs.next_out = buffer_cast(out); + zs.avail_out = buffer_size(out); + BOOST_ASSERT(zs.avail_out > 0); + } + if(ws_.rd_.remain > 0) + { + if(ws_.rd_.buf.size() > 0) + { + // use what's there + auto const in = buffer_prefix( + clamp(ws_.rd_.remain), buffer_front( + ws_.rd_.buf.data())); + zs.avail_in = buffer_size(in); + zs.next_in = buffer_cast(in); + } + else if(! did_read_) + { + // read new + step_ = do_inflate + 1; + return ws_.stream_.async_read_some( + ws_.rd_.buf.prepare(read_size( + ws_.rd_.buf, ws_.rd_.buf.max_size())), + std::move(*this)); + } + else + { + break; + } + } + else if(ws_.rd_.fh.fin) + { + // append the empty block codes + static std::uint8_t constexpr + empty_block[4] = { + 0x00, 0x00, 0xff, 0xff }; + zs.next_in = empty_block; + zs.avail_in = sizeof(empty_block); + ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); + BOOST_ASSERT(! ec); + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + // VFALCO See: + // https://github.com/madler/zlib/issues/280 + BOOST_ASSERT(zs.total_out == 0); + cb_.consume(zs.total_out); + ws_.rd_.size += zs.total_out; + bytes_written_ += zs.total_out; + if( + (ws_.role_ == role_type::client && + ws_.pmd_config_.server_no_context_takeover) || + (ws_.role_ == role_type::server && + ws_.pmd_config_.client_no_context_takeover)) + ws_.pmd_->zi.reset(); + ws_.rd_.done = true; + break; + } + else + { + break; + } + ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); + BOOST_ASSERT(ec != zlib::error::end_of_stream); + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( + ws_.rd_.size, zs.total_out, ws_.rd_msg_max_)) + { + // _Fail the WebSocket Connection_ + return fail_op{std::move(h_), + ws_, close_code::too_big}(); + } + cb_.consume(zs.total_out); + ws_.rd_.size += zs.total_out; + ws_.rd_.remain -= zs.total_in; + ws_.rd_.buf.consume(zs.total_in); + bytes_written_ += zs.total_out; + } + 
if(ws_.rd_.op == detail::opcode::text) + { + // check utf8 + if(! ws_.rd_.utf8.write( + buffer_prefix(bytes_written_, cb_.get())) || ( + ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + return fail_op{std::move(h_), + ws_, close_code::bad_payload}(); + } + } + break; + } + + case do_inflate + 1: + { + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + BOOST_ASSERT(bytes_transferred > 0); + ws_.rd_.buf.commit(bytes_transferred); + if(ws_.rd_.fh.mask) + detail::mask_inplace( + buffer_prefix(clamp(ws_.rd_.remain), + ws_.rd_.buf.mutable_data()), ws_.rd_.key); + did_read_ = true; + goto go_inflate; + } + } + // upcall + if(! dispatched_) + { + ws_.stream_.get_io_service().post( + bind_handler(std::move(h_), + ec, bytes_written_)); + } + else + { + h_(ec, bytes_written_); + } +} + +//------------------------------------------------------------------------------ + +template +template< + class DynamicBuffer, + class Handler> class stream::read_op { - int state_ = 0; + Handler h_; stream& ws_; DynamicBuffer& b_; - Handler h_; + std::size_t limit_; + std::size_t bytes_written_ = 0; + int step_ = 0; + bool some_; public: read_op(read_op&&) = default; read_op(read_op const&) = default; template - read_op(DeducedHandler&& h, - stream& ws, DynamicBuffer& b) - : ws_(ws) + read_op( + DeducedHandler&& h, + stream& ws, + DynamicBuffer& b, + std::size_t limit, + bool some) + : h_(std::forward(h)) + , ws_(ws) , b_(b) - , h_(std::forward(h)) + , limit_(limit ? limit : ( + std::numeric_limits::max)()) + , some_(some) { } - void operator()(error_code const& ec, bool fin); + void operator()( + error_code ec = {}, + std::size_t bytes_transferred = 0); friend void* asio_handler_allocate( @@ -1053,7 +723,7 @@ public: bool asio_handler_is_continuation(read_op* op) { using boost::asio::asio_handler_is_continuation; - return op->state_ >= 2 ? 
true: + return op->step_ >= 2 || asio_handler_is_continuation(std::addressof(op->h_)); } @@ -1062,58 +732,72 @@ public: void asio_handler_invoke(Function&& f, read_op* op) { using boost::asio::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); + asio_handler_invoke( + f, std::addressof(op->h_)); } }; template template void -stream::read_op:: -operator()(error_code const& ec, bool fin) +stream:: +read_op:: +operator()( + error_code ec, + std::size_t bytes_transferred) { - switch(state_) + using beast::detail::clamp; + switch(ec ? 3 : step_) { case 0: - state_ = 1; - goto do_read; + { + if(ws_.failed_) + { + // Reads after failure are aborted + ec = boost::asio::error::operation_aborted; + break; + } + step_ = 1; + do_read: + using buffers_type = typename + DynamicBuffer::mutable_buffers_type; + auto const size = clamp( + ws_.read_size_hint(b_), limit_); + boost::optional mb; + try + { + mb.emplace(b_.prepare(size)); + } + catch(std::length_error const&) + { + ec = error::buffer_overflow; + break; + } + return read_some_op{ + std::move(*this), ws_, *mb}(); + } case 1: - state_ = 2; - BEAST_FALLTHROUGH; - case 2: - if(ec) - goto upcall; - if(fin) - goto upcall; - do_read: - return ws_.async_read_frame( - b_, std::move(*this)); + b_.commit(bytes_transferred); + bytes_written_ += bytes_transferred; + if(some_ || ws_.is_message_done()) + break; + step_ = 2; + goto do_read; + + case 3: + break; } -upcall: - h_(ec); + if(step_ == 0) + return ws_.get_io_service().post( + bind_handler(std::move(h_), + ec, bytes_written_)); + else + h_(ec, bytes_written_); } -template -template -async_return_type< - ReadHandler, void(error_code)> -stream:: -async_read(DynamicBuffer& buffer, ReadHandler&& handler) -{ - static_assert(is_async_stream::value, - "AsyncStream requirements requirements not met"); - static_assert(beast::is_dynamic_buffer::value, - "DynamicBuffer requirements not met"); - async_completion init{handler}; - read_op>{ - init.completion_handler, *this, 
buffer}( - {}, false); - return init.result.get(); -} +//------------------------------------------------------------------------------ template template @@ -1141,18 +825,532 @@ read(DynamicBuffer& buffer, error_code& ec) "SyncStream requirements not met"); static_assert(beast::is_dynamic_buffer::value, "DynamicBuffer requirements not met"); - for(;;) + do { - auto const fin = read_frame(buffer, ec); + read_some(buffer, 0, ec); if(ec) - break; - if(fin) - break; + return; } + while(! is_message_done()); +} + +template +template +async_return_type +stream:: +async_read(DynamicBuffer& buffer, ReadHandler&& handler) +{ + static_assert(is_async_stream::value, + "AsyncStream requirements requirements not met"); + static_assert(beast::is_dynamic_buffer::value, + "DynamicBuffer requirements not met"); + async_completion< + ReadHandler, void(error_code)> init{handler}; + read_op< + DynamicBuffer, + beast::detail::bound_handler< + handler_type, + decltype(std::placeholders::_1) const&> >{ + beast::bind_handler( + init.completion_handler, + std::placeholders::_1), + *this, + buffer, + 0, + false}(); + return init.result.get(); } //------------------------------------------------------------------------------ +template +template +std::size_t +stream:: +read_some( + DynamicBuffer& buffer, + std::size_t limit) +{ + static_assert(is_sync_stream::value, + "SyncStream requirements not met"); + static_assert(beast::is_dynamic_buffer::value, + "DynamicBuffer requirements not met"); + error_code ec; + auto const bytes_written = + read_some(buffer, limit, ec); + if(ec) + BOOST_THROW_EXCEPTION(system_error{ec}); + return bytes_written; +} + +template +template +std::size_t +stream:: +read_some( + DynamicBuffer& buffer, + std::size_t limit, + error_code& ec) +{ + static_assert(is_sync_stream::value, + "SyncStream requirements not met"); + static_assert(is_dynamic_buffer::value, + "DynamicBuffer requirements not met"); + using beast::detail::clamp; + if(! 
limit) + limit = (std::numeric_limits::max)(); + auto const size = + clamp(read_size_hint(buffer), limit); + BOOST_ASSERT(size > 0); + boost::optional mb; + try + { + mb.emplace(buffer.prepare(size)); + } + catch(std::length_error const&) + { + ec = error::buffer_overflow; + return 0; + } + auto const bytes_written = read_some(*mb, ec); + buffer.commit(bytes_written); + return bytes_written; +} + +template +template +async_return_type +stream:: +async_read_some( + DynamicBuffer& buffer, + std::size_t limit, + ReadHandler&& handler) +{ + static_assert(is_async_stream::value, + "AsyncStream requirements requirements not met"); + static_assert(is_dynamic_buffer::value, + "DynamicBuffer requirements not met"); + async_completion< + ReadHandler, void(error_code)> init{handler}; + read_op< + DynamicBuffer, + handler_type>{ + init.completion_handler, + *this, + buffer, + limit, + true}({}, 0); + return init.result.get(); +} + +//------------------------------------------------------------------------------ + +template +template +std::size_t +stream:: +read_some( + MutableBufferSequence const& buffers) +{ + static_assert(is_sync_stream::value, + "SyncStream requirements not met"); + static_assert(is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + error_code ec; + auto const bytes_written = read_some(buffers, ec); + if(ec) + BOOST_THROW_EXCEPTION(system_error{ec}); + return bytes_written; +} + +template +template +std::size_t +stream:: +read_some( + MutableBufferSequence const& buffers, + error_code& ec) +{ + static_assert(is_sync_stream::value, + "SyncStream requirements not met"); + static_assert(is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + using beast::detail::clamp; + using boost::asio::buffer; + using boost::asio::buffer_cast; + using boost::asio::buffer_size; + close_code code{}; + std::size_t bytes_written = 0; +loop: + // See if we need to 
read a frame header. This + // condition is structured to give the decompressor + // a chance to emit the final empty deflate block + // + if(rd_.remain == 0 && (! rd_.fh.fin || rd_.done)) + { + // Read frame header + while(! parse_fh(rd_.fh, rd_.buf, code)) + { + if(code != close_code::none) + goto do_close; + auto const bytes_transferred = + stream_.read_some( + rd_.buf.prepare(read_size( + rd_.buf, rd_.buf.max_size())), + ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + rd_.buf.commit(bytes_transferred); + } + // Immediately apply the mask to the portion + // of the buffer holding payload data. + if(rd_.fh.len > 0 && rd_.fh.mask) + detail::mask_inplace(buffer_prefix( + clamp(rd_.fh.len), rd_.buf.mutable_data()), + rd_.key); + if(detail::is_control(rd_.fh.op)) + { + // Get control frame payload + auto const cb = buffer_prefix( + clamp(rd_.fh.len), rd_.buf.data()); + auto const len = buffer_size(cb); + BOOST_ASSERT(len == rd_.fh.len); + // Process control frame + if(rd_.fh.op == detail::opcode::ping) + { + ping_data payload; + detail::read_ping(payload, cb); + rd_.buf.consume(len); + if(wr_close_) + { + // Ignore ping when closing + goto loop; + } + if(ctrl_cb_) + ctrl_cb_(frame_type::ping, payload); + detail::frame_streambuf fb; + write_ping(fb, + detail::opcode::pong, payload); + boost::asio::write(stream_, fb.data(), ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + goto loop; + } + else if(rd_.fh.op == detail::opcode::pong) + { + ping_data payload; + detail::read_ping(payload, cb); + rd_.buf.consume(len); + if(ctrl_cb_) + ctrl_cb_(frame_type::pong, payload); + goto loop; + } + BOOST_ASSERT(rd_.fh.op == detail::opcode::close); + { + BOOST_ASSERT(! rd_close_); + rd_close_ = true; + detail::read_close(cr_, cb, code); + if(code != close_code::none) + goto do_close; + rd_.buf.consume(len); + if(ctrl_cb_) + ctrl_cb_(frame_type::close, cr_.reason); + if(! 
wr_close_) + { + auto cr = cr_; + if(cr.code == close_code::none) + cr.code = close_code::normal; + cr.reason = ""; + detail::frame_streambuf fb; + wr_close_ = true; + write_close< + flat_static_buffer_base>(fb, cr); + boost::asio::write(stream_, fb.data(), ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + } + goto do_close; + } + } + if(rd_.fh.len == 0 && ! rd_.fh.fin) + { + // Empty non-final frame + goto loop; + } + rd_.done = false; + } + else + { + ec.assign(0, ec.category()); + } + if(! pmd_ || ! pmd_->rd_set) + { + if(rd_.buf.size() == 0 && rd_.buf.max_size() > + (std::min)(clamp(rd_.remain), + buffer_size(buffers))) + { + // Fill the read buffer first, otherwise we + // get fewer bytes at the cost of one I/O. + auto const mb = rd_.buf.prepare( + read_size(rd_.buf, rd_.buf.max_size())); + auto const bytes_transferred = + stream_.read_some(mb, ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + if(rd_.fh.mask) + detail::mask_inplace(buffer_prefix( + clamp(rd_.remain), mb), rd_.key); + rd_.buf.commit(bytes_transferred); + } + if(rd_.buf.size() > 0) + { + // Copy from the read buffer. + // The mask was already applied. + auto const bytes_transferred = + buffer_copy(buffers, rd_.buf.data(), + clamp(rd_.remain)); + auto const mb = buffer_prefix( + bytes_transferred, buffers); + rd_.remain -= bytes_transferred; + if(rd_.op == detail::opcode::text) + { + if(! rd_.utf8.write(mb) || + (rd_.remain == 0 && rd_.fh.fin && + ! 
rd_.utf8.finish())) + { + code = close_code::bad_payload; + goto do_close; + } + } + bytes_written += bytes_transferred; + rd_.size += bytes_transferred; + rd_.buf.consume(bytes_transferred); + } + else + { + // Read into caller's buffer + auto const bytes_transferred = + stream_.read_some(buffer_prefix( + clamp(rd_.remain), buffers), ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + BOOST_ASSERT(bytes_transferred > 0); + auto const mb = buffer_prefix( + bytes_transferred, buffers); + rd_.remain -= bytes_transferred; + if(rd_.fh.mask) + detail::mask_inplace(mb, rd_.key); + if(rd_.op == detail::opcode::text) + { + if(! rd_.utf8.write(mb) || + (rd_.remain == 0 && rd_.fh.fin && + ! rd_.utf8.finish())) + { + code = close_code::bad_payload; + goto do_close; + } + } + bytes_written += bytes_transferred; + rd_.size += bytes_transferred; + } + if(rd_.remain == 0 && rd_.fh.fin) + rd_.done = true; + } + else + { + // Read compressed message frame payload: + // inflate even if rd_.fh.len == 0, otherwise we + // never emit the end-of-stream deflate block. + // + bool did_read = false; + consuming_buffers cb{buffers}; + while(buffer_size(cb) > 0) + { + zlib::z_params zs; + { + auto const out = buffer_front(cb); + zs.next_out = buffer_cast(out); + zs.avail_out = buffer_size(out); + BOOST_ASSERT(zs.avail_out > 0); + } + if(rd_.remain > 0) + { + if(rd_.buf.size() > 0) + { + // use what's there + auto const in = buffer_prefix( + clamp(rd_.remain), buffer_front( + rd_.buf.data())); + zs.avail_in = buffer_size(in); + zs.next_in = buffer_cast(in); + } + else if(! 
did_read) + { + // read new + auto const bytes_transferred = + stream_.read_some( + rd_.buf.prepare(read_size( + rd_.buf, rd_.buf.max_size())), + ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + BOOST_ASSERT(bytes_transferred > 0); + rd_.buf.commit(bytes_transferred); + if(rd_.fh.mask) + detail::mask_inplace( + buffer_prefix(clamp(rd_.remain), + rd_.buf.mutable_data()), rd_.key); + auto const in = buffer_prefix( + clamp(rd_.remain), buffer_front( + rd_.buf.data())); + zs.avail_in = buffer_size(in); + zs.next_in = buffer_cast(in); + did_read = true; + } + else + { + break; + } + } + else if(rd_.fh.fin) + { + // append the empty block codes + static std::uint8_t constexpr + empty_block[4] = { + 0x00, 0x00, 0xff, 0xff }; + zs.next_in = empty_block; + zs.avail_in = sizeof(empty_block); + pmd_->zi.write(zs, zlib::Flush::sync, ec); + BOOST_ASSERT(! ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + // VFALCO See: + // https://github.com/madler/zlib/issues/280 + BOOST_ASSERT(zs.total_out == 0); + cb.consume(zs.total_out); + rd_.size += zs.total_out; + bytes_written += zs.total_out; + if( + (role_ == role_type::client && + pmd_config_.server_no_context_takeover) || + (role_ == role_type::server && + pmd_config_.client_no_context_takeover)) + pmd_->zi.reset(); + rd_.done = true; + break; + } + else + { + break; + } + pmd_->zi.write(zs, zlib::Flush::sync, ec); + BOOST_ASSERT(ec != zlib::error::end_of_stream); + failed_ = !!ec; + if(failed_) + return bytes_written; + if(rd_msg_max_ && beast::detail::sum_exceeds( + rd_.size, zs.total_out, rd_msg_max_)) + { + code = close_code::too_big; + goto do_close; + } + cb.consume(zs.total_out); + rd_.size += zs.total_out; + rd_.remain -= zs.total_in; + rd_.buf.consume(zs.total_in); + bytes_written += zs.total_out; + } + if(rd_.op == detail::opcode::text) + { + // check utf8 + if(! rd_.utf8.write( + buffer_prefix(bytes_written, buffers)) || ( + rd_.remain == 0 && rd_.fh.fin && + ! 
rd_.utf8.finish())) + { + code = close_code::bad_payload; + goto do_close; + } + } + } + return bytes_written; +do_close: + if(code != close_code::none) + { + // Fail the connection (per rfc6455) + if(! wr_close_) + { + wr_close_ = true; + detail::frame_streambuf fb; + write_close(fb, code); + boost::asio::write(stream_, fb.data(), ec); + failed_ = !!ec; + if(failed_) + return bytes_written; + } + websocket_helpers::call_teardown(next_layer(), ec); + if(ec == boost::asio::error::eof) + { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + failed_ = !!ec; + if(failed_) + return bytes_written; + ec = error::failed; + failed_ = true; + return bytes_written; + } + if(! ec) + { + websocket_helpers::call_teardown(next_layer(), ec); + if(ec == boost::asio::error::eof) + { + // (See above) + ec.assign(0, ec.category()); + } + } + if(! ec) + ec = error::closed; + failed_ = !!ec; + return bytes_written; +} + +template +template +async_return_type +stream:: +async_read_some( + MutableBufferSequence const& buffers, + ReadHandler&& handler) +{ + static_assert(is_async_stream::value, + "AsyncStream requirements requirements not met"); + static_assert(is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + async_completion init{handler}; + read_some_op>{ + init.completion_handler,*this, buffers}( + {}, 0); + return init.result.get(); +} + } // websocket } // beast diff --git a/include/beast/websocket/impl/stream.ipp b/include/beast/websocket/impl/stream.ipp index 005ee98d..65e494dc 100644 --- a/include/beast/websocket/impl/stream.ipp +++ b/include/beast/websocket/impl/stream.ipp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,61 @@ stream:: stream(Args&&... args) : stream_(std::forward(args)...) 
{ + BOOST_ASSERT(rd_.buf.max_size() >= + max_control_frame_size); +} + +template +std::size_t +stream:: +read_size_hint( + std::size_t initial_size) const +{ + using beast::detail::clamp; + // no permessage-deflate + if(! pmd_ || (! rd_.done && ! pmd_->rd_set)) + { + // fresh message + if(rd_.done) + return initial_size; + + if(rd_.fh.fin) + return clamp(rd_.remain); + } + return (std::max)( + initial_size, clamp(rd_.remain)); +} + +template +template +std::size_t +stream:: +read_size_hint( + DynamicBuffer& buffer) const +{ + static_assert(is_dynamic_buffer::value, + "DynamicBuffer requirements not met"); + using beast::detail::clamp; + // no permessage-deflate + if(! pmd_ || (! rd_.done && ! pmd_->rd_set)) + { + // fresh message + if(rd_.done) + return (std::min)( + buffer.max_size(), + (std::max)(+tcp_frame_size, + buffer.capacity() - buffer.size())); + + if(rd_.fh.fin) + { + BOOST_ASSERT(rd_.remain != 0); + return (std::min)( + buffer.max_size(), clamp(rd_.remain)); + } + } + return (std::min)(buffer.max_size(), (std::max)( + (std::max)(+tcp_frame_size, clamp(rd_.remain)), + buffer.capacity() - buffer.size())); } template @@ -78,10 +134,13 @@ open(role_type role) // VFALCO TODO analyze and remove dupe code in reset() role_ = role; failed_ = false; + rd_.remain = 0; rd_.cont = false; + rd_.done = true; + rd_.buf.consume(rd_.buf.size()); rd_close_ = false; wr_close_ = false; - wr_block_ = nullptr; // should be nullptr on close anyway + wr_block_.reset(); ping_data_ = nullptr; // should be nullptr on close anyway wr_.cont = false; @@ -121,7 +180,6 @@ void stream:: close() { - rd_.buf.reset(); wr_.buf.reset(); pmd_.reset(); } @@ -132,35 +190,20 @@ stream:: reset() { failed_ = false; + rd_.remain = 0; rd_.cont = false; + rd_.done = true; + rd_.buf.consume(rd_.buf.size()); rd_close_ = false; wr_close_ = false; wr_.cont = false; - wr_block_ = nullptr; // should be nullptr on close anyway + wr_block_.reset(); ping_data_ = nullptr; // should be nullptr on close anyway 
stream_.buffer().consume( stream_.buffer().size()); } -// Called before each read frame -template -void -stream:: -rd_begin() -{ - // Maintain the read buffer - if(pmd_) - { - if(! rd_.buf || rd_.buf_size != rd_buf_size_) - { - rd_.buf_size = rd_buf_size_; - rd_.buf = boost::make_unique_noinit< - std::uint8_t[]>(rd_.buf_size); - } - } -} - // Called before each write frame template void @@ -190,6 +233,194 @@ wr_begin() //------------------------------------------------------------------------------ +// Attempt to read a complete frame header. +// Returns `false` if more bytes are needed +template +template +bool +stream:: +parse_fh( + detail::frame_header& fh, + DynamicBuffer& b, + close_code& code) +{ + using boost::asio::buffer; + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + auto const err = + [&](close_code cv) + { + code = cv; + return false; + }; + if(buffer_size(b.data()) < 2) + { + code = close_code::none; + return false; + } + consuming_buffers cb{ + b.data()}; + { + std::uint8_t tmp[2]; + cb.consume(buffer_copy(buffer(tmp), cb)); + std::size_t need; + fh.len = tmp[1] & 0x7f; + switch(fh.len) + { + case 126: need = 2; break; + case 127: need = 8; break; + default: + need = 0; + } + fh.mask = (tmp[1] & 0x80) != 0; + if(fh.mask) + need += 4; + if(buffer_size(cb) < need) + { + code = close_code::none; + return false; + } + fh.op = static_cast< + detail::opcode>(tmp[0] & 0x0f); + fh.fin = (tmp[0] & 0x80) != 0; + fh.rsv1 = (tmp[0] & 0x40) != 0; + fh.rsv2 = (tmp[0] & 0x20) != 0; + fh.rsv3 = (tmp[0] & 0x10) != 0; + } + switch(fh.op) + { + case detail::opcode::binary: + case detail::opcode::text: + if(rd_.cont) + { + // new data frame when continuation expected + return err(close_code::protocol_error); + } + if((fh.rsv1 && ! pmd_) || + fh.rsv2 || fh.rsv3) + { + // reserved bits not cleared + return err(close_code::protocol_error); + } + if(pmd_) + pmd_->rd_set = fh.rsv1; + break; + + case detail::opcode::cont: + if(! 
rd_.cont) + { + // continuation without an active message + return err(close_code::protocol_error); + } + if(fh.rsv1 || fh.rsv2 || fh.rsv3) + { + // reserved bits not cleared + return err(close_code::protocol_error); + } + break; + + default: + if(detail::is_reserved(fh.op)) + { + // reserved opcode + return err(close_code::protocol_error); + } + if(! fh.fin) + { + // fragmented control message + return err(close_code::protocol_error); + } + if(fh.len > 125) + { + // invalid length for control message + return err(close_code::protocol_error); + } + if(fh.rsv1 || fh.rsv2 || fh.rsv3) + { + // reserved bits not cleared + return err(close_code::protocol_error); + } + break; + } + // unmasked frame from client + if(role_ == role_type::server && ! fh.mask) + return err(close_code::protocol_error); + // masked frame from server + if(role_ == role_type::client && fh.mask) + return err(close_code::protocol_error); + if(detail::is_control(fh.op) && + buffer_size(cb) < fh.len) + { + // Make the entire control frame payload + // get read in before we return `true` + return false; + } + switch(fh.len) + { + case 126: + { + std::uint8_t tmp[2]; + BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp)); + cb.consume(buffer_copy(buffer(tmp), cb)); + fh.len = detail::big_uint16_to_native(&tmp[0]); + // length not canonical + if(fh.len < 126) + return err(close_code::protocol_error); + break; + } + case 127: + { + std::uint8_t tmp[8]; + BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp)); + cb.consume(buffer_copy(buffer(tmp), cb)); + fh.len = detail::big_uint64_to_native(&tmp[0]); + // length not canonical + if(fh.len < 65536) + return err(close_code::protocol_error); + break; + } + } + if(fh.mask) + { + std::uint8_t tmp[4]; + BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp)); + cb.consume(buffer_copy(buffer(tmp), cb)); + fh.key = detail::little_uint32_to_native(&tmp[0]); + detail::prepare_key(rd_.key, fh.key); + } + else + { + // initialize this otherwise operator== breaks + fh.key = 0; + } + if(! 
detail::is_control(fh.op)) + { + if(fh.op != detail::opcode::cont) + { + rd_.size = 0; + rd_.op = fh.op; + } + else + { + if(rd_.size > (std::numeric_limits< + std::uint64_t>::max)() - fh.len) + return err(close_code::too_big); + } + if(! pmd_ || ! pmd_->rd_set) + { + if(rd_msg_max_ && beast::detail::sum_exceeds( + rd_.size, fh.len, rd_msg_max_)) + return err(close_code::too_big); + } + rd_.cont = ! fh.fin; + rd_.remain = fh.len; + } + b.consume(b.size() - buffer_size(cb)); + code = close_code::none; + return true; +} + // Read fixed frame header from buffer // Requires at least 2 bytes // @@ -357,7 +588,7 @@ read_fh2(detail::frame_header& fh, // initialize this otherwise operator== breaks fh.key = 0; } - if(! is_control(fh.op)) + if(! detail::is_control(fh.op)) { if(fh.op != detail::opcode::cont) { diff --git a/include/beast/websocket/impl/write.ipp b/include/beast/websocket/impl/write.ipp index 911acfda..cef0a334 100644 --- a/include/beast/websocket/impl/write.ipp +++ b/include/beast/websocket/impl/write.ipp @@ -32,7 +32,7 @@ namespace websocket { template template -class stream::write_frame_op +class stream::write_some_op { struct data : op { @@ -46,12 +46,14 @@ class stream::write_frame_op std::uint64_t remain; int step = 0; int entry_state; + token tok; data(Handler& handler, stream& ws_, bool fin_, Buffers const& bs) : ws(ws_) , cb(bs) , fin(fin_) + , tok(ws.t_.unique()) { using boost::asio::asio_handler_is_continuation; cont = asio_handler_is_continuation(std::addressof(handler)); @@ -61,11 +63,11 @@ class stream::write_frame_op handler_ptr d_; public: - write_frame_op(write_frame_op&&) = default; - write_frame_op(write_frame_op const&) = default; + write_some_op(write_some_op&&) = default; + write_some_op(write_some_op const&) = default; template - write_frame_op(DeducedHandler&& h, + write_some_op(DeducedHandler&& h, stream& ws, Args&&... args) : d_(std::forward(h), ws, std::forward(args)...) 
@@ -83,7 +85,7 @@ public: friend void* asio_handler_allocate( - std::size_t size, write_frame_op* op) + std::size_t size, write_some_op* op) { using boost::asio::asio_handler_allocate; return asio_handler_allocate( @@ -92,7 +94,7 @@ public: friend void asio_handler_deallocate( - void* p, std::size_t size, write_frame_op* op) + void* p, std::size_t size, write_some_op* op) { using boost::asio::asio_handler_deallocate; asio_handler_deallocate( @@ -100,14 +102,14 @@ public: } friend - bool asio_handler_is_continuation(write_frame_op* op) + bool asio_handler_is_continuation(write_some_op* op) { return op->d_->cont; } template friend - void asio_handler_invoke(Function&& f, write_frame_op* op) + void asio_handler_invoke(Function&& f, write_some_op* op) { using boost::asio::asio_handler_invoke; asio_handler_invoke( @@ -119,7 +121,7 @@ template template void stream:: -write_frame_op:: +write_some_op:: operator()(error_code ec, std::size_t bytes_transferred, bool again) { @@ -142,7 +144,7 @@ operator()(error_code ec, d.cont = d.cont || again; if(ec) { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.ws.failed_ = true; goto upcall; } @@ -212,7 +214,7 @@ loop: //---------------------------------------------------------------------- case do_nomask_nofrag: - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.fh.fin = d.fin; d.fh.len = buffer_size(d.cb); detail::write( @@ -229,7 +231,7 @@ loop: go_nomask_frag: case do_nomask_frag: { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); auto const n = clamp( d.remain, d.ws.wr_.buf_size); d.remain -= n; @@ -248,8 +250,8 @@ loop: } case do_nomask_frag + 1: - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.wr_block_.reset(); d.cb.consume( bytes_transferred - d.fh_buf.size()); d.fh_buf.consume(d.fh_buf.size()); @@ -264,14 +266,14 @@ loop: return d.ws.get_io_service().post( 
std::move(*this)); } - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; goto go_nomask_frag; //---------------------------------------------------------------------- case do_mask_nofrag: { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.remain = buffer_size(d.cb); d.fh.fin = d.fin; d.fh.len = d.remain; @@ -317,7 +319,7 @@ loop: go_mask_frag: case do_mask_frag: { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); auto const n = clamp( d.remain, d.ws.wr_.buf_size); d.remain -= n; @@ -342,8 +344,8 @@ loop: } case do_mask_frag + 1: - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.wr_block_.reset(); d.cb.consume( bytes_transferred - d.fh_buf.size()); d.fh_buf.consume(d.fh_buf.size()); @@ -359,7 +361,7 @@ loop: std::move(*this)); return; } - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; goto go_mask_frag; //---------------------------------------------------------------------- @@ -367,7 +369,7 @@ loop: go_deflate: case do_deflate: { - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); auto b = buffer(d.ws.wr_.buf.get(), d.ws.wr_.buf_size); auto const more = detail::deflate( @@ -414,9 +416,9 @@ loop: } case do_deflate + 1: - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.fh_buf.consume(d.fh_buf.size()); - d.ws.wr_block_ = nullptr; + d.ws.wr_block_.reset(); d.fh.op = detail::opcode::cont; d.fh.rsv1 = false; // Allow outgoing control frames to @@ -430,11 +432,11 @@ loop: std::move(*this)); return; } - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; goto go_deflate; case do_deflate + 2: - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); if(d.fh.fin && ( (d.ws.role_ == role_type::client && d.ws.pmd_config_.client_no_context_takeover) || @@ -449,12 +451,12 @@ loop: if(d.ws.wr_block_) { // suspend - BOOST_ASSERT(d.ws.wr_block_ != &d); + BOOST_ASSERT(d.ws.wr_block_ != 
d.tok); d.step = do_maybe_suspend + 1; d.ws.wr_op_.emplace(std::move(*this)); return; } - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; if(d.ws.failed_ || d.ws.wr_close_) { // call handler @@ -467,7 +469,7 @@ loop: case do_maybe_suspend + 1: BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; + d.ws.wr_block_ = d.tok; d.step = do_maybe_suspend + 2; // The current context is safe but might not be // the same as the one for this operation (since @@ -479,7 +481,7 @@ loop: return; case do_maybe_suspend + 2: - BOOST_ASSERT(d.ws.wr_block_ == &d); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); if(d.ws.failed_ || d.ws.wr_close_) { // call handler @@ -495,8 +497,8 @@ loop: goto upcall; } upcall: - if(d.ws.wr_block_ == &d) - d.ws.wr_block_ = nullptr; + if(d.ws.wr_block_ == d.tok) + d.ws.wr_block_.reset(); d.ws.close_op_.maybe_invoke() || d.ws.rd_op_.maybe_invoke() || d.ws.ping_op_.maybe_invoke(); @@ -515,12 +517,14 @@ class stream::write_op stream& ws; consuming_buffers cb; std::size_t remain; + token tok; data(Handler&, stream& ws_, Buffers const& bs) : ws(ws_) , cb(bs) , remain(boost::asio::buffer_size(cb)) + , tok(ws.t_.unique()) { } }; @@ -604,7 +608,7 @@ operator()(error_code ec) d.step = d.step ? 
3 : 2; auto const pb = buffer_prefix(n, d.cb); d.cb.consume(n); - return d.ws.async_write_frame( + return d.ws.async_write_some( fin, pb, std::move(*this)); } @@ -621,7 +625,7 @@ template template void stream:: -write_frame(bool fin, ConstBufferSequence const& buffers) +write_some(bool fin, ConstBufferSequence const& buffers) { static_assert(is_sync_stream::value, "SyncStream requirements not met"); @@ -629,7 +633,7 @@ write_frame(bool fin, ConstBufferSequence const& buffers) ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); error_code ec; - write_frame(fin, buffers, ec); + write_some(fin, buffers, ec); if(ec) BOOST_THROW_EXCEPTION(system_error{ec}); } @@ -638,7 +642,7 @@ template template void stream:: -write_frame(bool fin, +write_some(bool fin, ConstBufferSequence const& buffers, error_code& ec) { static_assert(is_sync_stream::value, @@ -845,7 +849,7 @@ template async_return_type< WriteHandler, void(error_code)> stream:: -async_write_frame(bool fin, +async_write_some(bool fin, ConstBufferSequence const& bs, WriteHandler&& handler) { static_assert(is_async_stream::value, @@ -855,7 +859,7 @@ async_write_frame(bool fin, "ConstBufferSequence requirements not met"); async_completion init{handler}; - write_frame_op>{init.completion_handler, *this, fin, bs}({}, 0, false); return init.result.get(); @@ -891,7 +895,7 @@ write(ConstBufferSequence const& buffers, error_code& ec) static_assert(beast::is_const_buffer_sequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); - write_frame(true, buffers, ec); + write_some(true, buffers, ec); } template diff --git a/include/beast/websocket/stream.hpp b/include/beast/websocket/stream.hpp index 70864135..dd59339e 100644 --- a/include/beast/websocket/stream.hpp +++ b/include/beast/websocket/stream.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -29,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -117,8 +117,30 @@ 
class stream friend class stream_test; friend class frame_test; + /* The read buffer has to be at least as large + as the largest possible control frame including + the frame header. + */ + static std::size_t constexpr max_control_frame_size = 2 + 8 + 4 + 125; + static std::size_t constexpr tcp_frame_size = 1536; + struct op {}; + // tokens are used to order reads and writes + class token + { + unsigned char id_ = 1; + explicit token(unsigned char id) : id_(id) {} + public: + token() = default; + token(token const&) = default; + operator bool() const { return id_ != 0; } + bool operator==(token const& t) { return id_ == t.id_; } + bool operator!=(token const& t) { return id_ != t.id_; } + token unique() { token t{id_++}; if(id_ == 0) ++id_; return t; } + void reset() { id_ = 0; } + }; + using control_cb_type = std::function; @@ -136,26 +158,24 @@ class stream // struct rd_t { + detail::frame_header fh; // current frame header + detail::prepared_key key; // current stateful mask key + std::uint64_t size; // total size of current message so far + std::uint64_t remain; // message frame bytes left in current frame + detail::frame_streambuf fb; // to write control frames + detail::utf8_checker utf8; // to validate utf8 + + // A small, circular buffer to read frame headers. + // This improves performance by avoiding small reads. + static_buffer<+tcp_frame_size> buf; + // opcode of current message being read detail::opcode op; // `true` if the next frame is a continuation. bool cont; - // Checks that test messages are valid utf8 - detail::utf8_checker utf8; - - // Size of the current message so far. - std::uint64_t size; - - // Size of the read buffer. - // This gets set to the read buffer size option at the - // beginning of sending a message, so that the option can be - // changed mid-send without affecting the current message. - std::size_t buf_size; - - // The read buffer. Used for compression and masking. 
- std::unique_ptr buf; + bool done; // set when a message is done }; // State information for the message being sent @@ -216,7 +236,7 @@ class stream bool rd_close_; // read close frame bool wr_close_; // sent close frame - op* wr_block_; // op currenly writing + token wr_block_; // op currenly writing ping_data* ping_data_; // where to put the payload detail::pausation rd_op_; // paused read op @@ -237,6 +257,8 @@ class stream // Offer for clients, negotiated result for servers detail::pmd_offer pmd_config_; + token t_; + public: /// The type of the next layer. using next_layer_type = @@ -290,6 +312,8 @@ public: explicit stream(Args&&... args); + //-------------------------------------------------------------------------- + /** Return the `io_service` associated with the stream This function may be used to obtain the `io_service` object @@ -361,6 +385,105 @@ public: return stream_.lowest_layer(); } + //-------------------------------------------------------------------------- + // + // Observers + // + //-------------------------------------------------------------------------- + + /** Returns `true` if the latest message data indicates binary. + + This function informs the caller of whether the last + received message frame represents a message with the + binary opcode. + + If there is no last message frame, the return value is + undefined. + */ + bool + got_binary() const + { + return rd_.op == detail::opcode::binary; + } + + /** Returns `true` if the latest message data indicates text. + + This function informs the caller of whether the last + received message frame represents a message with the + text opcode. + + If there is no last message frame, the return value is + undefined. + */ + bool + got_text() const + { + return ! got_binary(); + } + + /// Returns `true` if the last completed read finished the current message. + bool + is_message_done() const + { + return rd_.done; + } + + /** Returns the close reason received from the peer. 
+ + This is only valid after a read completes with error::closed. + */ + close_reason const& + reason() const + { + return cr_; + } + + /** Returns a suggested maximum buffer size for the next call to read. + + This function returns a reasonable upper limit on the number + of bytes for the size of the buffer passed in the next call + to read. The number is determined by the state of the current + frame and whether or not the permessage-deflate extension is + enabled. + + @param initial_size A size representing the caller's desired + buffer size for when there is no information which may be used + to calculate a more specific value. For example, when reading + the first frame header of a message. + */ + std::size_t + read_size_hint( + std::size_t initial_size = +tcp_frame_size) const; + + /** Returns a suggested maximum buffer size for the next call to read. + + This function returns a reasonable upper limit on the number + of bytes for the size of the buffer passed in the next call + to read. The number is determined by the state of the current + frame and whether or not the permessage-deflate extension is + enabled. + + @param buffer The buffer which will be used for reading. The + implementation will query the buffer to obtain the optimum + size of a subsequent call to `buffer.prepare` based on the + state of the current frame, if any. 
+ */ + template::value>::type +#endif + > + std::size_t + read_size_hint( + DynamicBuffer& buffer) const; + + //-------------------------------------------------------------------------- + // + // Settings + // + //-------------------------------------------------------------------------- + /// Set the permessage-deflate extension options void set_option(permessage_deflate const& o); @@ -445,9 +568,9 @@ public: of the following functions: @li @ref beast::websocket::stream::read - @li @ref beast::websocket::stream::read_frame + @li @ref beast::websocket::stream::read_some @li @ref beast::websocket::stream::async_read - @li @ref beast::websocket::stream::async_read_frame + @li @ref beast::websocket::stream::async_read_some Unlike completion handlers, the callback will be invoked for each control frame during a call to any synchronous @@ -484,44 +607,6 @@ public: ctrl_cb_ = std::move(cb); } - /** Set the read buffer size option. - - Sets the size of the read buffer used by the implementation to - receive frames. The read buffer is needed when permessage-deflate - is used. - - Lowering the size of the buffer can decrease the memory requirements - for each connection, while increasing the size of the buffer can reduce - the number of calls made to the next layer to read data. - - The default setting is 4096. The minimum value is 8. - - @param amount The size of the read buffer. - - @throws std::invalid_argument If the buffer size is less than 8. - - @par Example - Setting the read buffer size. - @code - ws.read_buffer_size(16 * 1024); - @endcode - */ - void - read_buffer_size(std::size_t amount) - { - if(amount < 8) - BOOST_THROW_EXCEPTION(std::invalid_argument{ - "read buffer size underflow"}); - rd_buf_size_ = amount; - } - - /// Returns the read buffer size setting. - std::size_t - read_buffer_size() const - { - return rd_buf_size_; - } - /** Set the maximum incoming message size option. Sets the largest permissible incoming message size. 
Message @@ -627,45 +712,11 @@ public: return wr_opcode_ == detail::opcode::text; } - /** Returns the close reason received from the peer. - - This is only valid after a read completes with error::closed. - */ - close_reason const& - reason() const - { - return cr_; - } - - /** Returns `true` if the latest message data indicates binary. - - This function informs the caller of whether the last - received message frame represents a message with the - binary opcode. - - If there is no last message frame, the return value is - undefined. - */ - bool - got_binary() - { - return rd_.op == detail::opcode::binary; - } - - /** Returns `true` if the latest message data indicates text. - - This function informs the caller of whether the last - received message frame represents a message with the - text opcode. - - If there is no last message frame, the return value is - undefined. - */ - bool - got_text() - { - return ! got_binary(); - } + //-------------------------------------------------------------------------- + // + // Handshaking + // + //-------------------------------------------------------------------------- /** Read and respond to a WebSocket HTTP Upgrade request. @@ -2480,6 +2531,12 @@ public: RequestDecorator const& decorator, HandshakeHandler&& handler); + //-------------------------------------------------------------------------- + // + // Control Frames + // + //-------------------------------------------------------------------------- + /** Send a WebSocket close frame. This function is used to synchronously send a close frame on @@ -2553,7 +2610,7 @@ public: next layer's `async_write_some` functions, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as @ref async_ping, - @ref stream::async_write, @ref stream::async_write_frame, or + @ref stream::async_write, @ref stream::async_write_some, or @ref stream::async_close) until this operation completes. 
If the close reason specifies a close code other than @@ -2769,6 +2826,75 @@ public: #endif async_pong(ping_data const& payload, WriteHandler&& handler); + //-------------------------------------------------------------------------- + // + // Reading + // + //-------------------------------------------------------------------------- + + /** Start an asynchronous operation to read a message frame from the stream. + + This function is used to asynchronously read a single message + frame from the websocket. The function call always returns + immediately. The asynchronous operation will continue until + one of the following conditions is true: + + @li A complete frame is received. + + @li An error occurs on the stream. + + This operation is implemented in terms of one or more calls to the + next layer's `async_read_some` and `async_write_some` functions, + and is known as a composed operation. The program must + ensure that the stream performs no other reads until this operation + completes. + + During reads, the implementation handles control frames as + follows: + + @li The @ref control_callback is invoked when a ping frame + or pong frame is received. + + @li A pong frame is sent when a ping frame is received. + + @li The WebSocket close procedure is started if a close frame + is received. In this case, the operation will eventually + complete with the error set to @ref error::closed. + + Because of the need to handle control frames, read operations + can cause writes to take place. These writes are managed + transparently; callers can still have one active asynchronous + read and asynchronous write operation pending simultaneously + (a user initiated call to @ref async_close counts as a write). + + @param buffer A dynamic buffer to hold the message data after + any masking or decompression has been applied. This object must + remain valid until the handler is called. + + @param handler The handler to be called when the read operation + completes. 
Copies will be made of the handler as required. The + function signature of the handler must be: + @code + void handler( + error_code const& ec, // Result of operation + bool fin // `true` if this is the last frame + ); + @endcode + Regardless of whether the asynchronous operation completes + immediately or not, the handler will not be invoked from within + this function. Invocation of the handler will be performed in a + manner equivalent to using boost::asio::io_service::post(). + */ + template +#if BEAST_DOXYGEN + void_or_deduced +#else + async_return_type +#endif + async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler); + + //-------------------------------------------------------------------------- + /** Read a message from the stream. This function is used to synchronously read a message from @@ -2897,7 +3023,7 @@ public: function signature of the handler must be: @code void handler( - error_code const& ec // Result of operation + error_code const& ec; // Result of operation ); @endcode Regardless of whether the asynchronous operation completes @@ -2914,24 +3040,30 @@ public: #endif async_read(DynamicBuffer& buffer, ReadHandler&& handler); - /** Read a message frame from the stream. + //-------------------------------------------------------------------------- - This function is used to synchronously read a single message - frame from the stream. The call blocks until one of the following + /** Read some message data from the stream. + + This function is used to synchronously read some message + data from the stream. The call blocks until one of the following is true: - @li A complete frame is received. + @li One or more message octets are placed into the provided buffers. + + @li A final message frame is received. + + @li A close frame is received and processed. @li An error occurs on the stream. - This call is implemented in terms of one or more calls to the - stream's `read_some` and `write_some` operations. 
+ This function is implemented in terms of one or more calls to the + stream's `read_some` operation. During reads, the implementation handles control frames as follows: - @li The @ref control_callback is invoked when a ping frame - or pong frame is received. + @li The @ref control_callback is invoked when any control + frame is received. @li A pong frame is sent when a ping frame is received. @@ -2939,35 +3071,44 @@ public: is received. In this case, the operation will eventually complete with the error set to @ref error::closed. - @param buffer A dynamic buffer to hold the message data after - any masking or decompression has been applied. + @param buffer A dynamic buffer for holding the result - @return `true` if this is the last frame of the message. + @param limit An upper limit on the number of bytes this + function will write. If this value is zero, then a reasonable + size will be chosen automatically. @throws system_error Thrown on failure. + + @return The number of bytes written to the buffers */ template - bool - read_frame(DynamicBuffer& buffer); + std::size_t + read_some( + DynamicBuffer& buffer, + std::size_t limit); - /** Read a message frame from the stream. + /** Read some message data from the stream. - This function is used to synchronously read a single message - frame from the stream. The call blocks until one of the following + This function is used to synchronously read some message + data from the stream. The call blocks until one of the following is true: - @li A complete frame is received. + @li One or more message octets are placed into the provided buffers. + + @li A final message frame is received. + + @li A close frame is received and processed. @li An error occurs on the stream. - This call is implemented in terms of one or more calls to the - stream's `read_some` and `write_some` operations. + This function is implemented in terms of one or more calls to the + stream's `read_some` operation. 
During reads, the implementation handles control frames as follows: - @li The @ref control_callback is invoked when a ping frame - or pong frame is received. + @li The @ref control_callback is invoked when any control + frame is received. @li A pong frame is sent when a ping frame is received. @@ -2975,39 +3116,54 @@ public: is received. In this case, the operation will eventually complete with the error set to @ref error::closed. - @param buffer A dynamic buffer to hold the message data after - any masking or decompression has been applied. + @param buffer A dynamic buffer for holding the result + + @param limit An upper limit on the number of bytes this + function will write. If this value is zero, then a reasonable + size will be chosen automatically. @param ec Set to indicate what error occurred, if any. - @return `true` if this is the last frame of the message. + @return The number of bytes written to the buffer */ template - bool - read_frame(DynamicBuffer& buffer, error_code& ec); + std::size_t + read_some( + DynamicBuffer& buffer, + std::size_t limit, + error_code& ec); - /** Start an asynchronous operation to read a message frame from the stream. + /** Start an asynchronous operation to read some message data from the stream. - This function is used to asynchronously read a single message - frame from the websocket. The function call always returns - immediately. The asynchronous operation will continue until - one of the following conditions is true: + This function is used to asynchronously read some message + data from the stream. The function call always returns immediately. + The asynchronous operation will continue until one of the following + is true: - @li A complete frame is received. + @li One or more message octets are placed into the provided buffers. + + @li A final message frame is received. + + @li A close frame is received and processed. @li An error occurs on the stream. 
This operation is implemented in terms of one or more calls to the - next layer's `async_read_some` and `async_write_some` functions, - and is known as a composed operation. The program must - ensure that the stream performs no other reads until this operation - completes. + next layer's `async_read_some` function, and is known as a + composed operation. The program must ensure that the + stream performs no other reads until this operation completes. + + Upon a success, the input area of the stream buffer will + hold the received message payload bytes (which may be zero + in length). The functions @ref got_binary and @ref got_text + may be used to query the stream and determine the type + of the last received message. During reads, the implementation handles control frames as follows: - @li The @ref control_callback is invoked when a ping frame - or pong frame is received. + @li The @ref control_callback is invoked when any control + frame is received. @li A pong frame is sent when a ping frame is received. @@ -3021,31 +3177,216 @@ public: read and asynchronous write operation pending simultaneously (a user initiated call to @ref async_close counts as a write). - @param buffer A dynamic buffer to hold the message data after - any masking or decompression has been applied. This object must - remain valid until the handler is called. + @param buffer A dynamic buffer for holding the result @param handler The handler to be called when the read operation completes. Copies will be made of the handler as required. The function signature of the handler must be: @code void handler( - error_code const& ec, // Result of operation - bool fin // `true` if this is the last frame + error_code const& ec, // Result of operation + std::size_t bytes_transferred // The number of bytes written to the buffer ); @endcode Regardless of whether the asynchronous operation completes immediately or not, the handler will not be invoked from within this function. 
Invocation of the handler will be performed in a - manner equivalent to using boost::asio::io_service::post(). + manner equivalent to using `boost::asio::io_service::post`. */ template #if BEAST_DOXYGEN void_or_deduced #else - async_return_type + async_return_type< + ReadHandler, void(error_code, std::size_t)> #endif - async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler); + async_read_some( + DynamicBuffer& buffer, + std::size_t limit, + ReadHandler&& handler); + + //-------------------------------------------------------------------------- + + /** Read some message data from the stream. + + This function is used to synchronously read some message + data from the stream. The call blocks until one of the following + is true: + + @li One or more message octets are placed into the provided buffers. + + @li A final message frame is received. + + @li A close frame is received and processed. + + @li An error occurs on the stream. + + This function is implemented in terms of one or more calls to the + stream's `read_some` operation. + + During reads, the implementation handles control frames as + follows: + + @li The @ref control_callback is invoked when any control + frame is received. + + @li A pong frame is sent when a ping frame is received. + + @li The WebSocket close procedure is started if a close frame + is received. In this case, the operation will eventually + complete with the error set to @ref error::closed. + + @param buffers The buffers into which message data will be + placed after any masking or decompression has been applied. + The implementation will make copies of this object as needed, + but ownership of the underlying memory is not transferred. + The caller is responsible for ensuring that the memory + locations pointed to by the buffers remain valid until the + operation completes. 
+ + @throws system_error Thrown on failure. + + @return The number of bytes written to the buffers + */ + template + std::size_t + read_some( + MutableBufferSequence const& buffers); + + /** Read some message data from the stream. + + This function is used to synchronously read some message + data from the stream. The call blocks until one of the + following is true: + + @li One or more message octets are placed into the provided buffers. + + @li A final message frame is received. + + @li A close frame is received and processed. + + @li An error occurs on the stream. + + This operation is implemented in terms of one or more calls to the + stream's `read_some` function. + + During reads, the implementation handles control frames as + follows: + + @li The @ref control_callback is invoked when any control + frame is received. + + @li A pong frame is sent when a ping frame is received. + + @li The WebSocket close procedure is started if a close frame + is received. In this case, the operation will eventually + complete with the error set to @ref error::closed. + + @param buffers The buffers into which message data will be + placed after any masking or decompression has been applied. + The implementation will make copies of this object as needed, + but ownership of the underlying memory is not transferred. + The caller is responsible for ensuring that the memory + locations pointed to by the buffers remain valid until the + operation completes. + + @param ec Set to indicate what error occurred, if any. + + @return The number of bytes written to the buffers + */ + template + std::size_t + read_some( + MutableBufferSequence const& buffers, + error_code& ec); + + /** Start an asynchronous operation to read some message data from the stream. + + This function is used to asynchronously read some message + data from the stream. 
The function call always returns immediately. + The asynchronous operation will continue until one of the following + is true: + + @li One or more message octets are placed into the provided buffers. + + @li A final message frame is received. + + @li A close frame is received and processed. + + @li An error occurs on the stream. + + This operation is implemented in terms of one or more calls to the + next layer's `async_read_some` function, and is known as a + composed operation. The program must ensure that the + stream performs no other reads until this operation completes. + + Upon a success, the input area of the stream buffer will + hold the received message payload bytes (which may be zero + in length). The functions @ref got_binary and @ref got_text + may be used to query the stream and determine the type + of the last received message. + + During reads, the implementation handles control frames as + follows: + + @li The @ref control_callback is invoked when any control + frame is received. + + @li A pong frame is sent when a ping frame is received. + + @li The WebSocket close procedure is started if a close frame + is received. In this case, the operation will eventually + complete with the error set to @ref error::closed. + + Because of the need to handle control frames, read operations + can cause writes to take place. These writes are managed + transparently; callers can still have one active asynchronous + read and asynchronous write operation pending simultaneously + (a user initiated call to @ref async_close counts as a write). + + @param buffers The buffers into which message data will be + placed after any masking or decompression has been applied. + The implementation will make copies of this object as needed, + but ownership of the underlying memory is not transferred. 
+ The caller is responsible for ensuring that the memory + locations pointed to by the buffers remains valid until the + completion handler is called. + + @param handler The handler to be called when the read operation + completes. Copies will be made of the handler as required. The + function signature of the handler must be: + @code + void handler( + error_code const& ec, // Result of operation + std::size_t bytes_transferred // The number of bytes written to buffers + ); + @endcode + Regardless of whether the asynchronous operation completes + immediately or not, the handler will not be invoked from within + this function. Invocation of the handler will be performed in a + manner equivalent to using `boost::asio::io_service::post`. + */ + template +#if BEAST_DOXYGEN + void_or_deduced +#else + async_return_type +#endif + async_read_some( + MutableBufferSequence const& buffers, + ReadHandler&& handler); + + //-------------------------------------------------------------------------- + // + // Writing + // + //-------------------------------------------------------------------------- /** Write a message to the stream. @@ -3076,7 +3417,7 @@ public: @throws system_error Thrown on failure. @note This function always sends an entire message. To - send a message in fragments, use @ref write_frame. + send a message in fragments, use @ref write_some. */ template void @@ -3113,7 +3454,7 @@ public: @throws system_error Thrown on failure. @note This function always sends an entire message. To - send a message in fragments, use @ref write_frame. + send a message in fragments, use @ref write_some. */ template void @@ -3134,7 +3475,7 @@ public: to the next layer's `async_write_some` functions, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as - stream::async_write, stream::async_write_frame, or + stream::async_write, stream::async_write_some, or stream::async_close). 
The current setting of the @ref binary option controls @@ -3203,7 +3544,7 @@ public: */ template void - write_frame(bool fin, ConstBufferSequence const& buffers); + write_some(bool fin, ConstBufferSequence const& buffers); /** Write partial message data on the stream. @@ -3235,7 +3576,7 @@ public: */ template void - write_frame(bool fin, + write_some(bool fin, ConstBufferSequence const& buffers, error_code& ec); /** Start an asynchronous operation to send a message frame on the stream. @@ -3254,7 +3595,7 @@ public: as a composed operation. The actual payload sent may be transformed as per the WebSocket protocol settings. The program must ensure that the stream performs no other write - operations (such as stream::async_write, stream::async_write_frame, + operations (such as stream::async_write, stream::async_write_some, or stream::async_close). If this is the beginning of a new message, the message opcode @@ -3286,24 +3627,28 @@ public: async_return_type< WriteHandler, void(error_code)> #endif - async_write_frame(bool fin, + async_write_some(bool fin, ConstBufferSequence const& buffers, WriteHandler&& handler); private: - template class accept_op; - template class close_op; - template class handshake_op; - template class ping_op; - template class read_op; - template class read_frame_op; - template class response_op; - template class write_frame_op; - template class write_op; + enum class fail_how + { + code = 1, // send close code, teardown, finish with error::failed + close = 2, // send frame in fb, teardown, finish with error::closed + teardown = 3 // teardown, finish with error::failed + }; + + template class accept_op; + template class close_op; + template class fail_op; + template class handshake_op; + template class ping_op; + template class read_fh_op; + template class read_some_op; + template class read_op; + template class response_op; + template class write_some_op; + template class write_op; static void default_decorate_req(request_type&) {} static void 
default_decorate_res(response_type&) {} @@ -3311,9 +3656,13 @@ private: void open(role_type role); void close(); void reset(); - void rd_begin(); void wr_begin(); + template + bool + parse_fh(detail::frame_header& fh, + DynamicBuffer& b, close_code& code); + template std::size_t read_fh1(detail::frame_header& fh, @@ -3376,6 +3725,7 @@ private: #include #include +#include #include #include #include diff --git a/test/websocket/doc_snippets.cpp b/test/websocket/doc_snippets.cpp index 654c2ca3..1e27c58e 100644 --- a/test/websocket/doc_snippets.cpp +++ b/test/websocket/doc_snippets.cpp @@ -163,7 +163,7 @@ boost::asio::ip::tcp::socket sock{ios}; //[ws_snippet_16 multi_buffer buffer; for(;;) - if(ws.read_frame(buffer)) + if(ws.read_some(buffer, 0)) break; ws.binary(ws.got_binary()); consuming_buffers cb{buffer.data()}; @@ -172,12 +172,12 @@ boost::asio::ip::tcp::socket sock{ios}; using boost::asio::buffer_size; if(buffer_size(cb) > 512) { - ws.write_frame(false, buffer_prefix(512, cb)); + ws.write_some(false, buffer_prefix(512, cb)); cb.consume(512); } else { - ws.write_frame(true, cb); + ws.write_some(true, cb); break; } } diff --git a/test/websocket/error.cpp b/test/websocket/error.cpp index 3032915d..a82b399b 100644 --- a/test/websocket/error.cpp +++ b/test/websocket/error.cpp @@ -37,6 +37,7 @@ public: check("beast.websocket", error::closed); check("beast.websocket", error::failed); check("beast.websocket", error::handshake_failed); + check("beast.websocket", error::buffer_overflow); } }; diff --git a/test/websocket/stream.cpp b/test/websocket/stream.cpp index a129cd45..fb617aeb 100644 --- a/test/websocket/stream.cpp +++ b/test/websocket/stream.cpp @@ -264,7 +264,7 @@ public: read(stream& ws, DynamicBuffer& buffer) const { - ws.read(buffer); + return ws.read(buffer); } template< @@ -279,10 +279,10 @@ public: template< class NextLayer, class ConstBufferSequence> void - write_frame(stream& ws, bool fin, + write_some(stream& ws, bool fin, ConstBufferSequence const& 
buffers) const { - ws.write_frame(fin, buffers); + ws.write_some(fin, buffers); } template< @@ -521,11 +521,11 @@ public: template< class NextLayer, class ConstBufferSequence> void - write_frame(stream& ws, bool fin, + write_some(stream& ws, bool fin, ConstBufferSequence const& buffers) const { error_code ec; - ws.async_write_frame(fin, buffers, yield_[ec]); + ws.async_write_some(fin, buffers, yield_[ec]); if(ec) throw system_error{ec}; } @@ -551,7 +551,6 @@ public: ws.auto_fragment(true); ws.write_buffer_size(2048); ws.binary(false); - ws.read_buffer_size(8192); ws.read_message_max(1 * 1024 * 1024); try { @@ -592,7 +591,7 @@ public: { static std::size_t constexpr limit = 200; std::size_t n; - for(n = 0; n < limit; ++n) + for(n = 0; n <= limit; ++n) { test::fail_counter fc{n}; try @@ -904,7 +903,7 @@ public: { static std::size_t constexpr limit = 200; std::size_t n; - for(n = 199; n < limit; ++n) + for(n = 0; n < limit; ++n) { test::fail_counter fc{n}; try @@ -1539,8 +1538,8 @@ public: ws.handshake("localhost", "/", ec); if(! BEAST_EXPECTS(! ec, ec.message())) return; - ws.write_frame(false, sbuf("u")); - ws.write_frame(true, sbuf("v")); + ws.write_some(false, sbuf("u")); + ws.write_some(true, sbuf("v")); multi_buffer b; ws.read(b, ec); if(! BEAST_EXPECTS(! ec, ec.message())) @@ -1562,7 +1561,7 @@ public: ws.handshake("localhost", "/", ec); if(! BEAST_EXPECTS(! ec, ec.message())) break; - ws.async_write_frame(false, + ws.async_write_some(false, boost::asio::null_buffers{}, [&](error_code) { @@ -1573,7 +1572,7 @@ public: break; // // Destruction of the io_service will cause destruction - // of the write_frame_op without invoking the final handler. + // of the write_some_op without invoking the final handler. 
// break; } @@ -1687,9 +1686,9 @@ public: BEAST_EXPECT(s == "payload"); }); ws.ping("payload"); - c.write_frame(ws, false, sbuf("Hello, ")); - c.write_frame(ws, false, sbuf("")); - c.write_frame(ws, true, sbuf("World!")); + c.write_some(ws, false, sbuf("Hello, ")); + c.write_some(ws, false, sbuf("")); + c.write_some(ws, true, sbuf("World!")); { // receive echoed message multi_buffer db; @@ -1783,13 +1782,13 @@ public: if(! pmd.client_enable) { // expected cont - c.write_frame(ws, false, boost::asio::null_buffers{}); + c.write_some(ws, false, boost::asio::null_buffers{}); c.write_raw(ws, cbuf(0x81, 0x80, 0xff, 0xff, 0xff, 0xff)); restart(error::closed); // message size above 2^64 - c.write_frame(ws, false, cbuf(0x00)); + c.write_some(ws, false, cbuf(0x00)); c.write_raw(ws, cbuf(0x80, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff)); diff --git a/test/websocket/websocket_async_echo_server.hpp b/test/websocket/websocket_async_echo_server.hpp index 9057e3cc..8aa39991 100644 --- a/test/websocket/websocket_async_echo_server.hpp +++ b/test/websocket/websocket_async_echo_server.hpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include diff --git a/test/wstest/main.cpp b/test/wstest/main.cpp index 182a84db..acbcf82e 100644 --- a/test/wstest/main.cpp +++ b/test/wstest/main.cpp @@ -167,7 +167,7 @@ private: { std::geometric_distribution dist{ double(4) / boost::asio::buffer_size(tb_)}; - ws_.async_write_frame(true, + ws_.async_write_some(true, beast::buffer_prefix(dist(rng_), tb_), alloc_.wrap(std::bind( &connection::on_write,