diff --git a/CHANGELOG.md b/CHANGELOG.md index 48aa6b0e..98804eb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Version 67: * Fix doc example link * Add http-server-small example +* Merge stream_base to stream and tidy -------------------------------------------------------------------------------- diff --git a/include/beast/websocket/detail/endian.hpp b/include/beast/websocket/detail/endian.hpp deleted file mode 100644 index e9276089..00000000 --- a/include/beast/websocket/detail/endian.hpp +++ /dev/null @@ -1,71 +0,0 @@ -// -// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef BEAST_WEBSOCKET_DETAIL_ENDIAN_HPP -#define BEAST_WEBSOCKET_DETAIL_ENDIAN_HPP - -#include - -namespace beast { -namespace websocket { -namespace detail { - -inline -std::uint16_t -big_uint16_to_native(void const* buf) -{ - auto const p = reinterpret_cast< - std::uint8_t const*>(buf); - return (p[0]<<8) + p[1]; -} - -inline -std::uint64_t -big_uint64_to_native(void const* buf) -{ - auto const p = reinterpret_cast< - std::uint8_t const*>(buf); - return - (static_cast(p[0])<<56) + - (static_cast(p[1])<<48) + - (static_cast(p[2])<<40) + - (static_cast(p[3])<<32) + - (static_cast(p[4])<<24) + - (static_cast(p[5])<<16) + - (static_cast(p[6])<< 8) + - p[7]; -} - -inline -std::uint32_t -little_uint32_to_native(void const* buf) -{ - auto const p = reinterpret_cast< - std::uint8_t const*>(buf); - return - p[0] + - (static_cast(p[1])<< 8) + - (static_cast(p[2])<<16) + - (static_cast(p[3])<<24); -} - -inline -void -native_to_little_uint32(std::uint32_t v, void* buf) -{ - auto p = reinterpret_cast(buf); - p[0] = v & 0xff; - p[1] = (v >> 8) & 0xff; - p[2] = (v >> 16) & 0xff; - p[3] = (v >> 24) & 0xff; -} - -} // detail -} // websocket -} // beast - -#endif diff --git a/include/beast/websocket/detail/frame.hpp b/include/beast/websocket/detail/frame.hpp index 5e3425da..512f817a 100644 --- a/include/beast/websocket/detail/frame.hpp +++ b/include/beast/websocket/detail/frame.hpp @@ -9,7 +9,6 @@ #define BEAST_WEBSOCKET_DETAIL_FRAME_HPP #include -#include #include #include #include @@ -23,6 +22,56 @@ namespace beast { namespace websocket { namespace detail { +inline +std::uint16_t +big_uint16_to_native(void const* buf) +{ + auto const p = reinterpret_cast< + std::uint8_t const*>(buf); + return (p[0]<<8) + p[1]; +} + +inline +std::uint64_t +big_uint64_to_native(void const* buf) +{ + auto const p = reinterpret_cast< + std::uint8_t const*>(buf); + return + (static_cast(p[0])<<56) + + (static_cast(p[1])<<48) + + (static_cast(p[2])<<40) + + (static_cast(p[3])<<32) + + (static_cast(p[4])<<24) + + (static_cast(p[5])<<16) + + (static_cast(p[6])<< 8) + + p[7]; +} + +inline +std::uint32_t +little_uint32_to_native(void const* buf) +{ + auto const p = reinterpret_cast< + std::uint8_t const*>(buf); + return + p[0] + + (static_cast(p[1])<< 8) + + (static_cast(p[2])<<16) + + (static_cast(p[3])<<24); +} + +inline +void +native_to_little_uint32(std::uint32_t v, void* buf) +{ + auto p = reinterpret_cast(buf); + p[0] = v & 0xff; + p[1] = (v >> 8) & 0xff; + p[2] = (v >> 16) & 0xff; + p[3] = (v >> 24) & 0xff; +} + /** WebSocket frame header opcodes. */ enum class opcode : std::uint8_t { diff --git a/include/beast/websocket/detail/stream_base.hpp b/include/beast/websocket/detail/stream_base.hpp deleted file mode 100644 index 9e5c18d1..00000000 --- a/include/beast/websocket/detail/stream_base.hpp +++ /dev/null @@ -1,566 +0,0 @@ -// -// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef BEAST_WEBSOCKET_DETAIL_STREAM_BASE_HPP -#define BEAST_WEBSOCKET_DETAIL_STREAM_BASE_HPP - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace beast { -namespace websocket { -namespace detail { - -/// Identifies the role of a WebSockets stream. -enum class role_type -{ - /// Stream is operating as a client. - client, - - /// Stream is operating as a server. - server -}; - -//------------------------------------------------------------------------------ - -struct stream_base -{ -protected: - friend class frame_test; - - using ping_callback_type = - std::function; - - struct op {}; - - detail::maskgen maskgen_; // source of mask keys - std::size_t rd_msg_max_ = - 16 * 1024 * 1024; // max message size - bool wr_autofrag_ = true; // auto fragment - std::size_t wr_buf_size_ = 4096; // write buffer size - std::size_t rd_buf_size_ = 4096; // read buffer size - detail::opcode wr_opcode_ = - detail::opcode::text; // outgoing message type - ping_callback_type ping_cb_; // ping callback - role_type role_; // server or client - bool failed_; // the connection failed - - bool wr_close_; // sent close frame - op* wr_block_; // op currenly writing - - ping_data* ping_data_; // where to put the payload - pausation rd_op_; // parked read op - pausation wr_op_; // parked write op - pausation ping_op_; // parked ping op - close_reason cr_; // set from received close frame - - // State information for the message being received - // - struct rd_t - { - // opcode of current message being read - detail::opcode op; - - // `true` if the next frame is a continuation. - bool cont; - - // Checks that test messages are valid utf8 - detail::utf8_checker utf8; - - // Size of the current message so far. - std::uint64_t size; - - // Size of the read buffer. - // This gets set to the read buffer size option at the - // beginning of sending a message, so that the option can be - // changed mid-send without affecting the current message. - std::size_t buf_size; - - // The read buffer. Used for compression and masking. - std::unique_ptr buf; - }; - - rd_t rd_; - - // State information for the message being sent - // - struct wr_t - { - // `true` if next frame is a continuation, - // `false` if next frame starts a new message - bool cont; - - // `true` if this message should be auto-fragmented - // This gets set to the auto-fragment option at the beginning - // of sending a message, so that the option can be changed - // mid-send without affecting the current message. - bool autofrag; - - // `true` if this message should be compressed. - // This gets set to the compress option at the beginning of - // of sending a message, so that the option can be changed - // mid-send without affecting the current message. - bool compress; - - // Size of the write buffer. - // This gets set to the write buffer size option at the - // beginning of sending a message, so that the option can be - // changed mid-send without affecting the current message. - std::size_t buf_size; - - // The write buffer. Used for compression and masking. - // The buffer is allocated or reallocated at the beginning of - // sending a message. - std::unique_ptr buf; - }; - - wr_t wr_; - - // State information for the permessage-deflate extension - struct pmd_t - { - // `true` if current read message is compressed - bool rd_set; - - zlib::deflate_stream zo; - zlib::inflate_stream zi; - }; - - // If not engaged, then permessage-deflate is not - // enabled for the currently active session. - std::unique_ptr pmd_; - - // Local options for permessage-deflate - permessage_deflate pmd_opts_; - - // Offer for clients, negotiated result for servers - pmd_offer pmd_config_; - - stream_base() = default; - stream_base(stream_base&&) = default; - stream_base(stream_base const&) = delete; - stream_base& operator=(stream_base&&) = default; - stream_base& operator=(stream_base const&) = delete; - - template - void - open(role_type role); - - template - void - close(); - - template - std::size_t - read_fh1(detail::frame_header& fh, - DynamicBuffer& db, close_code& code); - - template - void - read_fh2(detail::frame_header& fh, - DynamicBuffer& db, close_code& code); - - // Called before receiving the first frame of each message - template - void - rd_begin(); - - // Called before sending the first frame of each message - // - template - void - wr_begin(); - - template - void - write_close(DynamicBuffer& db, close_reason const& rc); - - template - void - write_ping(DynamicBuffer& db, - detail::opcode op, ping_data const& data); -}; - -template -void -stream_base:: -open(role_type role) -{ - // VFALCO TODO analyze and remove dupe code in reset() - role_ = role; - failed_ = false; - rd_.cont = false; - wr_close_ = false; - wr_block_ = nullptr; // should be nullptr on close anyway - ping_data_ = nullptr; // should be nullptr on close anyway - - wr_.cont = false; - wr_.buf_size = 0; - - if(((role_ == role_type::client && pmd_opts_.client_enable) || - (role_ == role_type::server && pmd_opts_.server_enable)) && - pmd_config_.accept) - { - pmd_normalize(pmd_config_); - pmd_.reset(new pmd_t); - if(role_ == role_type::client) - { - pmd_->zi.reset( - pmd_config_.server_max_window_bits); - pmd_->zo.reset( - pmd_opts_.compLevel, - pmd_config_.client_max_window_bits, - pmd_opts_.memLevel, - zlib::Strategy::normal); - } - else - { - pmd_->zi.reset( - pmd_config_.client_max_window_bits); - pmd_->zo.reset( - pmd_opts_.compLevel, - pmd_config_.server_max_window_bits, - pmd_opts_.memLevel, - zlib::Strategy::normal); - } - } -} - -template -void -stream_base:: -close() -{ - rd_.buf.reset(); - wr_.buf.reset(); - pmd_.reset(); -} - -// Read fixed frame header from buffer -// Requires at least 2 bytes -// -template -std::size_t -stream_base:: -read_fh1(detail::frame_header& fh, - DynamicBuffer& db, close_code& code) -{ - using boost::asio::buffer; - using boost::asio::buffer_copy; - using boost::asio::buffer_size; - auto const err = - [&](close_code cv) - { - code = cv; - return 0; - }; - std::uint8_t b[2]; - BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); - db.consume(buffer_copy(buffer(b), db.data())); - std::size_t need; - fh.len = b[1] & 0x7f; - switch(fh.len) - { - case 126: need = 2; break; - case 127: need = 8; break; - default: - need = 0; - } - fh.mask = (b[1] & 0x80) != 0; - if(fh.mask) - need += 4; - fh.op = static_cast< - detail::opcode>(b[0] & 0x0f); - fh.fin = (b[0] & 0x80) != 0; - fh.rsv1 = (b[0] & 0x40) != 0; - fh.rsv2 = (b[0] & 0x20) != 0; - fh.rsv3 = (b[0] & 0x10) != 0; - switch(fh.op) - { - case detail::opcode::binary: - case detail::opcode::text: - if(rd_.cont) - { - // new data frame when continuation expected - return err(close_code::protocol_error); - } - if((fh.rsv1 && ! pmd_) || - fh.rsv2 || fh.rsv3) - { - // reserved bits not cleared - return err(close_code::protocol_error); - } - if(pmd_) - pmd_->rd_set = fh.rsv1; - break; - - case detail::opcode::cont: - if(! rd_.cont) - { - // continuation without an active message - return err(close_code::protocol_error); - } - if(fh.rsv1 || fh.rsv2 || fh.rsv3) - { - // reserved bits not cleared - return err(close_code::protocol_error); - } - break; - - default: - if(is_reserved(fh.op)) - { - // reserved opcode - return err(close_code::protocol_error); - } - if(! fh.fin) - { - // fragmented control message - return err(close_code::protocol_error); - } - if(fh.len > 125) - { - // invalid length for control message - return err(close_code::protocol_error); - } - if(fh.rsv1 || fh.rsv2 || fh.rsv3) - { - // reserved bits not cleared - return err(close_code::protocol_error); - } - break; - } - // unmasked frame from client - if(role_ == role_type::server && ! fh.mask) - { - code = close_code::protocol_error; - return 0; - } - // masked frame from server - if(role_ == role_type::client && fh.mask) - { - code = close_code::protocol_error; - return 0; - } - code = close_code::none; - return need; -} - -// Decode variable frame header from buffer -// -template -void -stream_base:: -read_fh2(detail::frame_header& fh, - DynamicBuffer& db, close_code& code) -{ - using boost::asio::buffer; - using boost::asio::buffer_copy; - using boost::asio::buffer_size; - using namespace boost::endian; - switch(fh.len) - { - case 126: - { - std::uint8_t b[2]; - BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); - db.consume(buffer_copy(buffer(b), db.data())); - fh.len = big_uint16_to_native(&b[0]); - // length not canonical - if(fh.len < 126) - { - code = close_code::protocol_error; - return; - } - break; - } - case 127: - { - std::uint8_t b[8]; - BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); - db.consume(buffer_copy(buffer(b), db.data())); - fh.len = big_uint64_to_native(&b[0]); - // length not canonical - if(fh.len < 65536) - { - code = close_code::protocol_error; - return; - } - break; - } - } - if(fh.mask) - { - std::uint8_t b[4]; - BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); - db.consume(buffer_copy(buffer(b), db.data())); - fh.key = little_uint32_to_native(&b[0]); - } - else - { - // initialize this otherwise operator== breaks - fh.key = 0; - } - if(! is_control(fh.op)) - { - if(fh.op != detail::opcode::cont) - { - rd_.size = 0; - rd_.op = fh.op; - } - else - { - if(rd_.size > (std::numeric_limits< - std::uint64_t>::max)() - fh.len) - { - code = close_code::too_big; - return; - } - } - rd_.cont = ! fh.fin; - } - code = close_code::none; -} - -template -void -stream_base:: -rd_begin() -{ - // Maintain the read buffer - if(pmd_) - { - if(! rd_.buf || rd_.buf_size != rd_buf_size_) - { - rd_.buf_size = rd_buf_size_; - rd_.buf.reset(new std::uint8_t[rd_.buf_size]); - } - } -} - -template -void -stream_base:: -wr_begin() -{ - wr_.autofrag = wr_autofrag_; - wr_.compress = static_cast(pmd_); - - // Maintain the write buffer - if( wr_.compress || - role_ == detail::role_type::client) - { - if(! wr_.buf || wr_.buf_size != wr_buf_size_) - { - wr_.buf_size = wr_buf_size_; - wr_.buf.reset(new std::uint8_t[wr_.buf_size]); - } - } - else - { - wr_.buf_size = wr_buf_size_; - wr_.buf.reset(); - } -} - -template -void -stream_base:: -write_close(DynamicBuffer& db, close_reason const& cr) -{ - using namespace boost::endian; - frame_header fh; - fh.op = detail::opcode::close; - fh.fin = true; - fh.rsv1 = false; - fh.rsv2 = false; - fh.rsv3 = false; - fh.len = cr.code == close_code::none ? - 0 : 2 + cr.reason.size(); - fh.mask = role_ == detail::role_type::client; - if(fh.mask) - fh.key = maskgen_(); - detail::write(db, fh); - if(cr.code != close_code::none) - { - detail::prepared_key key; - if(fh.mask) - detail::prepare_key(key, fh.key); - { - std::uint8_t b[2]; - ::new(&b[0]) big_uint16_buf_t{ - (std::uint16_t)cr.code}; - auto d = db.prepare(2); - boost::asio::buffer_copy(d, - boost::asio::buffer(b)); - if(fh.mask) - detail::mask_inplace(d, key); - db.commit(2); - } - if(! cr.reason.empty()) - { - auto d = db.prepare(cr.reason.size()); - boost::asio::buffer_copy(d, - boost::asio::const_buffer( - cr.reason.data(), cr.reason.size())); - if(fh.mask) - detail::mask_inplace(d, key); - db.commit(cr.reason.size()); - } - } -} - -template -void -stream_base:: -write_ping(DynamicBuffer& db, - detail::opcode code, ping_data const& data) -{ - frame_header fh; - fh.op = code; - fh.fin = true; - fh.rsv1 = false; - fh.rsv2 = false; - fh.rsv3 = false; - fh.len = data.size(); - fh.mask = role_ == role_type::client; - if(fh.mask) - fh.key = maskgen_(); - detail::write(db, fh); - if(data.empty()) - return; - detail::prepared_key key; - if(fh.mask) - detail::prepare_key(key, fh.key); - auto d = db.prepare(data.size()); - boost::asio::buffer_copy(d, - boost::asio::const_buffers_1( - data.data(), data.size())); - if(fh.mask) - detail::mask_inplace(d, key); - db.commit(data.size()); -} - -} // detail -} // websocket -} // beast - -#endif diff --git a/include/beast/websocket/impl/accept.ipp b/include/beast/websocket/impl/accept.ipp index 86a3a32b..892dcffa 100644 --- a/include/beast/websocket/impl/accept.ipp +++ b/include/beast/websocket/impl/accept.ipp @@ -153,7 +153,7 @@ operator()(error_code ec, bool again) if(! ec) { pmd_read(d.ws.pmd_config_, d.res); - d.ws.open(detail::role_type::server); + d.ws.open(role_type::server); } break; } diff --git a/include/beast/websocket/impl/read.ipp b/include/beast/websocket/impl/read.ipp index e4190585..f3fbde36 100644 --- a/include/beast/websocket/impl/read.ipp +++ b/include/beast/websocket/impl/read.ipp @@ -331,9 +331,9 @@ operator()(error_code ec, break; } if(d.fh.fin && ( - (d.ws.role_ == detail::role_type::client && + (d.ws.role_ == role_type::client && d.ws.pmd_config_.server_no_context_takeover) || - (d.ws.role_ == detail::role_type::server && + (d.ws.role_ == role_type::server && d.ws.pmd_config_.client_no_context_takeover))) d.ws.pmd_->zi.reset(); d.state = do_frame_done; @@ -933,9 +933,9 @@ read_frame(DynamicBuffer& dynabuf, error_code& ec) break; } if(fh.fin && ( - (role_ == detail::role_type::client && + (role_ == role_type::client && pmd_config_.server_no_context_takeover) || - (role_ == detail::role_type::server && + (role_ == role_type::server && pmd_config_.client_no_context_takeover))) pmd_->zi.reset(); } diff --git a/include/beast/websocket/impl/stream.ipp b/include/beast/websocket/impl/stream.ipp index 048f4fc0..0f65f959 100644 --- a/include/beast/websocket/impl/stream.ipp +++ b/include/beast/websocket/impl/stream.ipp @@ -120,7 +120,7 @@ do_accept(http::header @@ -310,7 +310,373 @@ do_response(http::header const& res, // VFALCO see if offer satisfies pmd_config_, // return an error if not. pmd_config_ = offer; // overwrite for now - open(detail::role_type::client); + open(role_type::client); +} + +//------------------------------------------------------------------------------ + +template +void +stream:: +open(role_type role) +{ + // VFALCO TODO analyze and remove dupe code in reset() + role_ = role; + failed_ = false; + rd_.cont = false; + wr_close_ = false; + wr_block_ = nullptr; // should be nullptr on close anyway + ping_data_ = nullptr; // should be nullptr on close anyway + + wr_.cont = false; + wr_.buf_size = 0; + + if(((role_ == role_type::client && pmd_opts_.client_enable) || + (role_ == role_type::server && pmd_opts_.server_enable)) && + pmd_config_.accept) + { + pmd_normalize(pmd_config_); + pmd_.reset(new pmd_t); + if(role_ == role_type::client) + { + pmd_->zi.reset( + pmd_config_.server_max_window_bits); + pmd_->zo.reset( + pmd_opts_.compLevel, + pmd_config_.client_max_window_bits, + pmd_opts_.memLevel, + zlib::Strategy::normal); + } + else + { + pmd_->zi.reset( + pmd_config_.client_max_window_bits); + pmd_->zo.reset( + pmd_opts_.compLevel, + pmd_config_.server_max_window_bits, + pmd_opts_.memLevel, + zlib::Strategy::normal); + } + } +} + +template +void +stream:: +close() +{ + rd_.buf.reset(); + wr_.buf.reset(); + pmd_.reset(); +} + +// Read fixed frame header from buffer +// Requires at least 2 bytes +// +template +template +std::size_t +stream:: +read_fh1(detail::frame_header& fh, + DynamicBuffer& db, close_code& code) +{ + using boost::asio::buffer; + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + auto const err = + [&](close_code cv) + { + code = cv; + return 0; + }; + std::uint8_t b[2]; + BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); + db.consume(buffer_copy(buffer(b), db.data())); + std::size_t need; + fh.len = b[1] & 0x7f; + switch(fh.len) + { + case 126: need = 2; break; + case 127: need = 8; break; + default: + need = 0; + } + fh.mask = (b[1] & 0x80) != 0; + if(fh.mask) + need += 4; + fh.op = static_cast< + detail::opcode>(b[0] & 0x0f); + fh.fin = (b[0] & 0x80) != 0; + fh.rsv1 = (b[0] & 0x40) != 0; + fh.rsv2 = (b[0] & 0x20) != 0; + fh.rsv3 = (b[0] & 0x10) != 0; + switch(fh.op) + { + case detail::opcode::binary: + case detail::opcode::text: + if(rd_.cont) + { + // new data frame when continuation expected + return err(close_code::protocol_error); + } + if((fh.rsv1 && ! pmd_) || + fh.rsv2 || fh.rsv3) + { + // reserved bits not cleared + return err(close_code::protocol_error); + } + if(pmd_) + pmd_->rd_set = fh.rsv1; + break; + + case detail::opcode::cont: + if(! rd_.cont) + { + // continuation without an active message + return err(close_code::protocol_error); + } + if(fh.rsv1 || fh.rsv2 || fh.rsv3) + { + // reserved bits not cleared + return err(close_code::protocol_error); + } + break; + + default: + if(is_reserved(fh.op)) + { + // reserved opcode + return err(close_code::protocol_error); + } + if(! fh.fin) + { + // fragmented control message + return err(close_code::protocol_error); + } + if(fh.len > 125) + { + // invalid length for control message + return err(close_code::protocol_error); + } + if(fh.rsv1 || fh.rsv2 || fh.rsv3) + { + // reserved bits not cleared + return err(close_code::protocol_error); + } + break; + } + // unmasked frame from client + if(role_ == role_type::server && ! fh.mask) + { + code = close_code::protocol_error; + return 0; + } + // masked frame from server + if(role_ == role_type::client && fh.mask) + { + code = close_code::protocol_error; + return 0; + } + code = close_code::none; + return need; +} + +// Decode variable frame header from buffer +// +template +template +void +stream:: +read_fh2(detail::frame_header& fh, + DynamicBuffer& db, close_code& code) +{ + using boost::asio::buffer; + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + using namespace boost::endian; + switch(fh.len) + { + case 126: + { + std::uint8_t b[2]; + BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); + db.consume(buffer_copy(buffer(b), db.data())); + fh.len = detail::big_uint16_to_native(&b[0]); + // length not canonical + if(fh.len < 126) + { + code = close_code::protocol_error; + return; + } + break; + } + case 127: + { + std::uint8_t b[8]; + BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); + db.consume(buffer_copy(buffer(b), db.data())); + fh.len = detail::big_uint64_to_native(&b[0]); + // length not canonical + if(fh.len < 65536) + { + code = close_code::protocol_error; + return; + } + break; + } + } + if(fh.mask) + { + std::uint8_t b[4]; + BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b)); + db.consume(buffer_copy(buffer(b), db.data())); + fh.key = detail::little_uint32_to_native(&b[0]); + } + else + { + // initialize this otherwise operator== breaks + fh.key = 0; + } + if(! is_control(fh.op)) + { + if(fh.op != detail::opcode::cont) + { + rd_.size = 0; + rd_.op = fh.op; + } + else + { + if(rd_.size > (std::numeric_limits< + std::uint64_t>::max)() - fh.len) + { + code = close_code::too_big; + return; + } + } + rd_.cont = ! fh.fin; + } + code = close_code::none; +} + +template +void +stream:: +rd_begin() +{ + // Maintain the read buffer + if(pmd_) + { + if(! rd_.buf || rd_.buf_size != rd_buf_size_) + { + rd_.buf_size = rd_buf_size_; + rd_.buf.reset(new std::uint8_t[rd_.buf_size]); + } + } +} + +template +void +stream:: +wr_begin() +{ + wr_.autofrag = wr_autofrag_; + wr_.compress = static_cast(pmd_); + + // Maintain the write buffer + if( wr_.compress || + role_ == role_type::client) + { + if(! wr_.buf || wr_.buf_size != wr_buf_size_) + { + wr_.buf_size = wr_buf_size_; + wr_.buf.reset(new std::uint8_t[wr_.buf_size]); + } + } + else + { + wr_.buf_size = wr_buf_size_; + wr_.buf.reset(); + } +} + +template +template +void +stream:: +write_close(DynamicBuffer& db, close_reason const& cr) +{ + using namespace boost::endian; + detail::frame_header fh; + fh.op = detail::opcode::close; + fh.fin = true; + fh.rsv1 = false; + fh.rsv2 = false; + fh.rsv3 = false; + fh.len = cr.code == close_code::none ? + 0 : 2 + cr.reason.size(); + fh.mask = role_ == role_type::client; + if(fh.mask) + fh.key = maskgen_(); + detail::write(db, fh); + if(cr.code != close_code::none) + { + detail::prepared_key key; + if(fh.mask) + detail::prepare_key(key, fh.key); + { + std::uint8_t b[2]; + ::new(&b[0]) big_uint16_buf_t{ + (std::uint16_t)cr.code}; + auto d = db.prepare(2); + boost::asio::buffer_copy(d, + boost::asio::buffer(b)); + if(fh.mask) + detail::mask_inplace(d, key); + db.commit(2); + } + if(! cr.reason.empty()) + { + auto d = db.prepare(cr.reason.size()); + boost::asio::buffer_copy(d, + boost::asio::const_buffer( + cr.reason.data(), cr.reason.size())); + if(fh.mask) + detail::mask_inplace(d, key); + db.commit(cr.reason.size()); + } + } +} + +template +template +void +stream:: +write_ping(DynamicBuffer& db, + detail::opcode code, ping_data const& data) +{ + detail::frame_header fh; + fh.op = code; + fh.fin = true; + fh.rsv1 = false; + fh.rsv2 = false; + fh.rsv3 = false; + fh.len = data.size(); + fh.mask = role_ == role_type::client; + if(fh.mask) + fh.key = maskgen_(); + detail::write(db, fh); + if(data.empty()) + return; + detail::prepared_key key; + if(fh.mask) + detail::prepare_key(key, fh.key); + auto d = db.prepare(data.size()); + boost::asio::buffer_copy(d, + boost::asio::const_buffers_1( + data.data(), data.size())); + if(fh.mask) + detail::mask_inplace(d, key); + db.commit(data.size()); } } // websocket diff --git a/include/beast/websocket/impl/write.ipp b/include/beast/websocket/impl/write.ipp index 9bc819ff..58c15c81 100644 --- a/include/beast/websocket/impl/write.ipp +++ b/include/beast/websocket/impl/write.ipp @@ -181,7 +181,7 @@ operator()(error_code ec, d.fh.op = d.ws.wr_.cont ? detail::opcode::cont : d.ws.wr_opcode_; d.fh.mask = - d.ws.role_ == detail::role_type::client; + d.ws.role_ == role_type::client; // entry_state determines which algorithm // we will use to send. If we suspend, we @@ -477,9 +477,9 @@ operator()(error_code ec, case do_deflate + 3: if(d.fh.fin && ( - (d.ws.role_ == detail::role_type::client && + (d.ws.role_ == role_type::client && d.ws.pmd_config_.client_no_context_takeover) || - (d.ws.role_ == detail::role_type::server && + (d.ws.role_ == role_type::server && d.ws.pmd_config_.server_no_context_takeover))) d.ws.pmd_->zo.reset(); goto upcall; @@ -707,7 +707,7 @@ write_frame(bool fin, fh.rsv3 = false; fh.op = wr_.cont ? detail::opcode::cont : wr_opcode_; - fh.mask = role_ == detail::role_type::client; + fh.mask = role_ == role_type::client; auto remain = buffer_size(buffers); if(wr_.compress) { @@ -756,9 +756,9 @@ write_frame(bool fin, fh.rsv1 = false; } if(fh.fin && ( - (role_ == detail::role_type::client && + (role_ == role_type::client && pmd_config_.client_no_context_takeover) || - (role_ == detail::role_type::server && + (role_ == role_type::server && pmd_config_.server_no_context_takeover))) pmd_->zo.reset(); return; diff --git a/include/beast/websocket/stream.hpp b/include/beast/websocket/stream.hpp index e068f6a3..2047edde 100644 --- a/include/beast/websocket/stream.hpp +++ b/include/beast/websocket/stream.hpp @@ -9,9 +9,15 @@ #define BEAST_WEBSOCKET_STREAM_HPP #include +#include #include +#include +#include #include -#include +#include +#include +#include +#include #include #include #include @@ -20,6 +26,8 @@ #include #include #include +#include +#include #include #include #include @@ -29,12 +37,17 @@ namespace beast { namespace websocket { +namespace detail { +class frame_test; +} + /// The type of object holding HTTP Upgrade requests using request_type = http::request; /// The type of object holding HTTP Upgrade responses using response_type = http::response; + //-------------------------------------------------------------------- /** Provides message-oriented functionality using WebSocket. @@ -80,12 +93,167 @@ using response_type = http::response; @b SyncStream */ template -class stream : public detail::stream_base +class stream { + friend class detail::frame_test; friend class stream_test; buffered_read_stream stream_; + /// Identifies the role of a WebSockets stream. + enum class role_type + { + /// Stream is operating as a client. + client, + + /// Stream is operating as a server. + server + }; + + friend class frame_test; + + using ping_callback_type = + std::function; + + struct op {}; + + detail::maskgen maskgen_; // source of mask keys + std::size_t rd_msg_max_ = + 16 * 1024 * 1024; // max message size + bool wr_autofrag_ = true; // auto fragment + std::size_t wr_buf_size_ = 4096; // write buffer size + std::size_t rd_buf_size_ = 4096; // read buffer size + detail::opcode wr_opcode_ = + detail::opcode::text; // outgoing message type + ping_callback_type ping_cb_; // ping callback + role_type role_; // server or client + bool failed_; // the connection failed + + bool wr_close_; // sent close frame + op* wr_block_; // op currenly writing + + ping_data* ping_data_; // where to put the payload + detail::pausation rd_op_; // parked read op + detail::pausation wr_op_; // parked write op + detail::pausation ping_op_; // parked ping op + close_reason cr_; // set from received close frame + + // State information for the message being received + // + struct rd_t + { + // opcode of current message being read + detail::opcode op; + + // `true` if the next frame is a continuation. + bool cont; + + // Checks that test messages are valid utf8 + detail::utf8_checker utf8; + + // Size of the current message so far. + std::uint64_t size; + + // Size of the read buffer. + // This gets set to the read buffer size option at the + // beginning of sending a message, so that the option can be + // changed mid-send without affecting the current message. + std::size_t buf_size; + + // The read buffer. Used for compression and masking. + std::unique_ptr buf; + }; + + rd_t rd_; + + // State information for the message being sent + // + struct wr_t + { + // `true` if next frame is a continuation, + // `false` if next frame starts a new message + bool cont; + + // `true` if this message should be auto-fragmented + // This gets set to the auto-fragment option at the beginning + // of sending a message, so that the option can be changed + // mid-send without affecting the current message. + bool autofrag; + + // `true` if this message should be compressed. + // This gets set to the compress option at the beginning of + // of sending a message, so that the option can be changed + // mid-send without affecting the current message. + bool compress; + + // Size of the write buffer. + // This gets set to the write buffer size option at the + // beginning of sending a message, so that the option can be + // changed mid-send without affecting the current message. + std::size_t buf_size; + + // The write buffer. Used for compression and masking. + // The buffer is allocated or reallocated at the beginning of + // sending a message. + std::unique_ptr buf; + }; + + wr_t wr_; + + // State information for the permessage-deflate extension + struct pmd_t + { + // `true` if current read message is compressed + bool rd_set; + + zlib::deflate_stream zo; + zlib::inflate_stream zi; + }; + + // If not engaged, then permessage-deflate is not + // enabled for the currently active session. + std::unique_ptr pmd_; + + // Local options for permessage-deflate + permessage_deflate pmd_opts_; + + // Offer for clients, negotiated result for servers + detail::pmd_offer pmd_config_; + + void + open(role_type role); + + void + close(); + + template + std::size_t + read_fh1(detail::frame_header& fh, + DynamicBuffer& db, close_code& code); + + template + void + read_fh2(detail::frame_header& fh, + DynamicBuffer& db, close_code& code); + + // Called before receiving the first frame of each message + void + rd_begin(); + + // Called before sending the first frame of each message + // + void + wr_begin(); + + template + void + write_close(DynamicBuffer& db, close_reason const& rc); + + template + void + write_ping(DynamicBuffer& db, + detail::opcode op, ping_data const& data); + public: /// The type of the next layer. using next_layer_type = diff --git a/test/websocket/frame.cpp b/test/websocket/frame.cpp index 6d9592ce..2dc21c4e 100644 --- a/test/websocket/frame.cpp +++ b/test/websocket/frame.cpp @@ -5,9 +5,11 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include #include -#include #include +#include +#include #include #include @@ -30,7 +32,9 @@ operator==(frame_header const& lhs, frame_header const& rhs) lhs.key == rhs.key; } -class frame_test : public beast::unit_test::suite +class frame_test + : public beast::unit_test::suite + , public test::enable_yield_to { public: void testCloseCodes() @@ -68,9 +72,14 @@ public: void testFrameHeader() { + using stream_type = + beast::websocket::stream; + test::pipe p{ios_}; + // good frame fields { - role_type role = role_type::client; + stream_type::role_type role = + stream_type::role_type::client; auto check = [&](frame_header const& fh) @@ -78,7 +87,7 @@ public: fh_streambuf b; write(b, fh); close_code code; - stream_base stream; + stream_type stream{p.server}; stream.open(role); detail::frame_header fh1; auto const n = @@ -99,7 +108,7 @@ public: check(fh); - role = role_type::server; + role = stream_type::role_type::server; fh.mask = true; fh.key = 1; check(fh); @@ -122,7 +131,7 @@ public: // bad frame fields { - role_type role = role_type::client; + stream_type::role_type role = stream_type::role_type::client; auto check = [&](frame_header const& fh) @@ -130,9 +139,9 @@ public: fh_streambuf b; write(b, fh); close_code code; - stream_base stream; + stream_type stream{p.server}; stream.open(role); - detail::frame_header fh1; + frame_header fh1; auto const n = stream.read_fh1(fh1, b, code); if(code) @@ -181,7 +190,7 @@ public: fh.mask = true; check(fh); - role = role_type::server; + role = stream_type::role_type::server; fh.mask = false; check(fh); } @@ -189,13 +198,16 @@ public: void bad(std::initializer_list bs) { + using stream_type = + beast::websocket::stream; using boost::asio::buffer; using boost::asio::buffer_copy; - static role_type constexpr role = role_type::client; + test::pipe p{ios_}; + static stream_type::role_type constexpr role = stream_type::role_type::client; std::vector v{bs}; fh_streambuf b; b.commit(buffer_copy(b.prepare(v.size()), buffer(v))); - stream_base stream; + stream_type stream{p.server}; stream.open(role); close_code code; detail::frame_header fh;