websocket stream uses shared_ptr<impl_type>

This commit is contained in:
Vinnie Falco
2019-01-19 07:24:00 -08:00
parent 944b5dcda7
commit 9a8e22950f
31 changed files with 2243 additions and 2130 deletions

View File

@@ -1,6 +1,7 @@
Version 206
* Clear error codes idiomatically
* websocket stream uses shared_ptr<impl_type>
--------------------------------------------------------------------------------

View File

@@ -82,7 +82,7 @@ these memory regions as type-safe pointer/size pairs, as shown below:
pointer-to-byte. The operating system doesn't care about this, but if
a user wants to send and receive an array of some other type, presenting
it as an array of bytes which supports bitwise operations is unnecessary.
Custom buffer types also permit networking implmentations to provide
Custom buffer types also enable networking implementations to provide
targeted features such as
[@boost:/doc/html/boost_asio/overview/core/buffers.html#boost_asio.overview.core.buffers.buffer_debugging ['buffer debugging]]
without changing the more general vocabulary types.

View File

@@ -8,6 +8,7 @@
//
#include "websocket_session.hpp"
#include <iostream>
websocket_session::
websocket_session(

View File

@@ -111,23 +111,6 @@ BOOST_BEAST_DECL
prng::ref
make_prng(bool secure);
//------------------------------------------------------------------------------
struct stream_prng
{
bool secure_prng_ = true;
std::uint32_t
create_mask()
{
auto g = make_prng(secure_prng_);
for(;;)
if(auto key = g())
return key;
}
};
} // detail
} // websocket
} // beast

View File

@@ -14,6 +14,9 @@
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/websocket/option.hpp>
#include <boost/beast/websocket/role.hpp>
#include <boost/beast/websocket/detail/frame.hpp>
#include <boost/beast/websocket/detail/prng.hpp>
#include <boost/beast/websocket/detail/pmd_extension.hpp>
#include <boost/beast/websocket/detail/prng.hpp>
#include <boost/beast/zlib/deflate_stream.hpp>
@@ -24,14 +27,20 @@
#include <boost/asio/buffer.hpp>
#include <cstdint>
#include <memory>
#include <stdexcept>
namespace boost {
namespace beast {
namespace websocket {
namespace detail {
//------------------------------------------------------------------------------
template<bool deflateSupported>
struct stream_base : stream_prng
struct impl_base;
template<>
struct impl_base<true>
{
// State information for the permessage-deflate extension
struct pmd_type
@@ -67,6 +76,9 @@ struct stream_base : stream_prng
return ! rsv1; // pmd not negotiated
}
// Compress a buffer sequence
// Returns: `true` if more calls are needed
//
template<class ConstBufferSequence>
bool
deflate(
@@ -74,19 +86,105 @@ struct stream_base : stream_prng
buffers_suffix<ConstBufferSequence>& cb,
bool fin,
std::size_t& total_in,
error_code& ec);
error_code& ec)
{
using net::buffer;
BOOST_ASSERT(out.size() >= 6);
auto& zo = this->pmd_->zo;
zlib::z_params zs;
zs.avail_in = 0;
zs.next_in = nullptr;
zs.avail_out = out.size();
zs.next_out = out.data();
for(auto in : beast::buffers_range_ref(cb))
{
zs.avail_in = in.size();
if(zs.avail_in == 0)
continue;
zs.next_in = in.data();
zo.write(zs, zlib::Flush::none, ec);
if(ec)
{
if(ec != zlib::error::need_buffers)
return false;
BOOST_ASSERT(zs.avail_out == 0);
BOOST_ASSERT(zs.total_out == out.size());
ec = {};
break;
}
if(zs.avail_out == 0)
{
BOOST_ASSERT(zs.total_out == out.size());
break;
}
BOOST_ASSERT(zs.avail_in == 0);
}
total_in = zs.total_in;
cb.consume(zs.total_in);
if(zs.avail_out > 0 && fin)
{
auto const remain = net::buffer_size(cb);
if(remain == 0)
{
// Inspired by Mark Adler
// https://github.com/madler/zlib/issues/149
//
// VFALCO We could do this flush twice depending
// on how much space is in the output.
zo.write(zs, zlib::Flush::block, ec);
BOOST_ASSERT(! ec || ec == zlib::error::need_buffers);
if(ec == zlib::error::need_buffers)
ec = {};
if(ec)
return false;
if(zs.avail_out >= 6)
{
zo.write(zs, zlib::Flush::full, ec);
BOOST_ASSERT(! ec);
// remove flush marker
zs.total_out -= 4;
out = buffer(out.data(), zs.total_out);
return false;
}
}
}
ec = {};
out = buffer(out.data(), zs.total_out);
return true;
}
void
do_context_takeover_write(role_type role);
do_context_takeover_write(role_type role)
{
if((role == role_type::client &&
this->pmd_config_.client_no_context_takeover) ||
(role == role_type::server &&
this->pmd_config_.server_no_context_takeover))
{
this->pmd_->zo.reset();
}
}
void
inflate(
zlib::z_params& zs,
zlib::Flush flush,
error_code& ec);
error_code& ec)
{
pmd_->zi.write(zs, flush, ec);
}
void
do_context_takeover_read(role_type role);
do_context_takeover_read(role_type role)
{
if((role == role_type::client &&
pmd_config_.server_no_context_takeover) ||
(role == role_type::server &&
pmd_config_.client_no_context_takeover))
{
pmd_->zi.clear();
}
}
template<class Body, class Allocator>
void
@@ -95,14 +193,15 @@ struct stream_base : stream_prng
http::request<Body,
http::basic_fields<Allocator>> const& req)
{
detail::pmd_offer offer;
detail::pmd_offer unused;
detail::pmd_read(offer, req);
detail::pmd_negotiate(res, unused, offer, pmd_opts_);
pmd_offer offer;
pmd_offer unused;
pmd_read(offer, req);
pmd_negotiate(res, unused, offer, pmd_opts_);
}
void
on_response_pmd(http::response<http::string_body> const& res)
on_response_pmd(
http::response<http::string_body> const& res)
{
detail::pmd_offer offer;
detail::pmd_read(offer, res);
@@ -177,8 +276,7 @@ struct stream_base : stream_prng
pmd_config_.accept)
{
detail::pmd_normalize(pmd_config_);
pmd_.reset(new typename
detail::stream_base<deflateSupported>::pmd_type);
pmd_.reset(::new pmd_type);
if(role == role_type::client)
{
pmd_->zi.reset(
@@ -248,8 +346,10 @@ struct stream_base : stream_prng
}
};
//------------------------------------------------------------------------------
template<>
struct stream_base<false> : stream_prng
struct impl_base<false>
{
// These stubs are for avoiding linking in the zlib
// code when permessage-deflate is not enabled.
@@ -362,7 +462,7 @@ struct stream_base<false> : stream_prng
std::size_t initial_size,
bool rd_done,
std::uint64_t rd_remain,
detail::frame_header const& rd_fh) const
frame_header const& rd_fh) const
{
using beast::detail::clamp;
std::size_t result;
@@ -389,6 +489,24 @@ struct stream_base<false> : stream_prng
}
};
//------------------------------------------------------------------------------
struct stream_base
{
protected:
bool secure_prng_ = true;
std::uint32_t
create_mask()
{
auto g = make_prng(secure_prng_);
for(;;)
if(auto key = g())
return key;
}
};
} // detail
} // websocket
} // beast

View File

@@ -249,6 +249,6 @@ enum class condition
} // beast
} // boost
#include <boost/beast/websocket/impl/error.ipp>
#include <boost/beast/websocket/impl/error.hpp>
#endif

View File

@@ -32,7 +32,8 @@ namespace boost {
namespace beast {
namespace websocket {
// Respond to an upgrade HTTP request
/** Respond to an HTTP request
*/
template<class NextLayer, bool deflateSupported>
template<class Handler>
class stream<NextLayer, deflateSupported>::response_op
@@ -40,39 +41,26 @@ class stream<NextLayer, deflateSupported>::response_op
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{
struct data
{
stream<NextLayer, deflateSupported>& ws;
error_code result;
response_type res;
template<class Body, class Allocator, class Decorator>
data(
stream<NextLayer, deflateSupported>& ws_,
http::request<Body,
http::basic_fields<Allocator>> const& req,
Decorator const& decorator)
: ws(ws_)
, res(ws_.build_response(req, decorator, result))
{
}
};
data& d_;
stream<NextLayer, deflateSupported>& ws_;
error_code result_; // must come before res_
response_type& res_;
public:
template<
class Handler_,
class... Args>
class Body, class Allocator,
class Decorator>
response_op(
Handler_&& h,
stream<NextLayer, deflateSupported>& ws,
Args&&... args)
http::request<Body, http::basic_fields<Allocator>> const& req,
Decorator const& decorator)
: stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
std::forward<Handler_>(h), ws.get_executor())
, d_(beast::allocate_stable<data>(
*this, ws, std::forward<Args>(args)...))
, ws_(ws)
, res_(beast::allocate_stable<response_type>(*this,
ws.build_response(req, decorator, result_)))
{
}
@@ -81,19 +69,18 @@ public:
std::size_t bytes_transferred = 0)
{
boost::ignore_unused(bytes_transferred);
BOOST_ASIO_CORO_REENTER(*this)
{
// Send response
BOOST_ASIO_CORO_YIELD
http::async_write(d_.ws.next_layer(),
d_.res, std::move(*this));
http::async_write(
ws_.next_layer(), res_, std::move(*this));
if(! ec)
ec = d_.result;
ec = result_;
if(! ec)
{
d_.ws.do_pmd_config(d_.res);
d_.ws.open(role_type::server);
ws_.impl_->do_pmd_config(res_);
ws_.impl_->open(role_type::server);
}
this->invoke(ec);
}
@@ -111,36 +98,23 @@ class stream<NextLayer, deflateSupported>::accept_op
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{
struct data
{
stream<NextLayer, deflateSupported>& ws;
Decorator decorator;
http::request_parser<http::empty_body> p;
data(
stream<NextLayer, deflateSupported>& ws_,
Decorator const& decorator_)
: ws(ws_)
, decorator(decorator_)
{
}
};
data& d_;
stream<NextLayer, deflateSupported>& ws_;
http::request_parser<http::empty_body>& p_;
Decorator d_;
public:
template<
class Handler_,
class... Args>
template<class Handler_>
accept_op(
Handler_&& h,
stream<NextLayer, deflateSupported>& ws,
Args&&... args)
Decorator const& decorator)
: stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
std::forward<Handler_>(h), ws.get_executor())
, d_(beast::allocate_stable<data>(
*this, ws, std::forward<Args>(args)...))
, ws_(ws)
, p_(beast::allocate_stable<
http::request_parser<http::empty_body>>(*this))
, d_(decorator)
{
}
@@ -151,19 +125,19 @@ public:
using net::buffer_size;
error_code ec;
auto const mb = beast::detail::dynamic_buffer_prepare(
d_.ws.rd_buf_, buffer_size(buffers), ec,
ws_.impl_->rd_buf, buffer_size(buffers), ec,
error::buffer_overflow);
if(ec)
return (*this)(ec);
d_.ws.rd_buf_.commit(buffer_copy(*mb, buffers));
ws_.impl_->rd_buf.commit(buffer_copy(*mb, buffers));
(*this)(ec);
}
void operator()(
error_code ec = {},
std::size_t bytes_used = 0)
std::size_t bytes_transferred = 0)
{
boost::ignore_unused(bytes_used);
boost::ignore_unused(bytes_transferred);
BOOST_ASIO_CORO_REENTER(*this)
{
@@ -171,15 +145,15 @@ public:
{
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(),
ws_.get_executor(),
beast::bind_front_handler(std::move(*this), ec));
}
else
{
BOOST_ASIO_CORO_YIELD
http::async_read(
d_.ws.next_layer(), d_.ws.rd_buf_,
d_.p, std::move(*this));
ws_.next_layer(), ws_.impl_->rd_buf,
p_, std::move(*this));
if(ec == http::error::end_of_stream)
ec = error::closed;
if(! ec)
@@ -187,9 +161,9 @@ public:
// Arguments from our state must be
// moved to the stack before releasing
// the handler.
auto& ws = d_.ws;
auto const req = d_.p.release();
auto const decorator = d_.decorator;
auto& ws = ws_;
auto const req = p_.release();
auto const decorator = d_;
#if 1
return response_op<Handler>{
this->release_handler(),
@@ -247,7 +221,7 @@ accept(error_code& ec)
{
static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met");
reset();
impl_->reset();
do_accept(&default_decorate_res, ec);
}
@@ -262,7 +236,7 @@ accept_ex(ResponseDecorator const& decorator, error_code& ec)
static_assert(detail::is_response_decorator<
ResponseDecorator>::value,
"ResponseDecorator requirements not met");
reset();
impl_->reset();
do_accept(decorator, ec);
}
@@ -324,13 +298,13 @@ accept(
"ConstBufferSequence requirements not met");
using net::buffer_copy;
using net::buffer_size;
reset();
impl_->reset();
auto const mb = beast::detail::dynamic_buffer_prepare(
rd_buf_, buffer_size(buffers), ec,
impl_->rd_buf, buffer_size(buffers), ec,
error::buffer_overflow);
if(ec)
return;
rd_buf_.commit(buffer_copy(*mb, buffers));
impl_->rd_buf.commit(buffer_copy(*mb, buffers));
do_accept(&default_decorate_res, ec);
}
@@ -356,13 +330,13 @@ accept_ex(
"ConstBufferSequence requirements not met");
using net::buffer_copy;
using net::buffer_size;
reset();
impl_->reset();
auto const mb = beast::detail::dynamic_buffer_prepare(
rd_buf_, buffer_size(buffers), ec,
impl_->rd_buf, buffer_size(buffers), ec,
error::buffer_overflow);
if(ec)
return;
rd_buf_.commit(buffer_copy(*mb, buffers));
impl_->rd_buf.commit(buffer_copy(*mb, buffers));
do_accept(decorator, ec);
}
@@ -415,7 +389,7 @@ accept(
{
static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met");
reset();
impl_->reset();
do_accept(req, &default_decorate_res, ec);
}
@@ -436,7 +410,7 @@ accept_ex(
static_assert(detail::is_response_decorator<
ResponseDecorator>::value,
"ResponseDecorator requirements not met");
reset();
impl_->reset();
do_accept(req, decorator, ec);
}
@@ -455,7 +429,7 @@ async_accept(
"AsyncStream requirements not met");
BOOST_BEAST_HANDLER_INIT(
AcceptHandler, void(error_code));
reset();
impl_->reset();
accept_op<
decltype(&default_decorate_res),
BOOST_ASIO_HANDLER_TYPE(
@@ -484,7 +458,7 @@ async_accept_ex(
"ResponseDecorator requirements not met");
BOOST_BEAST_HANDLER_INIT(
AcceptHandler, void(error_code));
reset();
impl_->reset();
accept_op<
ResponseDecorator,
BOOST_ASIO_HANDLER_TYPE(
@@ -515,7 +489,7 @@ async_accept(
"ConstBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT(
AcceptHandler, void(error_code));
reset();
impl_->reset();
accept_op<
decltype(&default_decorate_res),
BOOST_ASIO_HANDLER_TYPE(
@@ -551,7 +525,7 @@ async_accept_ex(
"ResponseDecorator requirements not met");
BOOST_BEAST_HANDLER_INIT(
AcceptHandler, void(error_code));
reset();
impl_->reset();
accept_op<
ResponseDecorator,
BOOST_ASIO_HANDLER_TYPE(
@@ -577,7 +551,7 @@ async_accept(
"AsyncStream requirements not met");
BOOST_BEAST_HANDLER_INIT(
AcceptHandler, void(error_code));
reset();
impl_->reset();
using net::asio_handler_is_continuation;
response_op<
BOOST_ASIO_HANDLER_TYPE(
@@ -609,7 +583,7 @@ async_accept_ex(
"ResponseDecorator requirements not met");
BOOST_BEAST_HANDLER_INIT(
AcceptHandler, void(error_code));
reset();
impl_->reset();
using net::asio_handler_is_continuation;
response_op<
BOOST_ASIO_HANDLER_TYPE(
@@ -632,7 +606,7 @@ do_accept(
error_code& ec)
{
http::request_parser<http::empty_body> p;
http::read(next_layer(), rd_buf_, p, ec);
http::read(next_layer(), impl_->rd_buf, p, ec);
if(ec == http::error::end_of_stream)
ec = error::closed;
if(ec)
@@ -653,7 +627,7 @@ do_accept(
{
error_code result;
auto const res = build_response(req, decorator, result);
http::write(stream_, res, ec);
http::write(impl_->stream, res, ec);
if(ec)
return;
ec = result;
@@ -663,8 +637,8 @@ do_accept(
// teardown if Connection: close.
return;
}
this->do_pmd_config(res);
open(role_type::server);
impl_->do_pmd_config(res);
impl_->open(role_type::server);
}
} // websocket

View File

@@ -7,10 +7,11 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
#include <boost/beast/websocket/teardown.hpp>
#include <boost/beast/websocket/detail/mask.hpp>
#include <boost/beast/core/async_op_base.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp>
@@ -86,42 +87,42 @@ public:
BOOST_ASIO_CORO_REENTER(*this)
{
// Attempt to acquire write block
if(! d_.ws.wr_block_.try_lock(this))
if(! d_.ws.impl_->wr_block.try_lock(this))
{
// Suspend
BOOST_ASIO_CORO_YIELD
d_.ws.paused_close_.emplace(std::move(*this));
d_.ws.impl_->paused_close.emplace(std::move(*this));
// Acquire the write block
d_.ws.wr_block_.lock(this);
d_.ws.impl_->wr_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
}
// Make sure the stream is open
if(! d_.ws.check_open(ec))
if(! d_.ws.impl_->check_open(ec))
goto upcall;
// Can't call close twice
BOOST_ASSERT(! d_.ws.wr_close_);
BOOST_ASSERT(! d_.ws.impl_->wr_close);
// Change status to closing
BOOST_ASSERT(d_.ws.status_ == status::open);
d_.ws.status_ = status::closing;
BOOST_ASSERT(d_.ws.impl_->status_ == status::open);
d_.ws.impl_->status_ = status::closing;
// Send close frame
d_.ws.wr_close_ = true;
d_.ws.impl_->wr_close = true;
BOOST_ASIO_CORO_YIELD
net::async_write(d_.ws.stream_,
net::async_write(d_.ws.impl_->stream,
d_.fb.data(), std::move(*this));
if(! d_.ws.check_ok(ec))
if(! d_.ws.impl_->check_ok(ec))
goto upcall;
if(d_.ws.rd_close_)
if(d_.ws.impl_->rd_close)
{
// This happens when the read_op gets a close frame
// at the same time close_op is sending the close frame.
@@ -130,100 +131,100 @@ public:
}
// Maybe suspend
if(! d_.ws.rd_block_.try_lock(this))
if(! d_.ws.impl_->rd_block.try_lock(this))
{
// Suspend
BOOST_ASIO_CORO_YIELD
d_.ws.paused_r_close_.emplace(std::move(*this));
d_.ws.impl_->paused_r_close.emplace(std::move(*this));
// Acquire the read block
d_.ws.rd_block_.lock(this);
d_.ws.impl_->rd_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d_.ws.rd_block_.is_locked(this));
BOOST_ASSERT(d_.ws.impl_->rd_block.is_locked(this));
// Make sure the stream is open
BOOST_ASSERT(d_.ws.status_ != status::open);
BOOST_ASSERT(d_.ws.status_ != status::closed);
if( d_.ws.status_ == status::failed)
BOOST_ASSERT(d_.ws.impl_->status_ != status::open);
BOOST_ASSERT(d_.ws.impl_->status_ != status::closed);
if( d_.ws.impl_->status_ == status::failed)
goto upcall;
BOOST_ASSERT(! d_.ws.rd_close_);
BOOST_ASSERT(! d_.ws.impl_->rd_close);
}
// Drain
if(d_.ws.rd_remain_ > 0)
if(d_.ws.impl_->rd_remain > 0)
goto read_payload;
for(;;)
{
// Read frame header
while(! d_.ws.parse_fh(
d_.ws.rd_fh_, d_.ws.rd_buf_, d_.ev))
d_.ws.impl_->rd_fh, d_.ws.impl_->rd_buf, d_.ev))
{
if(d_.ev)
goto teardown;
BOOST_ASIO_CORO_YIELD
d_.ws.stream_.async_read_some(
d_.ws.rd_buf_.prepare(read_size(d_.ws.rd_buf_,
d_.ws.rd_buf_.max_size())),
d_.ws.impl_->stream.async_read_some(
d_.ws.impl_->rd_buf.prepare(read_size(d_.ws.impl_->rd_buf,
d_.ws.impl_->rd_buf.max_size())),
std::move(*this));
if(! d_.ws.check_ok(ec))
if(! d_.ws.impl_->check_ok(ec))
goto upcall;
d_.ws.rd_buf_.commit(bytes_transferred);
d_.ws.impl_->rd_buf.commit(bytes_transferred);
}
if(detail::is_control(d_.ws.rd_fh_.op))
if(detail::is_control(d_.ws.impl_->rd_fh.op))
{
// Process control frame
if(d_.ws.rd_fh_.op == detail::opcode::close)
if(d_.ws.impl_->rd_fh.op == detail::opcode::close)
{
BOOST_ASSERT(! d_.ws.rd_close_);
d_.ws.rd_close_ = true;
BOOST_ASSERT(! d_.ws.impl_->rd_close);
d_.ws.impl_->rd_close = true;
auto const mb = buffers_prefix(
clamp(d_.ws.rd_fh_.len),
d_.ws.rd_buf_.data());
if(d_.ws.rd_fh_.len > 0 && d_.ws.rd_fh_.mask)
detail::mask_inplace(mb, d_.ws.rd_key_);
detail::read_close(d_.ws.cr_, mb, d_.ev);
clamp(d_.ws.impl_->rd_fh.len),
d_.ws.impl_->rd_buf.data());
if(d_.ws.impl_->rd_fh.len > 0 && d_.ws.impl_->rd_fh.mask)
detail::mask_inplace(mb, d_.ws.impl_->rd_key);
detail::read_close(d_.ws.impl_->cr, mb, d_.ev);
if(d_.ev)
goto teardown;
d_.ws.rd_buf_.consume(clamp(d_.ws.rd_fh_.len));
d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_fh.len));
goto teardown;
}
d_.ws.rd_buf_.consume(clamp(d_.ws.rd_fh_.len));
d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_fh.len));
}
else
{
read_payload:
while(d_.ws.rd_buf_.size() < d_.ws.rd_remain_)
while(d_.ws.impl_->rd_buf.size() < d_.ws.impl_->rd_remain)
{
d_.ws.rd_remain_ -= d_.ws.rd_buf_.size();
d_.ws.rd_buf_.consume(d_.ws.rd_buf_.size());
d_.ws.impl_->rd_remain -= d_.ws.impl_->rd_buf.size();
d_.ws.impl_->rd_buf.consume(d_.ws.impl_->rd_buf.size());
BOOST_ASIO_CORO_YIELD
d_.ws.stream_.async_read_some(
d_.ws.rd_buf_.prepare(read_size(d_.ws.rd_buf_,
d_.ws.rd_buf_.max_size())),
d_.ws.impl_->stream.async_read_some(
d_.ws.impl_->rd_buf.prepare(read_size(d_.ws.impl_->rd_buf,
d_.ws.impl_->rd_buf.max_size())),
std::move(*this));
if(! d_.ws.check_ok(ec))
if(! d_.ws.impl_->check_ok(ec))
goto upcall;
d_.ws.rd_buf_.commit(bytes_transferred);
d_.ws.impl_->rd_buf.commit(bytes_transferred);
}
BOOST_ASSERT(d_.ws.rd_buf_.size() >= d_.ws.rd_remain_);
d_.ws.rd_buf_.consume(clamp(d_.ws.rd_remain_));
d_.ws.rd_remain_ = 0;
BOOST_ASSERT(d_.ws.impl_->rd_buf.size() >= d_.ws.impl_->rd_remain);
d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_remain));
d_.ws.impl_->rd_remain = 0;
}
}
teardown:
// Teardown
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
using beast::websocket::async_teardown;
BOOST_ASIO_CORO_YIELD
async_teardown(d_.ws.role_,
d_.ws.stream_, std::move(*this));
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
async_teardown(d_.ws.impl_->role,
d_.ws.impl_->stream, std::move(*this));
BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
if(ec == net::error::eof)
{
// Rationale:
@@ -233,19 +234,19 @@ public:
if(! ec)
ec = d_.ev;
if(ec)
d_.ws.status_ = status::failed;
d_.ws.impl_->status_ = status::failed;
else
d_.ws.status_ = status::closed;
d_.ws.close();
d_.ws.impl_->status_ = status::closed;
d_.ws.impl_->close();
upcall:
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
d_.ws.wr_block_.unlock(this);
if(d_.ws.rd_block_.try_unlock(this))
d_.ws.paused_r_rd_.maybe_invoke();
d_.ws.paused_rd_.maybe_invoke() ||
d_.ws.paused_ping_.maybe_invoke() ||
d_.ws.paused_wr_.maybe_invoke();
BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
d_.ws.impl_->wr_block.unlock(this);
if(d_.ws.impl_->rd_block.try_unlock(this))
d_.ws.impl_->paused_r_rd.maybe_invoke();
d_.ws.impl_->paused_rd.maybe_invoke() ||
d_.ws.impl_->paused_ping.maybe_invoke() ||
d_.ws.impl_->paused_wr.maybe_invoke();
if(! d_.cont)
{
BOOST_ASIO_CORO_YIELD
@@ -281,90 +282,92 @@ close(close_reason const& cr, error_code& ec)
static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met");
using beast::detail::clamp;
ec.assign(0, ec.category());
ec = {};
// Make sure the stream is open
if(! check_open(ec))
if(! impl_->check_open(ec))
return;
// If rd_close_ is set then we already sent a close
BOOST_ASSERT(! rd_close_);
BOOST_ASSERT(! wr_close_);
wr_close_ = true;
BOOST_ASSERT(! impl_->rd_close);
BOOST_ASSERT(! impl_->wr_close);
impl_->wr_close = true;
{
detail::frame_buffer fb;
write_close<flat_static_buffer_base>(fb, cr);
net::write(stream_, fb.data(), ec);
net::write(impl_->stream, fb.data(), ec);
}
if(! check_ok(ec))
if(! impl_->check_ok(ec))
return;
status_ = status::closing;
impl_->status_ = status::closing;
error_code result;
// Drain the connection
if(rd_remain_ > 0)
if(impl_->rd_remain > 0)
goto read_payload;
for(;;)
{
// Read frame header
while(! parse_fh(rd_fh_, rd_buf_, result))
while(! parse_fh(
impl_->rd_fh, impl_->rd_buf, result))
{
if(result)
return do_fail(
close_code::none, result, ec);
auto const bytes_transferred =
stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec);
if(! check_ok(ec))
impl_->stream.read_some(
impl_->rd_buf.prepare(read_size(impl_->rd_buf,
impl_->rd_buf.max_size())), ec);
if(! impl_->check_ok(ec))
return;
rd_buf_.commit(bytes_transferred);
impl_->rd_buf.commit(bytes_transferred);
}
if(detail::is_control(rd_fh_.op))
if(detail::is_control(impl_->rd_fh.op))
{
// Process control frame
if(rd_fh_.op == detail::opcode::close)
if(impl_->rd_fh.op == detail::opcode::close)
{
BOOST_ASSERT(! rd_close_);
rd_close_ = true;
BOOST_ASSERT(! impl_->rd_close);
impl_->rd_close = true;
auto const mb = buffers_prefix(
clamp(rd_fh_.len),
rd_buf_.data());
if(rd_fh_.len > 0 && rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
detail::read_close(cr_, mb, result);
clamp(impl_->rd_fh.len),
impl_->rd_buf.data());
if(impl_->rd_fh.len > 0 && impl_->rd_fh.mask)
detail::mask_inplace(mb, impl_->rd_key);
detail::read_close(impl_->cr, mb, result);
if(result)
{
// Protocol violation
return do_fail(
close_code::none, result, ec);
}
rd_buf_.consume(clamp(rd_fh_.len));
impl_->rd_buf.consume(clamp(impl_->rd_fh.len));
break;
}
rd_buf_.consume(clamp(rd_fh_.len));
impl_->rd_buf.consume(clamp(impl_->rd_fh.len));
}
else
{
read_payload:
while(rd_buf_.size() < rd_remain_)
while(impl_->rd_buf.size() < impl_->rd_remain)
{
rd_remain_ -= rd_buf_.size();
rd_buf_.consume(rd_buf_.size());
impl_->rd_remain -= impl_->rd_buf.size();
impl_->rd_buf.consume(impl_->rd_buf.size());
auto const bytes_transferred =
stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec);
if(! check_ok(ec))
impl_->stream.read_some(
impl_->rd_buf.prepare(read_size(impl_->rd_buf,
impl_->rd_buf.max_size())), ec);
if(! impl_->check_ok(ec))
return;
rd_buf_.commit(bytes_transferred);
impl_->rd_buf.commit(bytes_transferred);
}
BOOST_ASSERT(rd_buf_.size() >= rd_remain_);
rd_buf_.consume(clamp(rd_remain_));
rd_remain_ = 0;
BOOST_ASSERT(
impl_->rd_buf.size() >= impl_->rd_remain);
impl_->rd_buf.consume(clamp(impl_->rd_remain));
impl_->rd_remain = 0;
}
}
// _Close the WebSocket Connection_
do_fail(close_code::none, error::closed, ec);
if(ec == error::closed)
ec.assign(0, ec.category());
ec = {};
}
template<class NextLayer, bool deflateSupported>

View File

@@ -7,15 +7,15 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_ERROR_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_ERROR_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_ERROR_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_ERROR_HPP
namespace boost {
namespace beast {
namespace websocket {
namespace detail {
inline
BOOST_BEAST_DECL
const char*
error_codes::
name() const noexcept
@@ -23,7 +23,7 @@ name() const noexcept
return "boost.beast.websocket";
}
inline
BOOST_BEAST_DECL
std::string
error_codes::
message(int ev) const
@@ -67,7 +67,7 @@ message(int ev) const
}
}
inline
BOOST_BEAST_DECL
error_condition
error_codes::
default_error_condition(int ev) const noexcept
@@ -114,7 +114,7 @@ default_error_condition(int ev) const noexcept
}
}
inline
BOOST_BEAST_DECL
const char*
error_conditions::
name() const noexcept
@@ -122,7 +122,7 @@ name() const noexcept
return "boost.beast.websocket";
}
inline
BOOST_BEAST_DECL
std::string
error_conditions::
message(int cv) const
@@ -137,7 +137,7 @@ message(int cv) const
} // detail
inline
BOOST_BEAST_DECL
error_code
make_error_code(error e)
{
@@ -146,7 +146,7 @@ make_error_code(error e)
std::underlying_type<error>::type>(e), cat};
}
inline
BOOST_BEAST_DECL
error_condition
make_error_condition(condition c)
{

View File

@@ -7,8 +7,8 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_HANDSHAKE_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_HANDSHAKE_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_HANDSHAKE_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_HANDSHAKE_HPP
#include <boost/beast/websocket/detail/type_traits.hpp>
#include <boost/beast/http/empty_body.hpp>
@@ -58,7 +58,7 @@ class stream<NextLayer, deflateSupported>::handshake_op
, req(ws.build_request(key,
host, target, decorator))
{
ws.reset();
ws.impl_->reset();
}
};
@@ -89,9 +89,9 @@ public:
BOOST_ASIO_CORO_REENTER(*this)
{
// Send HTTP Upgrade
d_.ws.do_pmd_config(d_.req);
d_.ws.impl_->do_pmd_config(d_.req);
BOOST_ASIO_CORO_YIELD
http::async_write(d_.ws.stream_,
http::async_write(d_.ws.impl_->stream,
d_.req, std::move(*this));
if(ec)
goto upcall;
@@ -105,7 +105,7 @@ public:
// Read HTTP response
BOOST_ASIO_CORO_YIELD
http::async_read(d_.ws.next_layer(),
d_.ws.rd_buf_, d_.res,
d_.ws.impl_->rd_buf, d_.res,
std::move(*this));
if(ec)
goto upcall;
@@ -354,17 +354,17 @@ do_handshake(
error_code& ec)
{
response_type res;
reset();
impl_->reset();
detail::sec_ws_key_type key;
{
auto const req = build_request(
key, host, target, decorator);
this->do_pmd_config(req);
http::write(stream_, req, ec);
this->impl_->do_pmd_config(req);
http::write(impl_->stream, req, ec);
}
if(ec)
return;
http::read(next_layer(), rd_buf_, res, ec);
http::read(next_layer(), impl_->rd_buf, res, ec);
if(ec)
return;
on_response(res, key, ec);

View File

@@ -7,8 +7,8 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_PING_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_PING_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_PING_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_PING_HPP
#include <boost/beast/core/async_op_base.hpp>
#include <boost/beast/core/bind_handler.hpp>
@@ -83,10 +83,10 @@ public:
BOOST_ASIO_CORO_REENTER(*this)
{
// Maybe suspend
if(d_.ws.wr_block_.try_lock(this))
if(d_.ws.impl_->wr_block.try_lock(this))
{
// Make sure the stream is open
if(! d_.ws.check_open(ec))
if(! d_.ws.impl_->check_open(ec))
{
BOOST_ASIO_CORO_YIELD
net::post(
@@ -99,34 +99,34 @@ public:
{
// Suspend
BOOST_ASIO_CORO_YIELD
d_.ws.paused_ping_.emplace(std::move(*this));
d_.ws.impl_->paused_ping.emplace(std::move(*this));
// Acquire the write block
d_.ws.wr_block_.lock(this);
d_.ws.impl_->wr_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
// Make sure the stream is open
if(! d_.ws.check_open(ec))
if(! d_.ws.impl_->check_open(ec))
goto upcall;
}
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(d_.ws.stream_,
net::async_write(d_.ws.impl_->stream,
d_.fb.data(), std::move(*this));
if(! d_.ws.check_ok(ec))
if(! d_.ws.impl_->check_ok(ec))
goto upcall;
upcall:
d_.ws.wr_block_.unlock(this);
d_.ws.paused_close_.maybe_invoke() ||
d_.ws.paused_rd_.maybe_invoke() ||
d_.ws.paused_wr_.maybe_invoke();
d_.ws.impl_->wr_block.unlock(this);
d_.ws.impl_->paused_close.maybe_invoke() ||
d_.ws.impl_->paused_rd.maybe_invoke() ||
d_.ws.impl_->paused_wr.maybe_invoke();
this->invoke(ec);
}
}
@@ -151,13 +151,13 @@ stream<NextLayer, deflateSupported>::
ping(ping_data const& payload, error_code& ec)
{
// Make sure the stream is open
if(! check_open(ec))
if(! impl_->check_open(ec))
return;
detail::frame_buffer fb;
write_ping<flat_static_buffer_base>(
fb, detail::opcode::ping, payload);
net::write(stream_, fb.data(), ec);
if(! check_ok(ec))
net::write(impl_->stream, fb.data(), ec);
if(! impl_->check_ok(ec))
return;
}
@@ -178,13 +178,13 @@ stream<NextLayer, deflateSupported>::
pong(ping_data const& payload, error_code& ec)
{
// Make sure the stream is open
if(! check_open(ec))
if(! impl_->check_open(ec))
return;
detail::frame_buffer fb;
write_ping<flat_static_buffer_base>(
fb, detail::opcode::pong, payload);
net::write(stream_, fb.data(), ec);
if(! check_ok(ec))
net::write(impl_->stream, fb.data(), ec);
if(! impl_->check_ok(ec))
return;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -7,8 +7,8 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_RFC6455_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_RFC6455_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_RFC6455_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_RFC6455_HPP
#include <boost/beast/http/fields.hpp>
#include <boost/beast/http/rfc7230.hpp>

View File

@@ -7,8 +7,8 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_SSL_IPP_INCLUDED
#define BOOST_BEAST_WEBSOCKET_IMPL_SSL_IPP_INCLUDED
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_SSL_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_SSL_HPP
#include <utility>

View File

@@ -7,13 +7,13 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_STREAM_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_STREAM_HPP
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/beast/websocket/teardown.hpp>
#include <boost/beast/websocket/detail/hybi13.hpp>
#include <boost/beast/websocket/detail/pmd_extension.hpp>
#include <boost/beast/websocket/detail/mask.hpp>
#include <boost/beast/version.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/write.hpp>
@@ -25,17 +25,18 @@
#include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/type_traits.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/assert.hpp>
#include <boost/endian/buffers.hpp>
#include <boost/make_unique.hpp>
#include <boost/throw_exception.hpp>
#include <algorithm>
#include <chrono>
#include <memory>
#include <stdexcept>
#include <utility>
#include <iostream>
namespace boost {
namespace beast {
namespace websocket {
@@ -44,12 +45,101 @@ template<class NextLayer, bool deflateSupported>
template<class... Args>
stream<NextLayer, deflateSupported>::
stream(Args&&... args)
: stream_(std::forward<Args>(args)...)
: impl_(std::make_shared<impl_type>(
std::forward<Args>(args)...))
{
BOOST_ASSERT(rd_buf_.max_size() >=
BOOST_ASSERT(impl_->rd_buf.max_size() >=
max_control_frame_size);
}
template<class NextLayer, bool deflateSupported>
auto
stream<NextLayer, deflateSupported>::
get_executor() const noexcept ->
executor_type
{
return impl_->stream.get_executor();
}
template<class NextLayer, bool deflateSupported>
auto
stream<NextLayer, deflateSupported>::
next_layer() noexcept ->
next_layer_type&
{
return impl_->stream;
}
template<class NextLayer, bool deflateSupported>
auto
stream<NextLayer, deflateSupported>::
next_layer() const noexcept ->
next_layer_type const&
{
return impl_->stream;
}
template<class NextLayer, bool deflateSupported>
auto
stream<NextLayer, deflateSupported>::
lowest_layer() noexcept ->
lowest_layer_type&
{
return impl_->stream.lowest_layer();
}
template<class NextLayer, bool deflateSupported>
auto
stream<NextLayer, deflateSupported>::
lowest_layer() const noexcept ->
lowest_layer_type const&
{
return impl_->stream.lowest_layer();
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
is_open() const noexcept
{
return impl_->status_ == status::open;
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
got_binary() const noexcept
{
return impl_->rd_op == detail::opcode::binary;
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
is_message_done() const noexcept
{
return impl_->rd_done;
}
template<class NextLayer, bool deflateSupported>
close_reason const&
stream<NextLayer, deflateSupported>::
reason() const noexcept
{
return impl_->cr;
}
template<class NextLayer, bool deflateSupported>
std::size_t
stream<NextLayer, deflateSupported>::
read_size_hint(
std::size_t initial_size) const
{
return impl_->read_size_hint_pmd(
initial_size, impl_->rd_done,
impl_->rd_remain, impl_->rd_fh);
}
template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer, class>
std::size_t
@@ -72,88 +162,129 @@ read_size_hint(DynamicBuffer& buffer) const
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
open(role_type role)
set_option(permessage_deflate const& o)
{
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
status_ = status::open;
rd_remain_ = 0;
rd_cont_ = false;
rd_done_ = true;
// Can't clear this because accept uses it
//rd_buf_.reset();
rd_fh_.fin = false;
rd_close_ = false;
wr_close_ = false;
// These should not be necessary, because all completion
// handlers must be allowed to execute otherwise the
// stream exhibits undefined behavior.
wr_block_.reset();
rd_block_.reset();
cr_.code = close_code::none;
wr_cont_ = false;
wr_buf_size_ = 0;
this->open_pmd(role_);
impl_->set_option_pmd(o);
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
close()
get_option(permessage_deflate& o)
{
wr_buf_.reset();
this->close_pmd();
impl_->get_option_pmd(o);
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
reset()
auto_fragment(bool value)
{
BOOST_ASSERT(status_ != status::open);
rd_remain_ = 0;
rd_cont_ = false;
rd_done_ = true;
rd_buf_.consume(rd_buf_.size());
rd_fh_.fin = false;
rd_close_ = false;
wr_close_ = false;
wr_cont_ = false;
// These should not be necessary, because all completion
// handlers must be allowed to execute otherwise the
// stream exhibits undefined behavior.
wr_block_.reset();
rd_block_.reset();
cr_.code = close_code::none;
impl_->wr_frag_opt = value;
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
auto_fragment() const
{
return impl_->wr_frag_opt;
}
// Called before each write frame
template<class NextLayer, bool deflateSupported>
inline
void
stream<NextLayer, deflateSupported>::
begin_msg()
binary(bool value)
{
wr_frag_ = wr_frag_opt_;
impl_->wr_opcode = value ?
detail::opcode::binary :
detail::opcode::text;
}
// Maintain the write buffer
if( this->pmd_enabled() ||
role_ == role_type::client)
{
if(! wr_buf_ || wr_buf_size_ != wr_buf_opt_)
{
wr_buf_size_ = wr_buf_opt_;
wr_buf_ = boost::make_unique_noinit<
std::uint8_t[]>(wr_buf_size_);
}
}
else
{
wr_buf_size_ = wr_buf_opt_;
wr_buf_.reset();
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
binary() const
{
return impl_->wr_opcode == detail::opcode::binary;
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
control_callback(std::function<
void(frame_type, string_view)> cb)
{
impl_->ctrl_cb = std::move(cb);
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
control_callback()
{
impl_->ctrl_cb = {};
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
read_message_max(std::size_t amount)
{
impl_->rd_msg_max = amount;
}
template<class NextLayer, bool deflateSupported>
std::size_t
stream<NextLayer, deflateSupported>::
read_message_max() const
{
return impl_->rd_msg_max;
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
secure_prng(bool value)
{
this->secure_prng_ = value;
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
write_buffer_size(std::size_t amount)
{
if(amount < 8)
BOOST_THROW_EXCEPTION(std::invalid_argument{
"write buffer size underflow"});
impl_->wr_buf_opt = amount;
}
template<class NextLayer, bool deflateSupported>
std::size_t
stream<NextLayer, deflateSupported>::
write_buffer_size() const
{
return impl_->wr_buf_opt;
}
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
text(bool value)
{
impl_->wr_opcode = value ?
detail::opcode::text :
detail::opcode::binary;
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
text() const
{
return impl_->wr_opcode == detail::opcode::text;
}
//------------------------------------------------------------------------------
@@ -175,7 +306,7 @@ parse_fh(
if(buffer_size(b.data()) < 2)
{
// need more bytes
ec.assign(0, ec.category());
ec = {};
return false;
}
buffers_suffix<typename
@@ -199,7 +330,7 @@ parse_fh(
if(buffer_size(cb) < need)
{
// need more bytes
ec.assign(0, ec.category());
ec = {};
return false;
}
fh.op = static_cast<
@@ -213,14 +344,14 @@ parse_fh(
{
case detail::opcode::binary:
case detail::opcode::text:
if(rd_cont_)
if(impl_->rd_cont)
{
// new data frame when continuation expected
ec = error::bad_data_frame;
return false;
}
if(fh.rsv2 || fh.rsv3 ||
! this->rd_deflated(fh.rsv1))
! impl_->rd_deflated(fh.rsv1))
{
// reserved bits not cleared
ec = error::bad_reserved_bits;
@@ -229,7 +360,7 @@ parse_fh(
break;
case detail::opcode::cont:
if(! rd_cont_)
if(! impl_->rd_cont)
{
// continuation without an active message
ec = error::bad_continuation;
@@ -270,13 +401,13 @@ parse_fh(
}
break;
}
if(role_ == role_type::server && ! fh.mask)
if(impl_->role == role_type::server && ! fh.mask)
{
// unmasked frame from client
ec = error::bad_unmasked_frame;
return false;
}
if(role_ == role_type::client && fh.mask)
if(impl_->role == role_type::client && fh.mask)
{
// masked frame from server
ec = error::bad_masked_frame;
@@ -326,7 +457,7 @@ parse_fh(
BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp));
cb.consume(buffer_copy(buffer(tmp), cb));
fh.key = detail::little_uint32_to_native(&tmp[0]);
detail::prepare_key(rd_key_, fh.key);
detail::prepare_key(impl_->rd_key, fh.key);
}
else
{
@@ -337,12 +468,12 @@ parse_fh(
{
if(fh.op != detail::opcode::cont)
{
rd_size_ = 0;
rd_op_ = fh.op;
impl_->rd_size = 0;
impl_->rd_op = fh.op;
}
else
{
if(rd_size_ > (std::numeric_limits<
if(impl_->rd_size > (std::numeric_limits<
std::uint64_t>::max)() - fh.len)
{
// message size exceeds configured limit
@@ -350,21 +481,21 @@ parse_fh(
return false;
}
}
if(! this->rd_deflated())
if(! impl_->rd_deflated())
{
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_size_, fh.len, rd_msg_max_))
if(impl_->rd_msg_max && beast::detail::sum_exceeds(
impl_->rd_size, fh.len, impl_->rd_msg_max))
{
// message size exceeds configured limit
ec = error::message_too_big;
return false;
}
}
rd_cont_ = ! fh.fin;
rd_remain_ = fh.len;
impl_->rd_cont = ! fh.fin;
impl_->rd_remain = fh.len;
}
b.consume(b.size() - buffer_size(cb));
ec.assign(0, ec.category());
ec = {};
return true;
}
@@ -383,7 +514,7 @@ write_close(DynamicBuffer& db, close_reason const& cr)
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
if(role_ == role_type::client)
if(impl_->role == role_type::client)
{
fh.mask = true;
fh.key = this->create_mask();
@@ -436,7 +567,7 @@ write_ping(DynamicBuffer& db,
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = data.size();
fh.mask = role_ == role_type::client;
fh.mask = impl_->role == role_type::client;
if(fh.mask)
fh.key = this->create_mask();
detail::write(db, fh);
@@ -474,7 +605,7 @@ build_request(detail::sec_ws_key_type& key,
detail::make_sec_ws_key(key);
req.set(http::field::sec_websocket_key, key);
req.set(http::field::sec_websocket_version, "13");
this->build_request_pmd(req);
impl_->build_request_pmd(req);
decorator(req);
if(! req.count(http::field::user_agent))
req.set(http::field::user_agent,
@@ -572,7 +703,7 @@ build_response(
detail::make_sec_ws_accept(acc, key);
res.set(http::field::sec_websocket_accept, acc);
}
this->build_response_pmd(res, req);
impl_->build_response_pmd(res, req);
decorate(res);
result = {};
return res;
@@ -620,9 +751,9 @@ on_response(
return err(error::bad_sec_accept);
}
ec.assign(0, ec.category());
this->on_response_pmd(res);
open(role_type::client);
ec = {};
impl_->on_response_pmd(res);
impl_->open(role_type::client);
}
// _Fail the WebSocket Connection_
@@ -635,32 +766,32 @@ do_fail(
error_code& ec) // set to the error, else set to ev
{
BOOST_ASSERT(ev);
status_ = status::closing;
if(code != close_code::none && ! wr_close_)
impl_->status_ = status::closing;
if(code != close_code::none && ! impl_->wr_close)
{
wr_close_ = true;
impl_->wr_close = true;
detail::frame_buffer fb;
write_close<
flat_static_buffer_base>(fb, code);
net::write(stream_, fb.data(), ec);
if(! check_ok(ec))
net::write(impl_->stream, fb.data(), ec);
if(! impl_->check_ok(ec))
return;
}
using beast::websocket::teardown;
teardown(role_, stream_, ec);
teardown(impl_->role, impl_->stream, ec);
if(ec == net::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
ec = {};
}
if(! ec)
ec = ev;
if(ec && ec != error::closed)
status_ = status::failed;
impl_->status_ = status::failed;
else
status_ = status::closed;
close();
impl_->status_ = status::closed;
impl_->close();
}
} // websocket

View File

@@ -0,0 +1,199 @@
//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP
#include <boost/beast/core/static_buffer.hpp>
#include <boost/beast/core/saved_handler.hpp>
#include <boost/beast/websocket/detail/frame.hpp>
#include <boost/beast/websocket/detail/pmd_extension.hpp>
#include <boost/beast/websocket/detail/soft_mutex.hpp>
#include <boost/beast/websocket/detail/utf8_checker.hpp>
#include <boost/optional.hpp>
namespace boost {
namespace beast {
namespace websocket {
template<
class NextLayer, bool deflateSupported>
struct stream<NextLayer, deflateSupported>::impl_type
: std::enable_shared_from_this<impl_type>
, detail::impl_base<deflateSupported>
{
using time_point = typename
std::chrono::steady_clock::time_point;
static constexpr time_point never()
{
return (time_point::max)();
}
NextLayer stream; // The underlying stream
close_reason cr; // set from received close frame
control_cb_type ctrl_cb; // control callback
std::size_t rd_msg_max /* max message size */ = 16 * 1024 * 1024;
std::uint64_t rd_size /* total size of current message so far */ = 0;
std::uint64_t rd_remain /* message frame bytes left in current frame */ = 0;
detail::frame_header rd_fh; // current frame header
detail::prepared_key rd_key; // current stateful mask key
detail::frame_buffer rd_fb; // to write control frames (during reads)
detail::utf8_checker rd_utf8; // to validate utf8
static_buffer<
+tcp_frame_size> rd_buf; // buffer for reads
detail::opcode rd_op /* current message binary or text */ = detail::opcode::text;
bool rd_cont /* `true` if the next frame is a continuation */ = false;
bool rd_done /* set when a message is done */ = true;
bool rd_close /* did we read a close frame? */ = false;
detail::soft_mutex rd_block; // op currently reading
role_type role /* server or client */ = role_type::client;
status status_ /* state of the object */ = status::closed;
detail::soft_mutex wr_block; // op currently writing
bool wr_close /* did we write a close frame? */ = false;
bool wr_cont /* next write is a continuation */ = false;
bool wr_frag /* autofrag the current message */ = false;
bool wr_frag_opt /* autofrag option setting */ = true;
bool wr_compress /* compress current message */ = false;
detail::opcode wr_opcode /* message type */ = detail::opcode::text;
std::unique_ptr<
std::uint8_t[]> wr_buf; // write buffer
std::size_t wr_buf_size /* write buffer size (current message) */ = 0;
std::size_t wr_buf_opt /* write buffer size option setting */ = 4096;
detail::fh_buffer wr_fb; // header buffer used for writes
saved_handler paused_rd; // paused read op
saved_handler paused_wr; // paused write op
saved_handler paused_ping; // paused ping op
saved_handler paused_close; // paused close op
saved_handler paused_r_rd; // paused read op (async read)
saved_handler paused_r_close; // paused close op (async read)
enum
{
};
template<class... Args>
impl_type(Args&&... args)
: stream(std::forward<Args>(args)...)
{
}
void
open(role_type role_)
{
// VFALCO TODO analyze and remove dupe code in reset()
role = role_;
status_ = status::open;
rd_remain = 0;
rd_cont = false;
rd_done = true;
// Can't clear this because accept uses it
//rd_buf.reset();
rd_fh.fin = false;
rd_close = false;
wr_close = false;
// These should not be necessary, because all completion
// handlers must be allowed to execute otherwise the
// stream exhibits undefined behavior.
wr_block.reset();
rd_block.reset();
cr.code = close_code::none;
wr_cont = false;
wr_buf_size = 0;
this->open_pmd(role);
}
void
close()
{
wr_buf.reset();
this->close_pmd();
}
void
reset()
{
BOOST_ASSERT(status_ != status::open);
rd_remain = 0;
rd_cont = false;
rd_done = true;
rd_buf.consume(rd_buf.size());
rd_fh.fin = false;
rd_close = false;
wr_close = false;
wr_cont = false;
// These should not be necessary, because all completion
// handlers must be allowed to execute otherwise the
// stream exhibits undefined behavior.
wr_block.reset();
rd_block.reset();
cr.code = close_code::none;
}
// Called before each write frame
void
begin_msg()
{
wr_frag = wr_frag_opt;
// Maintain the write buffer
if( this->pmd_enabled() ||
role == role_type::client)
{
if(! wr_buf ||
wr_buf_size != wr_buf_opt)
{
wr_buf_size = wr_buf_opt;
wr_buf = boost::make_unique_noinit<
std::uint8_t[]>(wr_buf_size);
}
}
else
{
wr_buf_size = wr_buf_opt;
wr_buf.reset();
}
}
bool
check_open(error_code& ec)
{
if(status_ != status::open)
{
ec = net::error::operation_aborted;
return false;
}
ec = {};
return true;
}
bool
check_ok(error_code& ec)
{
if(ec)
{
if(status_ != status::closed)
status_ = status::failed;
return false;
}
return true;
}
};
} // websocket
} // beast
} // boost
#endif

View File

@@ -7,8 +7,8 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_TEARDOWN_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_TEARDOWN_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_TEARDOWN_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_TEARDOWN_HPP
#include <boost/beast/core/async_op_base.hpp>
#include <boost/beast/core/bind_handler.hpp>

View File

@@ -7,9 +7,10 @@
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_IPP
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
#include <boost/beast/websocket/detail/mask.hpp>
#include <boost/beast/core/async_op_base.hpp>
#include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/buffers_cat.hpp>
@@ -33,106 +34,6 @@ namespace boost {
namespace beast {
namespace websocket {
namespace detail {
// Compress a buffer sequence
// Returns: `true` if more calls are needed
//
template<>
template<class ConstBufferSequence>
bool
stream_base<true>::
deflate(
net::mutable_buffer& out,
buffers_suffix<ConstBufferSequence>& cb,
bool fin,
std::size_t& total_in,
error_code& ec)
{
using net::buffer;
BOOST_ASSERT(out.size() >= 6);
auto& zo = this->pmd_->zo;
zlib::z_params zs;
zs.avail_in = 0;
zs.next_in = nullptr;
zs.avail_out = out.size();
zs.next_out = out.data();
for(auto in : beast::buffers_range_ref(cb))
{
zs.avail_in = in.size();
if(zs.avail_in == 0)
continue;
zs.next_in = in.data();
zo.write(zs, zlib::Flush::none, ec);
if(ec)
{
if(ec != zlib::error::need_buffers)
return false;
BOOST_ASSERT(zs.avail_out == 0);
BOOST_ASSERT(zs.total_out == out.size());
ec.assign(0, ec.category());
break;
}
if(zs.avail_out == 0)
{
BOOST_ASSERT(zs.total_out == out.size());
break;
}
BOOST_ASSERT(zs.avail_in == 0);
}
total_in = zs.total_in;
cb.consume(zs.total_in);
if(zs.avail_out > 0 && fin)
{
auto const remain = net::buffer_size(cb);
if(remain == 0)
{
// Inspired by Mark Adler
// https://github.com/madler/zlib/issues/149
//
// VFALCO We could do this flush twice depending
// on how much space is in the output.
zo.write(zs, zlib::Flush::block, ec);
BOOST_ASSERT(! ec || ec == zlib::error::need_buffers);
if(ec == zlib::error::need_buffers)
ec.assign(0, ec.category());
if(ec)
return false;
if(zs.avail_out >= 6)
{
zo.write(zs, zlib::Flush::full, ec);
BOOST_ASSERT(! ec);
// remove flush marker
zs.total_out -= 4;
out = buffer(out.data(), zs.total_out);
return false;
}
}
}
ec.assign(0, ec.category());
out = buffer(out.data(), zs.total_out);
return true;
}
template<>
inline
void
stream_base<true>::
do_context_takeover_write(role_type role)
{
if((role == role_type::client &&
this->pmd_config_.client_no_context_takeover) ||
(role == role_type::server &&
this->pmd_config_.server_no_context_takeover))
{
this->pmd_->zo.reset();
}
}
} // detail
//------------------------------------------------------------------------------
template<class NextLayer, bool deflateSupported>
template<class Buffers, class Handler>
class stream<NextLayer, deflateSupported>::write_some_op
@@ -205,10 +106,10 @@ operator()(
BOOST_ASIO_CORO_REENTER(*this)
{
// Set up the outgoing frame header
if(! ws_.wr_cont_)
if(! ws_.impl_->wr_cont)
{
ws_.begin_msg();
fh_.rsv1 = ws_.wr_compress_;
ws_.impl_->begin_msg();
fh_.rsv1 = ws_.impl_->wr_compress;
}
else
{
@@ -216,27 +117,27 @@ operator()(
}
fh_.rsv2 = false;
fh_.rsv3 = false;
fh_.op = ws_.wr_cont_ ?
detail::opcode::cont : ws_.wr_opcode_;
fh_.op = ws_.impl_->wr_cont ?
detail::opcode::cont : ws_.impl_->wr_opcode;
fh_.mask =
ws_.role_ == role_type::client;
ws_.impl_->role == role_type::client;
// Choose a write algorithm
if(ws_.wr_compress_)
if(ws_.impl_->wr_compress)
{
how_ = do_deflate;
}
else if(! fh_.mask)
{
if(! ws_.wr_frag_)
if(! ws_.impl_->wr_frag)
{
how_ = do_nomask_nofrag;
}
else
{
BOOST_ASSERT(ws_.wr_buf_size_ != 0);
BOOST_ASSERT(ws_.impl_->wr_buf_size != 0);
remain_ = buffer_size(cb_);
if(remain_ > ws_.wr_buf_size_)
if(remain_ > ws_.impl_->wr_buf_size)
how_ = do_nomask_frag;
else
how_ = do_nomask_nofrag;
@@ -244,15 +145,15 @@ operator()(
}
else
{
if(! ws_.wr_frag_)
if(! ws_.impl_->wr_frag)
{
how_ = do_mask_nofrag;
}
else
{
BOOST_ASSERT(ws_.wr_buf_size_ != 0);
BOOST_ASSERT(ws_.impl_->wr_buf_size != 0);
remain_ = buffer_size(cb_);
if(remain_ > ws_.wr_buf_size_)
if(remain_ > ws_.impl_->wr_buf_size)
how_ = do_mask_frag;
else
how_ = do_mask_nofrag;
@@ -260,10 +161,10 @@ operator()(
}
// Maybe suspend
if(ws_.wr_block_.try_lock(this))
if(ws_.impl_->wr_block.try_lock(this))
{
// Make sure the stream is open
if(! ws_.check_open(ec))
if(! ws_.impl_->check_open(ec))
goto upcall;
}
else
@@ -271,19 +172,19 @@ operator()(
do_suspend:
// Suspend
BOOST_ASIO_CORO_YIELD
ws_.paused_wr_.emplace(std::move(*this));
ws_.impl_->paused_wr.emplace(std::move(*this));
// Acquire the write block
ws_.wr_block_.lock(this);
ws_.impl_->wr_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
BOOST_ASSERT(ws_.impl_->wr_block.is_locked(this));
// Make sure the stream is open
if(! ws_.check_open(ec))
if(! ws_.impl_->check_open(ec))
goto upcall;
}
@@ -293,16 +194,16 @@ operator()(
{
fh_.fin = fin_;
fh_.len = buffer_size(cb_);
ws_.wr_fb_.clear();
ws_.impl_->wr_fb.clear();
detail::write<flat_static_buffer_base>(
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
ws_.impl_->wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(ws_.stream_,
buffers_cat(ws_.wr_fb_.data(), cb_),
net::async_write(ws_.impl_->stream,
buffers_cat(ws_.impl_->wr_fb.data(), cb_),
std::move(*this));
if(! ws_.check_ok(ec))
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ += clamp(fh_.len);
goto upcall;
@@ -314,22 +215,22 @@ operator()(
{
for(;;)
{
n = clamp(remain_, ws_.wr_buf_size_);
n = clamp(remain_, ws_.impl_->wr_buf_size);
fh_.len = n;
remain_ -= n;
fh_.fin = fin_ ? remain_ == 0 : false;
ws_.wr_fb_.clear();
ws_.impl_->wr_fb.clear();
detail::write<flat_static_buffer_base>(
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
ws_.impl_->wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(
ws_.stream_, buffers_cat(
ws_.wr_fb_.data(), buffers_prefix(
ws_.impl_->stream, buffers_cat(
ws_.impl_->wr_fb.data(), buffers_prefix(
clamp(fh_.len), cb_)),
std::move(*this));
if(! ws_.check_ok(ec))
if(! ws_.impl_->check_ok(ec))
goto upcall;
n = clamp(fh_.len); // because yield
bytes_transferred_ += n;
@@ -339,15 +240,15 @@ operator()(
fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames
ws_.wr_block_.unlock(this);
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
ws_.impl_->wr_block.unlock(this);
if( ws_.impl_->paused_close.maybe_invoke() ||
ws_.impl_->paused_rd.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke())
{
BOOST_ASSERT(ws_.wr_block_.is_locked());
BOOST_ASSERT(ws_.impl_->wr_block.is_locked());
goto do_suspend;
}
ws_.wr_block_.lock(this);
ws_.impl_->wr_block.lock(this);
}
goto upcall;
}
@@ -361,41 +262,41 @@ operator()(
fh_.len = remain_;
fh_.key = ws_.create_mask();
detail::prepare_key(key_, fh_.key);
ws_.wr_fb_.clear();
ws_.impl_->wr_fb.clear();
detail::write<flat_static_buffer_base>(
ws_.wr_fb_, fh_);
n = clamp(remain_, ws_.wr_buf_size_);
ws_.impl_->wr_fb, fh_);
n = clamp(remain_, ws_.impl_->wr_buf_size);
buffer_copy(buffer(
ws_.wr_buf_.get(), n), cb_);
ws_.impl_->wr_buf.get(), n), cb_);
detail::mask_inplace(buffer(
ws_.wr_buf_.get(), n), key_);
ws_.impl_->wr_buf.get(), n), key_);
remain_ -= n;
ws_.wr_cont_ = ! fin_;
ws_.impl_->wr_cont = ! fin_;
// Send frame header and partial payload
BOOST_ASIO_CORO_YIELD
net::async_write(
ws_.stream_, buffers_cat(ws_.wr_fb_.data(),
buffer(ws_.wr_buf_.get(), n)),
ws_.impl_->stream, buffers_cat(ws_.impl_->wr_fb.data(),
buffer(ws_.impl_->wr_buf.get(), n)),
std::move(*this));
if(! ws_.check_ok(ec))
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ +=
bytes_transferred - ws_.wr_fb_.size();
bytes_transferred - ws_.impl_->wr_fb.size();
while(remain_ > 0)
{
cb_.consume(ws_.wr_buf_size_);
n = clamp(remain_, ws_.wr_buf_size_);
cb_.consume(ws_.impl_->wr_buf_size);
n = clamp(remain_, ws_.impl_->wr_buf_size);
buffer_copy(buffer(
ws_.wr_buf_.get(), n), cb_);
ws_.impl_->wr_buf.get(), n), cb_);
detail::mask_inplace(buffer(
ws_.wr_buf_.get(), n), key_);
ws_.impl_->wr_buf.get(), n), key_);
remain_ -= n;
// Send partial payload
BOOST_ASIO_CORO_YIELD
net::async_write(ws_.stream_,
buffer(ws_.wr_buf_.get(), n),
net::async_write(ws_.impl_->stream,
buffer(ws_.impl_->wr_buf.get(), n),
std::move(*this));
if(! ws_.check_ok(ec))
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ += bytes_transferred;
}
@@ -408,29 +309,29 @@ operator()(
{
for(;;)
{
n = clamp(remain_, ws_.wr_buf_size_);
n = clamp(remain_, ws_.impl_->wr_buf_size);
remain_ -= n;
fh_.len = n;
fh_.key = ws_.create_mask();
fh_.fin = fin_ ? remain_ == 0 : false;
detail::prepare_key(key_, fh_.key);
buffer_copy(buffer(
ws_.wr_buf_.get(), n), cb_);
ws_.impl_->wr_buf.get(), n), cb_);
detail::mask_inplace(buffer(
ws_.wr_buf_.get(), n), key_);
ws_.wr_fb_.clear();
ws_.impl_->wr_buf.get(), n), key_);
ws_.impl_->wr_fb.clear();
detail::write<flat_static_buffer_base>(
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
ws_.impl_->wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(ws_.stream_,
buffers_cat(ws_.wr_fb_.data(),
buffer(ws_.wr_buf_.get(), n)),
net::async_write(ws_.impl_->stream,
buffers_cat(ws_.impl_->wr_fb.data(),
buffer(ws_.impl_->wr_buf.get(), n)),
std::move(*this));
if(! ws_.check_ok(ec))
if(! ws_.impl_->check_ok(ec))
goto upcall;
n = bytes_transferred - ws_.wr_fb_.size();
n = bytes_transferred - ws_.impl_->wr_fb.size();
bytes_transferred_ += n;
if(remain_ == 0)
break;
@@ -438,15 +339,15 @@ operator()(
fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames:
ws_.wr_block_.unlock(this);
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
ws_.impl_->wr_block.unlock(this);
if( ws_.impl_->paused_close.maybe_invoke() ||
ws_.impl_->paused_rd.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke())
{
BOOST_ASSERT(ws_.wr_block_.is_locked());
BOOST_ASSERT(ws_.impl_->wr_block.is_locked());
goto do_suspend;
}
ws_.wr_block_.lock(this);
ws_.impl_->wr_block.lock(this);
}
goto upcall;
}
@@ -457,10 +358,10 @@ operator()(
{
for(;;)
{
b = buffer(ws_.wr_buf_.get(),
ws_.wr_buf_size_);
more_ = ws_.deflate(b, cb_, fin_, in_, ec);
if(! ws_.check_ok(ec))
b = buffer(ws_.impl_->wr_buf.get(),
ws_.impl_->wr_buf_size);
more_ = ws_.impl_->deflate(b, cb_, fin_, in_, ec);
if(! ws_.impl_->check_ok(ec))
goto upcall;
n = buffer_size(b);
if(n == 0)
@@ -481,16 +382,16 @@ operator()(
}
fh_.fin = ! more_;
fh_.len = n;
ws_.wr_fb_.clear();
ws_.impl_->wr_fb.clear();
detail::write<
flat_static_buffer_base>(ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
flat_static_buffer_base>(ws_.impl_->wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(ws_.stream_,
buffers_cat(ws_.wr_fb_.data(), b),
net::async_write(ws_.impl_->stream,
buffers_cat(ws_.impl_->wr_fb.data(), b),
std::move(*this));
if(! ws_.check_ok(ec))
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ += in_;
if(more_)
@@ -499,20 +400,20 @@ operator()(
fh_.rsv1 = false;
// Allow outgoing control frames to
// be sent in between message frames:
ws_.wr_block_.unlock(this);
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
ws_.impl_->wr_block.unlock(this);
if( ws_.impl_->paused_close.maybe_invoke() ||
ws_.impl_->paused_rd.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke())
{
BOOST_ASSERT(ws_.wr_block_.is_locked());
BOOST_ASSERT(ws_.impl_->wr_block.is_locked());
goto do_suspend;
}
ws_.wr_block_.lock(this);
ws_.impl_->wr_block.lock(this);
}
else
{
if(fh_.fin)
ws_.do_context_takeover_write(ws_.role_);
ws_.impl_->do_context_takeover_write(ws_.impl_->role);
goto upcall;
}
}
@@ -521,10 +422,10 @@ operator()(
//--------------------------------------------------------------------------
upcall:
ws_.wr_block_.unlock(this);
ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke();
ws_.impl_->wr_block.unlock(this);
ws_.impl_->paused_close.maybe_invoke() ||
ws_.impl_->paused_rd.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke();
if(! cont_)
{
BOOST_ASIO_CORO_YIELD
@@ -576,15 +477,15 @@ write_some(bool fin,
using net::buffer_copy;
using net::buffer_size;
std::size_t bytes_transferred = 0;
ec.assign(0, ec.category());
ec = {};
// Make sure the stream is open
if(! check_open(ec))
if(! impl_->check_open(ec))
return bytes_transferred;
detail::frame_header fh;
if(! wr_cont_)
if(! impl_->wr_cont)
{
begin_msg();
fh.rsv1 = wr_compress_;
impl_->begin_msg();
fh.rsv1 = impl_->wr_compress;
}
else
{
@@ -592,21 +493,21 @@ write_some(bool fin,
}
fh.rsv2 = false;
fh.rsv3 = false;
fh.op = wr_cont_ ?
detail::opcode::cont : wr_opcode_;
fh.mask = role_ == role_type::client;
fh.op = impl_->wr_cont ?
detail::opcode::cont : impl_->wr_opcode;
fh.mask = impl_->role == role_type::client;
auto remain = buffer_size(buffers);
if(wr_compress_)
if(impl_->wr_compress)
{
buffers_suffix<
ConstBufferSequence> cb{buffers};
for(;;)
{
auto b = buffer(
wr_buf_.get(), wr_buf_size_);
auto const more = this->deflate(
impl_->wr_buf.get(), impl_->wr_buf_size);
auto const more = impl_->deflate(
b, cb, fin, bytes_transferred, ec);
if(! check_ok(ec))
if(! impl_->check_ok(ec))
return bytes_transferred;
auto const n = buffer_size(b);
if(n == 0)
@@ -631,10 +532,10 @@ write_some(bool fin,
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
wr_cont_ = ! fin;
net::write(stream_,
impl_->wr_cont = ! fin;
net::write(impl_->stream,
buffers_cat(fh_buf.data(), b), ec);
if(! check_ok(ec))
if(! impl_->check_ok(ec))
return bytes_transferred;
if(! more)
break;
@@ -642,11 +543,11 @@ write_some(bool fin,
fh.rsv1 = false;
}
if(fh.fin)
this->do_context_takeover_write(role_);
impl_->do_context_takeover_write(impl_->role);
}
else if(! fh.mask)
{
if(! wr_frag_)
if(! impl_->wr_frag)
{
// no mask, no autofrag
fh.fin = fin;
@@ -654,33 +555,33 @@ write_some(bool fin,
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
wr_cont_ = ! fin;
net::write(stream_,
impl_->wr_cont = ! fin;
net::write(impl_->stream,
buffers_cat(fh_buf.data(), buffers), ec);
if(! check_ok(ec))
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += remain;
}
else
{
// no mask, autofrag
BOOST_ASSERT(wr_buf_size_ != 0);
BOOST_ASSERT(impl_->wr_buf_size != 0);
buffers_suffix<
ConstBufferSequence> cb{buffers};
for(;;)
{
auto const n = clamp(remain, wr_buf_size_);
auto const n = clamp(remain, impl_->wr_buf_size);
remain -= n;
fh.len = n;
fh.fin = fin ? remain == 0 : false;
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
wr_cont_ = ! fin;
net::write(stream_,
buffers_cat(fh_buf.data(),
buffers_prefix(n, cb)), ec);
if(! check_ok(ec))
impl_->wr_cont = ! fin;
net::write(impl_->stream,
beast::buffers_cat(fh_buf.data(),
beast::buffers_prefix(n, cb)), ec);
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n;
if(remain == 0)
@@ -690,7 +591,7 @@ write_some(bool fin,
}
}
}
else if(! wr_frag_)
else if(! impl_->wr_frag)
{
// mask, no autofrag
fh.fin = fin;
@@ -704,29 +605,29 @@ write_some(bool fin,
buffers_suffix<
ConstBufferSequence> cb{buffers};
{
auto const n = clamp(remain, wr_buf_size_);
auto const b = buffer(wr_buf_.get(), n);
auto const n = clamp(remain, impl_->wr_buf_size);
auto const b = buffer(impl_->wr_buf.get(), n);
buffer_copy(b, cb);
cb.consume(n);
remain -= n;
detail::mask_inplace(b, key);
wr_cont_ = ! fin;
net::write(stream_,
impl_->wr_cont = ! fin;
net::write(impl_->stream,
buffers_cat(fh_buf.data(), b), ec);
if(! check_ok(ec))
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n;
}
while(remain > 0)
{
auto const n = clamp(remain, wr_buf_size_);
auto const b = buffer(wr_buf_.get(), n);
auto const n = clamp(remain, impl_->wr_buf_size);
auto const b = buffer(impl_->wr_buf.get(), n);
buffer_copy(b, cb);
cb.consume(n);
remain -= n;
detail::mask_inplace(b, key);
net::write(stream_, b, ec);
if(! check_ok(ec))
net::write(impl_->stream, b, ec);
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n;
}
@@ -734,28 +635,28 @@ write_some(bool fin,
else
{
// mask, autofrag
BOOST_ASSERT(wr_buf_size_ != 0);
BOOST_ASSERT(impl_->wr_buf_size != 0);
buffers_suffix<
ConstBufferSequence> cb{buffers};
ConstBufferSequence> cb(buffers);
for(;;)
{
fh.key = this->create_mask();
detail::prepared_key key;
detail::prepare_key(key, fh.key);
auto const n = clamp(remain, wr_buf_size_);
auto const b = buffer(wr_buf_.get(), n);
auto const n = clamp(remain, impl_->wr_buf_size);
auto const b = buffer(impl_->wr_buf.get(), n);
buffer_copy(b, cb);
detail::mask_inplace(b, key);
fh.len = n;
remain -= n;
fh.fin = fin ? remain == 0 : false;
wr_cont_ = ! fh.fin;
impl_->wr_cont = ! fh.fin;
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
net::write(stream_,
net::write(impl_->stream,
buffers_cat(fh_buf.data(), b), ec);
if(! check_ok(ec))
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n;
if(remain == 0)

View File

@@ -210,6 +210,6 @@ struct close_reason
} // beast
} // boost
#include <boost/beast/websocket/impl/rfc6455.ipp>
#include <boost/beast/websocket/impl/rfc6455.hpp>
#endif

View File

@@ -79,6 +79,6 @@ async_teardown(
} // beast
} // boost
#include <boost/beast/websocket/impl/ssl.ipp>
#include <boost/beast/websocket/impl/ssl.hpp>
#endif

View File

@@ -16,15 +16,9 @@
#include <boost/beast/websocket/role.hpp>
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/beast/websocket/stream_fwd.hpp>
#include <boost/beast/websocket/detail/frame.hpp>
#include <boost/beast/websocket/detail/hybi13.hpp>
#include <boost/beast/websocket/detail/mask.hpp>
#include <boost/beast/websocket/detail/pmd_extension.hpp>
#include <boost/beast/websocket/detail/soft_mutex.hpp>
#include <boost/beast/websocket/detail/stream_base.hpp>
#include <boost/beast/websocket/detail/utf8_checker.hpp>
#include <boost/beast/core/saved_handler.hpp>
#include <boost/beast/core/static_buffer.hpp>
#include <boost/beast/websocket/detail/hybi13.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/core/detail/type_traits.hpp>
#include <boost/beast/http/detail/type_traits.hpp>
@@ -34,6 +28,7 @@
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <type_traits>
namespace boost {
@@ -124,7 +119,7 @@ template<
bool deflateSupported>
class stream
#if ! BOOST_BEAST_DOXYGEN
: private detail::stream_base<deflateSupported>
: private detail::stream_base
#endif
{
friend class close_test;
@@ -153,64 +148,9 @@ class stream
failed
};
NextLayer stream_; // the wrapped stream
close_reason cr_; // set from received close frame
control_cb_type ctrl_cb_; // control callback
struct impl_type;
std::size_t rd_msg_max_ // max message size
= 16 * 1024 * 1024;
std::uint64_t rd_size_ // total size of current message so far
= 0;
std::uint64_t rd_remain_ // message frame bytes left in current frame
= 0;
detail::frame_header rd_fh_; // current frame header
detail::prepared_key rd_key_; // current stateful mask key
detail::frame_buffer rd_fb_; // to write control frames (during reads)
detail::utf8_checker rd_utf8_; // to validate utf8
static_buffer<
+tcp_frame_size> rd_buf_; // buffer for reads
detail::opcode rd_op_ // current message binary or text
= detail::opcode::text;
bool rd_cont_ // `true` if the next frame is a continuation
= false;
bool rd_done_ // set when a message is done
= true;
bool rd_close_ // did we read a close frame?
= false;
detail::soft_mutex rd_block_; // op currently reading
role_type role_ // server or client
= role_type::client;
status status_
= status::closed;
detail::soft_mutex wr_block_; // op currently writing
bool wr_close_ // did we write a close frame?
= false;
bool wr_cont_ // next write is a continuation
= false;
bool wr_frag_ // autofrag the current message
= false;
bool wr_frag_opt_ // autofrag option setting
= true;
bool wr_compress_ // compress current message
= false;
detail::opcode wr_opcode_ // message type
= detail::opcode::text;
std::unique_ptr<
std::uint8_t[]> wr_buf_; // write buffer
std::size_t wr_buf_size_ // write buffer size (current message)
= 0;
std::size_t wr_buf_opt_ // write buffer size option setting
= 4096;
detail::fh_buffer wr_fb_; // header buffer used for writes
saved_handler paused_rd_; // paused read op
saved_handler paused_wr_; // paused write op
saved_handler paused_ping_; // paused ping op
saved_handler paused_close_; // paused close op
saved_handler paused_r_rd_; // paused read op (async read)
saved_handler paused_r_close_;// paused close op (async read)
std::shared_ptr<impl_type> impl_;
public:
/// Indicates if the permessage-deflate extension is supported
@@ -281,10 +221,7 @@ public:
@return A copy of the executor that stream will use to dispatch handlers.
*/
executor_type
get_executor() noexcept
{
return stream_.get_executor();
}
get_executor() const noexcept;
/** Get a reference to the next layer
@@ -295,10 +232,7 @@ public:
stream layers.
*/
next_layer_type&
next_layer()
{
return stream_;
}
next_layer() noexcept;
/** Get a reference to the next layer
@@ -309,10 +243,7 @@ public:
stream layers.
*/
next_layer_type const&
next_layer() const
{
return stream_;
}
next_layer() const noexcept;
/** Get a reference to the lowest layer
@@ -322,11 +253,9 @@ public:
@return A reference to the lowest layer in the stack of
stream layers.
*/
// DEPRECATED
lowest_layer_type&
lowest_layer()
{
return stream_.lowest_layer();
}
lowest_layer() noexcept;
/** Get a reference to the lowest layer
@@ -336,11 +265,9 @@ public:
@return A reference to the lowest layer in the stack of
stream layers. Ownership is not transferred to the caller.
*/
// DEPRECATED
lowest_layer_type const&
lowest_layer() const
{
return stream_.lowest_layer();
}
lowest_layer() const noexcept;
//--------------------------------------------------------------------------
//
@@ -354,10 +281,7 @@ public:
no error has occurred.
*/
bool
is_open() const
{
return status_ == status::open;
}
is_open() const noexcept;
/** Returns `true` if the latest message data indicates binary.
@@ -369,10 +293,7 @@ public:
undefined.
*/
bool
got_binary() const
{
return rd_op_ == detail::opcode::binary;
}
got_binary() const noexcept;
/** Returns `true` if the latest message data indicates text.
@@ -391,20 +312,14 @@ public:
/// Returns `true` if the last completed read finished the current message.
bool
is_message_done() const
{
return rd_done_;
}
is_message_done() const noexcept;
/** Returns the close reason received from the peer.
This is only valid after a read completes with error::closed.
*/
close_reason const&
reason() const
{
return cr_;
}
reason() const noexcept;
/** Returns a suggested maximum buffer size for the next call to read.
@@ -421,11 +336,7 @@ public:
*/
std::size_t
read_size_hint(
std::size_t initial_size = +tcp_frame_size) const
{
return this->read_size_hint_pmd(
initial_size, rd_done_, rd_remain_, rd_fh_);
}
std::size_t initial_size = +tcp_frame_size) const;
/** Returns a suggested maximum buffer size for the next call to read.
@@ -462,17 +373,11 @@ public:
`client_enable` or `server_enable` is `true`.
*/
void
set_option(permessage_deflate const& o)
{
this->set_option_pmd(o);
}
set_option(permessage_deflate const& o);
/// Get the permessage-deflate extension options
void
get_option(permessage_deflate& o)
{
this->get_option_pmd(o);
}
get_option(permessage_deflate& o);
/** Set the automatic fragmentation option.
@@ -494,17 +399,11 @@ public:
@endcode
*/
void
auto_fragment(bool value)
{
wr_frag_opt_ = value;
}
auto_fragment(bool value);
/// Returns `true` if the automatic fragmentation option is set.
bool
auto_fragment() const
{
return wr_frag_opt_;
}
auto_fragment() const;
/** Set the binary message write option.
@@ -526,19 +425,11 @@ public:
@endcode
*/
void
binary(bool value)
{
wr_opcode_ = value ?
detail::opcode::binary :
detail::opcode::text;
}
binary(bool value);
/// Returns `true` if the binary message write option is set.
bool
binary() const
{
return wr_opcode_ == detail::opcode::binary;
}
binary() const;
/** Set a callback to be invoked on each incoming control frame.
@@ -585,20 +476,14 @@ public:
in undefined behavior.
*/
void
control_callback(std::function<void(frame_type, string_view)> cb)
{
ctrl_cb_ = std::move(cb);
}
control_callback(std::function<void(frame_type, string_view)> cb);
/** Reset the control frame callback.
This function removes any previously set control frame callback.
*/
void
control_callback()
{
ctrl_cb_ = {};
}
control_callback();
/** Set the maximum incoming message size option.
@@ -618,17 +503,11 @@ public:
@param amount The limit on the size of incoming messages.
*/
void
read_message_max(std::size_t amount)
{
rd_msg_max_ = amount;
}
read_message_max(std::size_t amount);
/// Returns the maximum incoming message size setting.
std::size_t
read_message_max() const
{
return rd_msg_max_;
}
read_message_max() const;
/** Set whether the PRNG is cryptographically secure
@@ -656,10 +535,7 @@ public:
cryptographically secure.
*/
void
secure_prng(bool value)
{
this->secure_prng_ = value;
}
secure_prng(bool value);
/** Set the write buffer size option.
@@ -687,20 +563,11 @@ public:
@param amount The size of the write buffer in bytes.
*/
void
write_buffer_size(std::size_t amount)
{
if(amount < 8)
BOOST_THROW_EXCEPTION(std::invalid_argument{
"write buffer size underflow"});
wr_buf_opt_ = amount;
};
write_buffer_size(std::size_t amount);
/// Returns the size of the write buffer.
std::size_t
write_buffer_size() const
{
return wr_buf_opt_;
}
write_buffer_size() const;
/** Set the text message write option.
@@ -722,19 +589,11 @@ public:
@endcode
*/
void
text(bool value)
{
wr_opcode_ = value ?
detail::opcode::text :
detail::opcode::binary;
}
text(bool value);
/// Returns `true` if the text message write option is set.
bool
text() const
{
return wr_opcode_ == detail::opcode::text;
}
text() const;
//--------------------------------------------------------------------------
//
@@ -3372,38 +3231,6 @@ private:
static void default_decorate_req(request_type&) {}
static void default_decorate_res(response_type&) {}
void open(role_type role);
void close();
void reset();
void begin_msg();
bool
check_open(error_code& ec)
{
if(status_ != status::open)
{
ec = net::error::operation_aborted;
return false;
}
ec = {};
return true;
}
bool
check_ok(error_code& ec)
{
if(ec)
{
if(status_ != status::closed)
status_ = status::failed;
return false;
}
return true;
}
template<class DynamicBuffer>
bool
parse_fh(
@@ -3507,7 +3334,7 @@ private:
@see stream::secure_prng
*/
inline
BOOST_BEAST_DECL
void
seed_prng(std::seed_seq& ss)
{
@@ -3518,12 +3345,13 @@ seed_prng(std::seed_seq& ss)
} // beast
} // boost
#include <boost/beast/websocket/impl/accept.ipp>
#include <boost/beast/websocket/impl/close.ipp>
#include <boost/beast/websocket/impl/handshake.ipp>
#include <boost/beast/websocket/impl/ping.ipp>
#include <boost/beast/websocket/impl/read.ipp>
#include <boost/beast/websocket/impl/stream.ipp>
#include <boost/beast/websocket/impl/write.ipp>
#include <boost/beast/websocket/impl/accept.hpp>
#include <boost/beast/websocket/impl/close.hpp>
#include <boost/beast/websocket/impl/handshake.hpp>
#include <boost/beast/websocket/impl/stream_impl.hpp>
#include <boost/beast/websocket/impl/ping.hpp>
#include <boost/beast/websocket/impl/read.hpp>
#include <boost/beast/websocket/impl/stream.hpp>
#include <boost/beast/websocket/impl/write.hpp>
#endif

View File

@@ -170,6 +170,6 @@ async_teardown(
} // beast
} // boost
#include <boost/beast/websocket/impl/teardown.ipp>
#include <boost/beast/websocket/impl/teardown.hpp>
#endif

View File

@@ -16,6 +16,8 @@ add_executable (tests-beast-websocket
${EXTRAS_FILES}
${TEST_MAIN}
Jamfile
_detail_prng.cpp
_detail_stream_base.cpp
test.hpp
_detail_prng.cpp
accept.cpp

View File

@@ -9,6 +9,7 @@
local SOURCES =
_detail_prng.cpp
_detail_stream_base.cpp
accept.cpp
close.cpp
error.cpp

View File

@@ -0,0 +1,11 @@
//
// Copyright (w) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
// Test that header file is self-contained.
#include <boost/beast/websocket/detail/stream_base.hpp>

View File

@@ -190,7 +190,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ws.async_close({},
[&](error_code ec)
@@ -223,7 +223,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 1);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ws.async_close({},
[&](error_code ec)
@@ -259,7 +259,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -300,7 +300,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 1);
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -341,7 +341,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 1);
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -436,7 +436,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 3);
});
BEAST_EXPECT(ws.rd_block_.is_locked());
BEAST_EXPECT(ws.impl_->rd_block.is_locked());
ws.async_close({},
[&](error_code ec)
{
@@ -446,7 +446,7 @@ public:
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(ws.is_open());
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ioc.run();
BEAST_EXPECT(count == 3);

View File

@@ -115,7 +115,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 12);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ws.async_ping({},
[&](error_code ec)
@@ -147,7 +147,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ws.async_ping({},
[&](error_code ec)
@@ -183,7 +183,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -225,7 +225,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -266,7 +266,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -306,7 +306,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -345,7 +345,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 1);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ws.async_ping("",
[&](error_code ec)
{
@@ -389,7 +389,7 @@ public:
ec.message());
});
if(! BEAST_EXPECT(run_until(ioc, 100,
[&]{ return ws.wr_close_; })))
[&]{ return ws.impl_->wr_close; })))
return;
// Try to ping
ws.async_ping("payload",

View File

@@ -45,7 +45,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 1);
});
while(! ws.rd_block_.is_locked())
while(! ws.impl_->rd_block.is_locked())
ioc.run_one();
multi_buffer b;
ws.async_read(b,
@@ -80,7 +80,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BOOST_ASSERT(ws.rd_block_.is_locked());
BOOST_ASSERT(ws.impl_->rd_block.is_locked());
ws.async_close({},
[&](error_code ec)
{
@@ -117,7 +117,7 @@ public:
BEAST_EXPECT(buffers_to_string(b.data()) == s);
++count;
});
BEAST_EXPECT(ws.rd_block_.is_locked());
BEAST_EXPECT(ws.impl_->rd_block.is_locked());
ws.async_write(buffer(s),
[&](error_code ec, std::size_t n)
{
@@ -127,7 +127,7 @@ public:
BEAST_EXPECT(n == s.size());
++count;
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ioc.run();
BEAST_EXPECT(count == 2);
});
@@ -166,7 +166,7 @@ public:
BEAST_EXPECT(++count == 3);
});
});
BEAST_EXPECT(ws.rd_block_.is_locked());
BEAST_EXPECT(ws.impl_->rd_block.is_locked());
ws.async_close({},
[&](error_code ec)
{
@@ -175,7 +175,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ioc.run();
BEAST_EXPECT(count == 3);
});
@@ -204,7 +204,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(ws.rd_block_.is_locked());
BEAST_EXPECT(ws.impl_->rd_block.is_locked());
ws.async_close({},
[&](error_code ec)
{
@@ -213,7 +213,7 @@ public:
system_error{ec});
BEAST_EXPECT(++count == 1);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ioc.run();
BEAST_EXPECT(count == 2);
});

View File

@@ -124,16 +124,16 @@ public:
BOOST_STATIC_ASSERT(std::is_constructible<
stream<test::stream&>, test::stream&>::value);
// VFALCO Should these be allowed for NextLayer references?
BOOST_STATIC_ASSERT(std::is_move_constructible<
stream<test::stream&>>::value);
BOOST_STATIC_ASSERT(! std::is_move_assignable<
BOOST_STATIC_ASSERT(std::is_move_assignable<
stream<test::stream&>>::value);
log << "sizeof(websocket::stream_base<true>) == " <<
sizeof(websocket::detail::stream_base<true>) << std::endl;
log << "sizeof(websocket::stream) == " <<
sizeof(websocket::stream<test::stream&>) << std::endl;
log << "sizeof(websocket::stream::impl_type) == " <<
sizeof(websocket::stream<test::stream&>::impl_type) << std::endl;
testOptions();
}

View File

@@ -290,7 +290,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ws.async_write(sbuf("*"),
[&](error_code ec, std::size_t n)
@@ -323,7 +323,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
BEAST_EXPECT(count == 0);
ws.async_write(sbuf("*"),
[&](error_code ec, std::size_t)
@@ -359,7 +359,7 @@ public:
BOOST_THROW_EXCEPTION(
system_error{ec});
});
while(! ws.wr_block_.is_locked())
while(! ws.impl_->wr_block.is_locked())
{
ioc.run_one();
if(! BEAST_EXPECT(! ioc.stopped()))
@@ -401,7 +401,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 16384);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ws.async_ping("",
[&](error_code ec)
{
@@ -435,7 +435,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 16384);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ws.async_ping("",
[&](error_code ec)
{
@@ -468,7 +468,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 16384);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ws.async_ping("",
[&](error_code ec)
{
@@ -500,7 +500,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == 16384);
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ws.async_ping("",
[&](error_code ec)
{
@@ -538,7 +538,7 @@ public:
system_error{ec});
BEAST_EXPECT(n == s.size());
});
BEAST_EXPECT(ws.wr_block_.is_locked());
BEAST_EXPECT(ws.impl_->wr_block.is_locked());
ws.async_ping("",
[&](error_code ec)
{