Merge stream_base to stream and tidy

This commit is contained in:
Vinnie Falco
2017-06-24 10:13:17 -07:00
parent fc15f5e0b2
commit cc1c02e236
10 changed files with 623 additions and 664 deletions

View File

@ -2,6 +2,7 @@ Version 67:
* Fix doc example link
* Add http-server-small example
* Merge stream_base to stream and tidy
--------------------------------------------------------------------------------

View File

@ -1,71 +0,0 @@
//
// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BEAST_WEBSOCKET_DETAIL_ENDIAN_HPP
#define BEAST_WEBSOCKET_DETAIL_ENDIAN_HPP
#include <cstdint>
namespace beast {
namespace websocket {
namespace detail {
inline
std::uint16_t
big_uint16_to_native(void const* buf)
{
auto const p = reinterpret_cast<
std::uint8_t const*>(buf);
return (p[0]<<8) + p[1];
}
inline
std::uint64_t
big_uint64_to_native(void const* buf)
{
auto const p = reinterpret_cast<
std::uint8_t const*>(buf);
return
(static_cast<std::uint64_t>(p[0])<<56) +
(static_cast<std::uint64_t>(p[1])<<48) +
(static_cast<std::uint64_t>(p[2])<<40) +
(static_cast<std::uint64_t>(p[3])<<32) +
(static_cast<std::uint64_t>(p[4])<<24) +
(static_cast<std::uint64_t>(p[5])<<16) +
(static_cast<std::uint64_t>(p[6])<< 8) +
p[7];
}
inline
std::uint32_t
little_uint32_to_native(void const* buf)
{
auto const p = reinterpret_cast<
std::uint8_t const*>(buf);
return
p[0] +
(static_cast<std::uint32_t>(p[1])<< 8) +
(static_cast<std::uint32_t>(p[2])<<16) +
(static_cast<std::uint32_t>(p[3])<<24);
}
inline
void
native_to_little_uint32(std::uint32_t v, void* buf)
{
auto p = reinterpret_cast<std::uint8_t*>(buf);
p[0] = v & 0xff;
p[1] = (v >> 8) & 0xff;
p[2] = (v >> 16) & 0xff;
p[3] = (v >> 24) & 0xff;
}
} // detail
} // websocket
} // beast
#endif

View File

@ -9,7 +9,6 @@
#define BEAST_WEBSOCKET_DETAIL_FRAME_HPP
#include <beast/websocket/rfc6455.hpp>
#include <beast/websocket/detail/endian.hpp>
#include <beast/websocket/detail/utf8_checker.hpp>
#include <beast/core/consuming_buffers.hpp>
#include <beast/core/static_buffer.hpp>
@ -23,6 +22,56 @@ namespace beast {
namespace websocket {
namespace detail {
inline
std::uint16_t
big_uint16_to_native(void const* buf)
{
auto const p = reinterpret_cast<
std::uint8_t const*>(buf);
return (p[0]<<8) + p[1];
}
inline
std::uint64_t
big_uint64_to_native(void const* buf)
{
auto const p = reinterpret_cast<
std::uint8_t const*>(buf);
return
(static_cast<std::uint64_t>(p[0])<<56) +
(static_cast<std::uint64_t>(p[1])<<48) +
(static_cast<std::uint64_t>(p[2])<<40) +
(static_cast<std::uint64_t>(p[3])<<32) +
(static_cast<std::uint64_t>(p[4])<<24) +
(static_cast<std::uint64_t>(p[5])<<16) +
(static_cast<std::uint64_t>(p[6])<< 8) +
p[7];
}
inline
std::uint32_t
little_uint32_to_native(void const* buf)
{
auto const p = reinterpret_cast<
std::uint8_t const*>(buf);
return
p[0] +
(static_cast<std::uint32_t>(p[1])<< 8) +
(static_cast<std::uint32_t>(p[2])<<16) +
(static_cast<std::uint32_t>(p[3])<<24);
}
inline
void
native_to_little_uint32(std::uint32_t v, void* buf)
{
auto p = reinterpret_cast<std::uint8_t*>(buf);
p[0] = v & 0xff;
p[1] = (v >> 8) & 0xff;
p[2] = (v >> 16) & 0xff;
p[3] = (v >> 24) & 0xff;
}
/** WebSocket frame header opcodes. */
enum class opcode : std::uint8_t
{

View File

@ -1,566 +0,0 @@
//
// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BEAST_WEBSOCKET_DETAIL_STREAM_BASE_HPP
#define BEAST_WEBSOCKET_DETAIL_STREAM_BASE_HPP
#include <beast/websocket/error.hpp>
#include <beast/websocket/option.hpp>
#include <beast/websocket/rfc6455.hpp>
#include <beast/websocket/detail/frame.hpp>
#include <beast/websocket/detail/mask.hpp>
#include <beast/websocket/detail/pausation.hpp>
#include <beast/websocket/detail/pmd_extension.hpp>
#include <beast/websocket/detail/utf8_checker.hpp>
#include <beast/http/message.hpp>
#include <beast/http/string_body.hpp>
#include <beast/zlib/deflate_stream.hpp>
#include <beast/zlib/inflate_stream.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <cstdint>
#include <memory>
namespace beast {
namespace websocket {
namespace detail {
/// Identifies the role of a WebSockets stream.
enum class role_type
{
/// Stream is operating as a client.
client,
/// Stream is operating as a server.
server
};
//------------------------------------------------------------------------------
struct stream_base
{
protected:
friend class frame_test;
using ping_callback_type =
std::function<void(bool, ping_data const&)>;
struct op {};
detail::maskgen maskgen_; // source of mask keys
std::size_t rd_msg_max_ =
16 * 1024 * 1024; // max message size
bool wr_autofrag_ = true; // auto fragment
std::size_t wr_buf_size_ = 4096; // write buffer size
std::size_t rd_buf_size_ = 4096; // read buffer size
detail::opcode wr_opcode_ =
detail::opcode::text; // outgoing message type
ping_callback_type ping_cb_; // ping callback
role_type role_; // server or client
bool failed_; // the connection failed
bool wr_close_; // sent close frame
op* wr_block_; // op currenly writing
ping_data* ping_data_; // where to put the payload
pausation rd_op_; // parked read op
pausation wr_op_; // parked write op
pausation ping_op_; // parked ping op
close_reason cr_; // set from received close frame
// State information for the message being received
//
struct rd_t
{
// opcode of current message being read
detail::opcode op;
// `true` if the next frame is a continuation.
bool cont;
// Checks that test messages are valid utf8
detail::utf8_checker utf8;
// Size of the current message so far.
std::uint64_t size;
// Size of the read buffer.
// This gets set to the read buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The read buffer. Used for compression and masking.
std::unique_ptr<std::uint8_t[]> buf;
};
rd_t rd_;
// State information for the message being sent
//
struct wr_t
{
// `true` if next frame is a continuation,
// `false` if next frame starts a new message
bool cont;
// `true` if this message should be auto-fragmented
// This gets set to the auto-fragment option at the beginning
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool autofrag;
// `true` if this message should be compressed.
// This gets set to the compress option at the beginning of
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool compress;
// Size of the write buffer.
// This gets set to the write buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The write buffer. Used for compression and masking.
// The buffer is allocated or reallocated at the beginning of
// sending a message.
std::unique_ptr<std::uint8_t[]> buf;
};
wr_t wr_;
// State information for the permessage-deflate extension
struct pmd_t
{
// `true` if current read message is compressed
bool rd_set;
zlib::deflate_stream zo;
zlib::inflate_stream zi;
};
// If not engaged, then permessage-deflate is not
// enabled for the currently active session.
std::unique_ptr<pmd_t> pmd_;
// Local options for permessage-deflate
permessage_deflate pmd_opts_;
// Offer for clients, negotiated result for servers
pmd_offer pmd_config_;
stream_base() = default;
stream_base(stream_base&&) = default;
stream_base(stream_base const&) = delete;
stream_base& operator=(stream_base&&) = default;
stream_base& operator=(stream_base const&) = delete;
template<class = void>
void
open(role_type role);
template<class = void>
void
close();
template<class DynamicBuffer>
std::size_t
read_fh1(detail::frame_header& fh,
DynamicBuffer& db, close_code& code);
template<class DynamicBuffer>
void
read_fh2(detail::frame_header& fh,
DynamicBuffer& db, close_code& code);
// Called before receiving the first frame of each message
template<class = void>
void
rd_begin();
// Called before sending the first frame of each message
//
template<class = void>
void
wr_begin();
template<class DynamicBuffer>
void
write_close(DynamicBuffer& db, close_reason const& rc);
template<class DynamicBuffer>
void
write_ping(DynamicBuffer& db,
detail::opcode op, ping_data const& data);
};
template<class>
void
stream_base::
open(role_type role)
{
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
failed_ = false;
rd_.cont = false;
wr_close_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
ping_data_ = nullptr; // should be nullptr on close anyway
wr_.cont = false;
wr_.buf_size = 0;
if(((role_ == role_type::client && pmd_opts_.client_enable) ||
(role_ == role_type::server && pmd_opts_.server_enable)) &&
pmd_config_.accept)
{
pmd_normalize(pmd_config_);
pmd_.reset(new pmd_t);
if(role_ == role_type::client)
{
pmd_->zi.reset(
pmd_config_.server_max_window_bits);
pmd_->zo.reset(
pmd_opts_.compLevel,
pmd_config_.client_max_window_bits,
pmd_opts_.memLevel,
zlib::Strategy::normal);
}
else
{
pmd_->zi.reset(
pmd_config_.client_max_window_bits);
pmd_->zo.reset(
pmd_opts_.compLevel,
pmd_config_.server_max_window_bits,
pmd_opts_.memLevel,
zlib::Strategy::normal);
}
}
}
template<class>
void
stream_base::
close()
{
rd_.buf.reset();
wr_.buf.reset();
pmd_.reset();
}
// Read fixed frame header from buffer
// Requires at least 2 bytes
//
template<class DynamicBuffer>
std::size_t
stream_base::
read_fh1(detail::frame_header& fh,
DynamicBuffer& db, close_code& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
auto const err =
[&](close_code cv)
{
code = cv;
return 0;
};
std::uint8_t b[2];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
std::size_t need;
fh.len = b[1] & 0x7f;
switch(fh.len)
{
case 126: need = 2; break;
case 127: need = 8; break;
default:
need = 0;
}
fh.mask = (b[1] & 0x80) != 0;
if(fh.mask)
need += 4;
fh.op = static_cast<
detail::opcode>(b[0] & 0x0f);
fh.fin = (b[0] & 0x80) != 0;
fh.rsv1 = (b[0] & 0x40) != 0;
fh.rsv2 = (b[0] & 0x20) != 0;
fh.rsv3 = (b[0] & 0x10) != 0;
switch(fh.op)
{
case detail::opcode::binary:
case detail::opcode::text:
if(rd_.cont)
{
// new data frame when continuation expected
return err(close_code::protocol_error);
}
if((fh.rsv1 && ! pmd_) ||
fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
if(pmd_)
pmd_->rd_set = fh.rsv1;
break;
case detail::opcode::cont:
if(! rd_.cont)
{
// continuation without an active message
return err(close_code::protocol_error);
}
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
default:
if(is_reserved(fh.op))
{
// reserved opcode
return err(close_code::protocol_error);
}
if(! fh.fin)
{
// fragmented control message
return err(close_code::protocol_error);
}
if(fh.len > 125)
{
// invalid length for control message
return err(close_code::protocol_error);
}
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
}
// unmasked frame from client
if(role_ == role_type::server && ! fh.mask)
{
code = close_code::protocol_error;
return 0;
}
// masked frame from server
if(role_ == role_type::client && fh.mask)
{
code = close_code::protocol_error;
return 0;
}
code = close_code::none;
return need;
}
// Decode variable frame header from buffer
//
template<class DynamicBuffer>
void
stream_base::
read_fh2(detail::frame_header& fh,
DynamicBuffer& db, close_code& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
using namespace boost::endian;
switch(fh.len)
{
case 126:
{
std::uint8_t b[2];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.len = big_uint16_to_native(&b[0]);
// length not canonical
if(fh.len < 126)
{
code = close_code::protocol_error;
return;
}
break;
}
case 127:
{
std::uint8_t b[8];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.len = big_uint64_to_native(&b[0]);
// length not canonical
if(fh.len < 65536)
{
code = close_code::protocol_error;
return;
}
break;
}
}
if(fh.mask)
{
std::uint8_t b[4];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.key = little_uint32_to_native(&b[0]);
}
else
{
// initialize this otherwise operator== breaks
fh.key = 0;
}
if(! is_control(fh.op))
{
if(fh.op != detail::opcode::cont)
{
rd_.size = 0;
rd_.op = fh.op;
}
else
{
if(rd_.size > (std::numeric_limits<
std::uint64_t>::max)() - fh.len)
{
code = close_code::too_big;
return;
}
}
rd_.cont = ! fh.fin;
}
code = close_code::none;
}
template<class>
void
stream_base::
rd_begin()
{
// Maintain the read buffer
if(pmd_)
{
if(! rd_.buf || rd_.buf_size != rd_buf_size_)
{
rd_.buf_size = rd_buf_size_;
rd_.buf.reset(new std::uint8_t[rd_.buf_size]);
}
}
}
template<class>
void
stream_base::
wr_begin()
{
wr_.autofrag = wr_autofrag_;
wr_.compress = static_cast<bool>(pmd_);
// Maintain the write buffer
if( wr_.compress ||
role_ == detail::role_type::client)
{
if(! wr_.buf || wr_.buf_size != wr_buf_size_)
{
wr_.buf_size = wr_buf_size_;
wr_.buf.reset(new std::uint8_t[wr_.buf_size]);
}
}
else
{
wr_.buf_size = wr_buf_size_;
wr_.buf.reset();
}
}
template<class DynamicBuffer>
void
stream_base::
write_close(DynamicBuffer& db, close_reason const& cr)
{
using namespace boost::endian;
frame_header fh;
fh.op = detail::opcode::close;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(cr.code != close_code::none)
{
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
{
std::uint8_t b[2];
::new(&b[0]) big_uint16_buf_t{
(std::uint16_t)cr.code};
auto d = db.prepare(2);
boost::asio::buffer_copy(d,
boost::asio::buffer(b));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(2);
}
if(! cr.reason.empty())
{
auto d = db.prepare(cr.reason.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffer(
cr.reason.data(), cr.reason.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(cr.reason.size());
}
}
}
template<class DynamicBuffer>
void
stream_base::
write_ping(DynamicBuffer& db,
detail::opcode code, ping_data const& data)
{
frame_header fh;
fh.op = code;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = data.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(data.empty())
return;
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
auto d = db.prepare(data.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffers_1(
data.data(), data.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(data.size());
}
} // detail
} // websocket
} // beast
#endif

View File

@ -153,7 +153,7 @@ operator()(error_code ec, bool again)
if(! ec)
{
pmd_read(d.ws.pmd_config_, d.res);
d.ws.open(detail::role_type::server);
d.ws.open(role_type::server);
}
break;
}

View File

@ -331,9 +331,9 @@ operator()(error_code ec,
break;
}
if(d.fh.fin && (
(d.ws.role_ == detail::role_type::client &&
(d.ws.role_ == role_type::client &&
d.ws.pmd_config_.server_no_context_takeover) ||
(d.ws.role_ == detail::role_type::server &&
(d.ws.role_ == role_type::server &&
d.ws.pmd_config_.client_no_context_takeover)))
d.ws.pmd_->zi.reset();
d.state = do_frame_done;
@ -933,9 +933,9 @@ read_frame(DynamicBuffer& dynabuf, error_code& ec)
break;
}
if(fh.fin && (
(role_ == detail::role_type::client &&
(role_ == role_type::client &&
pmd_config_.server_no_context_takeover) ||
(role_ == detail::role_type::server &&
(role_ == role_type::server &&
pmd_config_.client_no_context_takeover)))
pmd_->zi.reset();
}

View File

@ -120,7 +120,7 @@ do_accept(http::header<true,
return;
}
pmd_read(pmd_config_, req);
open(detail::role_type::server);
open(role_type::server);
}
template<class NextLayer>
@ -310,7 +310,373 @@ do_response(http::header<false> const& res,
// VFALCO see if offer satisfies pmd_config_,
// return an error if not.
pmd_config_ = offer; // overwrite for now
open(detail::role_type::client);
open(role_type::client);
}
//------------------------------------------------------------------------------
template<class NextLayer>
void
stream<NextLayer>::
open(role_type role)
{
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
failed_ = false;
rd_.cont = false;
wr_close_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
ping_data_ = nullptr; // should be nullptr on close anyway
wr_.cont = false;
wr_.buf_size = 0;
if(((role_ == role_type::client && pmd_opts_.client_enable) ||
(role_ == role_type::server && pmd_opts_.server_enable)) &&
pmd_config_.accept)
{
pmd_normalize(pmd_config_);
pmd_.reset(new pmd_t);
if(role_ == role_type::client)
{
pmd_->zi.reset(
pmd_config_.server_max_window_bits);
pmd_->zo.reset(
pmd_opts_.compLevel,
pmd_config_.client_max_window_bits,
pmd_opts_.memLevel,
zlib::Strategy::normal);
}
else
{
pmd_->zi.reset(
pmd_config_.client_max_window_bits);
pmd_->zo.reset(
pmd_opts_.compLevel,
pmd_config_.server_max_window_bits,
pmd_opts_.memLevel,
zlib::Strategy::normal);
}
}
}
template<class NextLayer>
void
stream<NextLayer>::
close()
{
rd_.buf.reset();
wr_.buf.reset();
pmd_.reset();
}
// Read fixed frame header from buffer
// Requires at least 2 bytes
//
template<class NextLayer>
template<class DynamicBuffer>
std::size_t
stream<NextLayer>::
read_fh1(detail::frame_header& fh,
DynamicBuffer& db, close_code& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
auto const err =
[&](close_code cv)
{
code = cv;
return 0;
};
std::uint8_t b[2];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
std::size_t need;
fh.len = b[1] & 0x7f;
switch(fh.len)
{
case 126: need = 2; break;
case 127: need = 8; break;
default:
need = 0;
}
fh.mask = (b[1] & 0x80) != 0;
if(fh.mask)
need += 4;
fh.op = static_cast<
detail::opcode>(b[0] & 0x0f);
fh.fin = (b[0] & 0x80) != 0;
fh.rsv1 = (b[0] & 0x40) != 0;
fh.rsv2 = (b[0] & 0x20) != 0;
fh.rsv3 = (b[0] & 0x10) != 0;
switch(fh.op)
{
case detail::opcode::binary:
case detail::opcode::text:
if(rd_.cont)
{
// new data frame when continuation expected
return err(close_code::protocol_error);
}
if((fh.rsv1 && ! pmd_) ||
fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
if(pmd_)
pmd_->rd_set = fh.rsv1;
break;
case detail::opcode::cont:
if(! rd_.cont)
{
// continuation without an active message
return err(close_code::protocol_error);
}
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
default:
if(is_reserved(fh.op))
{
// reserved opcode
return err(close_code::protocol_error);
}
if(! fh.fin)
{
// fragmented control message
return err(close_code::protocol_error);
}
if(fh.len > 125)
{
// invalid length for control message
return err(close_code::protocol_error);
}
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
}
// unmasked frame from client
if(role_ == role_type::server && ! fh.mask)
{
code = close_code::protocol_error;
return 0;
}
// masked frame from server
if(role_ == role_type::client && fh.mask)
{
code = close_code::protocol_error;
return 0;
}
code = close_code::none;
return need;
}
// Decode variable frame header from buffer
//
template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
read_fh2(detail::frame_header& fh,
DynamicBuffer& db, close_code& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
using namespace boost::endian;
switch(fh.len)
{
case 126:
{
std::uint8_t b[2];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.len = detail::big_uint16_to_native(&b[0]);
// length not canonical
if(fh.len < 126)
{
code = close_code::protocol_error;
return;
}
break;
}
case 127:
{
std::uint8_t b[8];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.len = detail::big_uint64_to_native(&b[0]);
// length not canonical
if(fh.len < 65536)
{
code = close_code::protocol_error;
return;
}
break;
}
}
if(fh.mask)
{
std::uint8_t b[4];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.key = detail::little_uint32_to_native(&b[0]);
}
else
{
// initialize this otherwise operator== breaks
fh.key = 0;
}
if(! is_control(fh.op))
{
if(fh.op != detail::opcode::cont)
{
rd_.size = 0;
rd_.op = fh.op;
}
else
{
if(rd_.size > (std::numeric_limits<
std::uint64_t>::max)() - fh.len)
{
code = close_code::too_big;
return;
}
}
rd_.cont = ! fh.fin;
}
code = close_code::none;
}
template<class NextLayer>
void
stream<NextLayer>::
rd_begin()
{
// Maintain the read buffer
if(pmd_)
{
if(! rd_.buf || rd_.buf_size != rd_buf_size_)
{
rd_.buf_size = rd_buf_size_;
rd_.buf.reset(new std::uint8_t[rd_.buf_size]);
}
}
}
template<class NextLayer>
void
stream<NextLayer>::
wr_begin()
{
wr_.autofrag = wr_autofrag_;
wr_.compress = static_cast<bool>(pmd_);
// Maintain the write buffer
if( wr_.compress ||
role_ == role_type::client)
{
if(! wr_.buf || wr_.buf_size != wr_buf_size_)
{
wr_.buf_size = wr_buf_size_;
wr_.buf.reset(new std::uint8_t[wr_.buf_size]);
}
}
else
{
wr_.buf_size = wr_buf_size_;
wr_.buf.reset();
}
}
template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
write_close(DynamicBuffer& db, close_reason const& cr)
{
using namespace boost::endian;
detail::frame_header fh;
fh.op = detail::opcode::close;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(cr.code != close_code::none)
{
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
{
std::uint8_t b[2];
::new(&b[0]) big_uint16_buf_t{
(std::uint16_t)cr.code};
auto d = db.prepare(2);
boost::asio::buffer_copy(d,
boost::asio::buffer(b));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(2);
}
if(! cr.reason.empty())
{
auto d = db.prepare(cr.reason.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffer(
cr.reason.data(), cr.reason.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(cr.reason.size());
}
}
}
template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
write_ping(DynamicBuffer& db,
detail::opcode code, ping_data const& data)
{
detail::frame_header fh;
fh.op = code;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = data.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(data.empty())
return;
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
auto d = db.prepare(data.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffers_1(
data.data(), data.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(data.size());
}
} // websocket

View File

@ -181,7 +181,7 @@ operator()(error_code ec,
d.fh.op = d.ws.wr_.cont ?
detail::opcode::cont : d.ws.wr_opcode_;
d.fh.mask =
d.ws.role_ == detail::role_type::client;
d.ws.role_ == role_type::client;
// entry_state determines which algorithm
// we will use to send. If we suspend, we
@ -477,9 +477,9 @@ operator()(error_code ec,
case do_deflate + 3:
if(d.fh.fin && (
(d.ws.role_ == detail::role_type::client &&
(d.ws.role_ == role_type::client &&
d.ws.pmd_config_.client_no_context_takeover) ||
(d.ws.role_ == detail::role_type::server &&
(d.ws.role_ == role_type::server &&
d.ws.pmd_config_.server_no_context_takeover)))
d.ws.pmd_->zo.reset();
goto upcall;
@ -707,7 +707,7 @@ write_frame(bool fin,
fh.rsv3 = false;
fh.op = wr_.cont ?
detail::opcode::cont : wr_opcode_;
fh.mask = role_ == detail::role_type::client;
fh.mask = role_ == role_type::client;
auto remain = buffer_size(buffers);
if(wr_.compress)
{
@ -756,9 +756,9 @@ write_frame(bool fin,
fh.rsv1 = false;
}
if(fh.fin && (
(role_ == detail::role_type::client &&
(role_ == role_type::client &&
pmd_config_.client_no_context_takeover) ||
(role_ == detail::role_type::server &&
(role_ == role_type::server &&
pmd_config_.server_no_context_takeover)))
pmd_->zo.reset();
return;

View File

@ -9,9 +9,15 @@
#define BEAST_WEBSOCKET_STREAM_HPP
#include <beast/config.hpp>
#include <beast/websocket/error.hpp>
#include <beast/websocket/option.hpp>
#include <beast/websocket/rfc6455.hpp>
#include <beast/websocket/detail/frame.hpp>
#include <beast/websocket/detail/hybi13.hpp>
#include <beast/websocket/detail/stream_base.hpp>
#include <beast/websocket/detail/mask.hpp>
#include <beast/websocket/detail/pausation.hpp>
#include <beast/websocket/detail/pmd_extension.hpp>
#include <beast/websocket/detail/utf8_checker.hpp>
#include <beast/http/empty_body.hpp>
#include <beast/http/message.hpp>
#include <beast/http/string_body.hpp>
@ -20,6 +26,8 @@
#include <beast/core/buffered_read_stream.hpp>
#include <beast/core/string.hpp>
#include <beast/core/detail/type_traits.hpp>
#include <beast/zlib/deflate_stream.hpp>
#include <beast/zlib/inflate_stream.hpp>
#include <boost/asio.hpp>
#include <algorithm>
#include <cstdint>
@ -29,12 +37,17 @@
namespace beast {
namespace websocket {
namespace detail {
class frame_test;
}
/// The type of object holding HTTP Upgrade requests
using request_type = http::request<http::empty_body>;
/// The type of object holding HTTP Upgrade responses
using response_type = http::response<http::string_body>;
//--------------------------------------------------------------------
/** Provides message-oriented functionality using WebSocket.
@ -80,12 +93,167 @@ using response_type = http::response<http::string_body>;
@b SyncStream
*/
template<class NextLayer>
class stream : public detail::stream_base
class stream
{
friend class detail::frame_test;
friend class stream_test;
buffered_read_stream<NextLayer, multi_buffer> stream_;
/// Identifies the role of a WebSockets stream.
enum class role_type
{
/// Stream is operating as a client.
client,
/// Stream is operating as a server.
server
};
friend class frame_test;
using ping_callback_type =
std::function<void(bool, ping_data const&)>;
struct op {};
detail::maskgen maskgen_; // source of mask keys
std::size_t rd_msg_max_ =
16 * 1024 * 1024; // max message size
bool wr_autofrag_ = true; // auto fragment
std::size_t wr_buf_size_ = 4096; // write buffer size
std::size_t rd_buf_size_ = 4096; // read buffer size
detail::opcode wr_opcode_ =
detail::opcode::text; // outgoing message type
ping_callback_type ping_cb_; // ping callback
role_type role_; // server or client
bool failed_; // the connection failed
bool wr_close_; // sent close frame
op* wr_block_; // op currenly writing
ping_data* ping_data_; // where to put the payload
detail::pausation rd_op_; // parked read op
detail::pausation wr_op_; // parked write op
detail::pausation ping_op_; // parked ping op
close_reason cr_; // set from received close frame
// State information for the message being received
//
struct rd_t
{
// opcode of current message being read
detail::opcode op;
// `true` if the next frame is a continuation.
bool cont;
// Checks that test messages are valid utf8
detail::utf8_checker utf8;
// Size of the current message so far.
std::uint64_t size;
// Size of the read buffer.
// This gets set to the read buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The read buffer. Used for compression and masking.
std::unique_ptr<std::uint8_t[]> buf;
};
rd_t rd_;
// State information for the message being sent
//
struct wr_t
{
// `true` if next frame is a continuation,
// `false` if next frame starts a new message
bool cont;
// `true` if this message should be auto-fragmented
// This gets set to the auto-fragment option at the beginning
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool autofrag;
// `true` if this message should be compressed.
// This gets set to the compress option at the beginning of
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool compress;
// Size of the write buffer.
// This gets set to the write buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The write buffer. Used for compression and masking.
// The buffer is allocated or reallocated at the beginning of
// sending a message.
std::unique_ptr<std::uint8_t[]> buf;
};
wr_t wr_;
// State information for the permessage-deflate extension
struct pmd_t
{
// `true` if current read message is compressed
bool rd_set;
zlib::deflate_stream zo;
zlib::inflate_stream zi;
};
// If not engaged, then permessage-deflate is not
// enabled for the currently active session.
std::unique_ptr<pmd_t> pmd_;
// Local options for permessage-deflate
permessage_deflate pmd_opts_;
// Offer for clients, negotiated result for servers
detail::pmd_offer pmd_config_;
void
open(role_type role);
void
close();
template<class DynamicBuffer>
std::size_t
read_fh1(detail::frame_header& fh,
DynamicBuffer& db, close_code& code);
template<class DynamicBuffer>
void
read_fh2(detail::frame_header& fh,
DynamicBuffer& db, close_code& code);
// Called before receiving the first frame of each message
void
rd_begin();
// Called before sending the first frame of each message
//
void
wr_begin();
template<class DynamicBuffer>
void
write_close(DynamicBuffer& db, close_reason const& rc);
template<class DynamicBuffer>
void
write_ping(DynamicBuffer& db,
detail::opcode op, ping_data const& data);
public:
/// The type of the next layer.
using next_layer_type =

View File

@ -5,9 +5,11 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <beast/websocket/stream.hpp>
#include <beast/websocket/detail/frame.hpp>
#include <beast/websocket/detail/stream_base.hpp>
#include <beast/unit_test/suite.hpp>
#include <beast/test/pipe_stream.hpp>
#include <beast/test/yield_to.hpp>
#include <initializer_list>
#include <climits>
@ -30,7 +32,9 @@ operator==(frame_header const& lhs, frame_header const& rhs)
lhs.key == rhs.key;
}
class frame_test : public beast::unit_test::suite
class frame_test
: public beast::unit_test::suite
, public test::enable_yield_to
{
public:
void testCloseCodes()
@ -68,9 +72,14 @@ public:
void testFrameHeader()
{
using stream_type =
beast::websocket::stream<test::pipe::stream&>;
test::pipe p{ios_};
// good frame fields
{
role_type role = role_type::client;
stream_type::role_type role =
stream_type::role_type::client;
auto check =
[&](frame_header const& fh)
@ -78,7 +87,7 @@ public:
fh_streambuf b;
write(b, fh);
close_code code;
stream_base stream;
stream_type stream{p.server};
stream.open(role);
detail::frame_header fh1;
auto const n =
@ -99,7 +108,7 @@ public:
check(fh);
role = role_type::server;
role = stream_type::role_type::server;
fh.mask = true;
fh.key = 1;
check(fh);
@ -122,7 +131,7 @@ public:
// bad frame fields
{
role_type role = role_type::client;
stream_type::role_type role = stream_type::role_type::client;
auto check =
[&](frame_header const& fh)
@ -130,9 +139,9 @@ public:
fh_streambuf b;
write(b, fh);
close_code code;
stream_base stream;
stream_type stream{p.server};
stream.open(role);
detail::frame_header fh1;
frame_header fh1;
auto const n =
stream.read_fh1(fh1, b, code);
if(code)
@ -181,7 +190,7 @@ public:
fh.mask = true;
check(fh);
role = role_type::server;
role = stream_type::role_type::server;
fh.mask = false;
check(fh);
}
@ -189,13 +198,16 @@ public:
void bad(std::initializer_list<std::uint8_t> bs)
{
using stream_type =
beast::websocket::stream<test::pipe::stream&>;
using boost::asio::buffer;
using boost::asio::buffer_copy;
static role_type constexpr role = role_type::client;
test::pipe p{ios_};
static stream_type::role_type constexpr role = stream_type::role_type::client;
std::vector<std::uint8_t> v{bs};
fh_streambuf b;
b.commit(buffer_copy(b.prepare(v.size()), buffer(v)));
stream_base stream;
stream_type stream{p.server};
stream.open(role);
close_code code;
detail::frame_header fh;