Next layer is a base class

This commit is contained in:
Vinnie Falco
2019-02-27 14:48:31 -08:00
parent e90f2087e7
commit 05b5843e19
9 changed files with 73 additions and 68 deletions

View File

@@ -193,7 +193,7 @@ public:
: stable_async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, res_(beast::allocate_stable<response_type>(*this,
sp->build_response(req, decorator, result_)))
@@ -220,7 +220,7 @@ public:
// Send response
BOOST_ASIO_CORO_YIELD
http::async_write(
impl.stream, res_, std::move(*this));
impl.stream(), res_, std::move(*this));
if(impl.check_stop_now(ec))
goto upcall;
if(! ec)
@@ -261,7 +261,7 @@ public:
: stable_async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, p_(beast::allocate_stable<
http::request_parser<http::empty_body>>(*this))
@@ -300,7 +300,7 @@ public:
goto upcall;
BOOST_ASIO_CORO_YIELD
http::async_read(impl.stream,
http::async_read(impl.stream(),
impl.rd_buf, p_, std::move(*this));
if(ec == http::error::end_of_stream)
ec = error::closed;
@@ -407,7 +407,7 @@ do_accept(
error_code result;
auto const res = impl_->build_response(req, decorator, result);
http::write(impl_->stream, res, ec);
http::write(impl_->stream(), res, ec);
if(ec)
return;
ec = result;

View File

@@ -55,7 +55,7 @@ public:
: stable_async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, fb_(beast::allocate_stable<
detail::frame_buffer>(*this))
@@ -102,7 +102,7 @@ public:
impl.change_status(status::closing);
impl.update_timer(this->get_executor());
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, fb_.data(),
net::async_write(impl.stream(), fb_.data(),
beast::detail::bind_continuation(std::move(*this)));
if(impl.check_stop_now(ec))
goto upcall;
@@ -142,7 +142,7 @@ public:
if(ev_)
goto teardown;
BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(
impl.stream().async_read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
beast::detail::bind_continuation(std::move(*this)));
@@ -182,7 +182,7 @@ public:
impl.rd_remain -= impl.rd_buf.size();
impl.rd_buf.consume(impl.rd_buf.size());
BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(
impl.stream().async_read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
beast::detail::bind_continuation(std::move(*this)));
@@ -200,7 +200,7 @@ public:
BOOST_ASSERT(impl.wr_block.is_locked(this));
using beast::websocket::async_teardown;
BOOST_ASIO_CORO_YIELD
async_teardown(impl.role, impl.stream,
async_teardown(impl.role, impl.stream(),
beast::detail::bind_continuation(std::move(*this)));
BOOST_ASSERT(impl.wr_block.is_locked(this));
if(ec == net::error::eof)
@@ -297,7 +297,7 @@ close(close_reason const& cr, error_code& ec)
impl.change_status(status::closing);
detail::frame_buffer fb;
impl.template write_close<flat_static_buffer_base>(fb, cr);
net::write(impl.stream, fb.data(), ec);
net::write(impl.stream(), fb.data(), ec);
if(impl.check_stop_now(ec))
return;
}
@@ -317,7 +317,7 @@ close(close_reason const& cr, error_code& ec)
// Protocol violation
return do_fail(close_code::none, ev, ec);
}
impl.rd_buf.commit(impl.stream.read_some(
impl.rd_buf.commit(impl.stream().read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())), ec));
if(impl.check_stop_now(ec))
@@ -359,7 +359,7 @@ close(close_reason const& cr, error_code& ec)
impl.rd_remain -= impl.rd_buf.size();
impl.rd_buf.consume(impl.rd_buf.size());
impl.rd_buf.commit(
impl.stream.read_some(
impl.stream().read_some(
impl.rd_buf.prepare(
read_size(
impl.rd_buf,

View File

@@ -72,7 +72,7 @@ public:
: stable_async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, key_(key)
, res_p_(res_p)
@@ -103,14 +103,14 @@ public:
// write HTTP request
impl.do_pmd_config(d_.req);
BOOST_ASIO_CORO_YIELD
http::async_write(impl.stream,
http::async_write(impl.stream(),
d_.req, std::move(*this));
if(impl.check_stop_now(ec))
goto upcall;
// read HTTP response
BOOST_ASIO_CORO_YIELD
http::async_read(impl.stream,
http::async_read(impl.stream(),
impl.rd_buf, d_.p,
std::move(*this));
if(ec == http::error::buffer_overflow)
@@ -125,7 +125,7 @@ public:
impl.rd_buf.clear();
BOOST_ASIO_CORO_YIELD
http::async_read(impl.stream,
http::async_read(impl.stream(),
d_.fb, d_.p, std::move(*this));
if(! ec)
@@ -213,7 +213,7 @@ do_handshake(
auto const req = impl.build_request(
key, host, target, decorator);
impl.do_pmd_config(req);
http::write(impl.stream, req, ec);
http::write(impl.stream(), req, ec);
}
if(impl.check_stop_now(ec))
return;

View File

@@ -52,7 +52,7 @@ public:
: stable_async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, fb_(beast::allocate_stable<
detail::frame_buffer>(*this))
@@ -91,7 +91,7 @@ public:
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, fb_.data(),
net::async_write(impl.stream(), fb_.data(),
beast::detail::bind_continuation(std::move(*this)));
if(impl.check_stop_now(ec))
goto upcall;
@@ -182,7 +182,7 @@ public:
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, fb_->data(),
net::async_write(impl.stream(), fb_->data(),
//beast::detail::bind_continuation(std::move(*this)));
std::move(*this));
if(impl.check_stop_now(ec))
@@ -253,7 +253,7 @@ ping(ping_data const& payload, error_code& ec)
detail::frame_buffer fb;
impl_->template write_ping<flat_static_buffer_base>(
fb, detail::opcode::ping, payload);
net::write(impl_->stream, fb.data(), ec);
net::write(impl_->stream(), fb.data(), ec);
if(impl_->check_stop_now(ec))
return;
}
@@ -279,7 +279,7 @@ pong(ping_data const& payload, error_code& ec)
detail::frame_buffer fb;
impl_->template write_ping<flat_static_buffer_base>(
fb, detail::opcode::pong, payload);
net::write(impl_->stream, fb.data(), ec);
net::write(impl_->stream(), fb.data(), ec);
if(impl_->check_stop_now(ec))
return;
}

View File

@@ -68,7 +68,7 @@ public:
: async_base<
Handler, beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, bs_(bs)
, cb_(bs)
@@ -159,7 +159,7 @@ public:
}
BOOST_ASSERT(impl.rd_block.is_locked(this));
BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(
impl.stream().async_read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
std::move(*this));
@@ -250,7 +250,7 @@ public:
BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASIO_CORO_YIELD
net::async_write(
impl.stream, impl.rd_fb.data(),
impl.stream(), impl.rd_fb.data(),
beast::detail::bind_continuation(std::move(*this)));
BOOST_ASSERT(impl.wr_block.is_locked(this));
if(impl.check_stop_now(ec))
@@ -355,7 +355,7 @@ public:
// Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O.
BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(
impl.stream().async_read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
std::move(*this));
@@ -401,7 +401,7 @@ public:
BOOST_ASSERT(buffer_size(buffers_prefix(
clamp(impl.rd_remain), cb_)) > 0);
BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(buffers_prefix(
impl.stream().async_read_some(buffers_prefix(
clamp(impl.rd_remain), cb_), std::move(*this));
if(impl.check_stop_now(ec))
goto upcall;
@@ -443,7 +443,7 @@ public:
{
// read new
BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(
impl.stream().async_read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
std::move(*this));
@@ -567,7 +567,7 @@ public:
// Send close frame
BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, impl.rd_fb.data(),
net::async_write(impl.stream(), impl.rd_fb.data(),
beast::detail::bind_continuation(std::move(*this)));
BOOST_ASSERT(impl.wr_block.is_locked(this));
if(impl.check_stop_now(ec))
@@ -578,7 +578,7 @@ public:
using beast::websocket::async_teardown;
BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASIO_CORO_YIELD
async_teardown(impl.role, impl.stream,
async_teardown(impl.role, impl.stream(),
beast::detail::bind_continuation(std::move(*this)));
BOOST_ASSERT(impl.wr_block.is_locked(this));
if(ec == net::error::eof)
@@ -634,7 +634,7 @@ public:
: async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, b_(b)
, limit_(limit ? limit : (
@@ -959,7 +959,7 @@ loop:
return bytes_written;
}
auto const bytes_transferred =
impl.stream.read_some(
impl.stream().read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
ec);
@@ -1001,7 +1001,7 @@ loop:
detail::frame_buffer fb;
impl.template write_ping<flat_static_buffer_base>(fb,
detail::opcode::pong, payload);
net::write(impl.stream, fb.data(), ec);
net::write(impl.stream(), fb.data(), ec);
if(impl.check_stop_now(ec))
return bytes_written;
goto loop;
@@ -1065,7 +1065,7 @@ loop:
{
// Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O.
impl.rd_buf.commit(impl.stream.read_some(
impl.rd_buf.commit(impl.stream().read_some(
impl.rd_buf.prepare(read_size(impl.rd_buf,
impl.rd_buf.max_size())), ec));
if(impl.check_stop_now(ec))
@@ -1109,7 +1109,7 @@ loop:
BOOST_ASSERT(buffer_size(buffers_prefix(
clamp(impl.rd_remain), buffers)) > 0);
auto const bytes_transferred =
impl.stream.read_some(buffers_prefix(
impl.stream().read_some(buffers_prefix(
clamp(impl.rd_remain), buffers), ec);
// VFALCO What if some bytes were written?
if(impl.check_stop_now(ec))
@@ -1170,7 +1170,7 @@ loop:
{
// read new
auto const bytes_transferred =
impl.stream.read_some(
impl.stream().read_some(
impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())),
ec);

View File

@@ -60,7 +60,7 @@ stream<NextLayer, deflateSupported>::
get_executor() const noexcept ->
executor_type
{
return impl_->stream.get_executor();
return impl_->stream().get_executor();
}
template<class NextLayer, bool deflateSupported>
@@ -69,7 +69,7 @@ stream<NextLayer, deflateSupported>::
next_layer() noexcept ->
next_layer_type&
{
return impl_->stream;
return impl_->stream();
}
template<class NextLayer, bool deflateSupported>
@@ -78,7 +78,7 @@ stream<NextLayer, deflateSupported>::
next_layer() const noexcept ->
next_layer_type const&
{
return impl_->stream;
return impl_->stream();
}
template<class NextLayer, bool deflateSupported>
@@ -324,12 +324,12 @@ do_fail(
detail::frame_buffer fb;
impl_->template write_close<
flat_static_buffer_base>(fb, code);
net::write(impl_->stream, fb.data(), ec);
net::write(impl_->stream(), fb.data(), ec);
if(impl_->check_stop_now(ec))
return;
}
using beast::websocket::teardown;
teardown(impl_->role, impl_->stream, ec);
teardown(impl_->role, impl_->stream(), ec);
if(ec == net::error::eof)
{
// Rationale:

View File

@@ -46,8 +46,13 @@ template<
struct stream<NextLayer, deflateSupported>::impl_type
: boost::enable_shared_from_this<impl_type>
, detail::impl_base<deflateSupported>
, boost::empty_value<NextLayer>
{
NextLayer stream; // The underlying stream
NextLayer& stream() noexcept
{
return this->boost::empty_value<NextLayer>::get();
}
net::steady_timer timer; // used for timeouts
close_reason cr; // set from received close frame
control_cb_type ctrl_cb; // control callback
@@ -100,11 +105,12 @@ struct stream<NextLayer, deflateSupported>::impl_type
detail::decorator decorator_opt; // Decorator for HTTP messages
timeout timeout_opt; // Timeout/idle settings
template<class... Args>
impl_type(Args&&... args)
: stream(std::forward<Args>(args)...)
, timer(stream.get_executor())
: boost::empty_value<NextLayer>(
boost::empty_init_t{},
std::forward<Args>(args)...)
, timer(this->stream().get_executor())
{
timeout_opt.handshake_timeout = none();
timeout_opt.idle_timeout = none();
@@ -472,7 +478,7 @@ private:
{
case status::handshake:
impl.timed_out = true;
close_socket(get_lowest_layer(impl.stream));
close_socket(get_lowest_layer(impl.stream()));
return;
case status::open:
@@ -494,12 +500,12 @@ private:
// timeout
impl.timed_out = true;
close_socket(get_lowest_layer(impl.stream));
close_socket(get_lowest_layer(impl.stream()));
return;
case status::closing:
impl.timed_out = true;
close_socket(get_lowest_layer(impl.stream));
close_socket(get_lowest_layer(impl.stream()));
return;
case status::closed:

View File

@@ -76,7 +76,7 @@ public:
: beast::async_base<Handler,
beast::executor_type<stream>>(
std::forward<Handler_>(h),
sp->stream.get_executor())
sp->stream().get_executor())
, wp_(sp)
, cb_(bs)
, fin_(fin)
@@ -192,7 +192,7 @@ operator()(
impl.wr_fb, fh_);
impl.wr_cont = ! fin_;
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream,
net::async_write(impl.stream(),
buffers_cat(impl.wr_fb.data(), cb_),
beast::detail::bind_continuation(std::move(*this)));
bytes_transferred_ += clamp(fh_.len);
@@ -218,7 +218,7 @@ operator()(
impl.wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, buffers_cat(
net::async_write(impl.stream(), buffers_cat(
impl.wr_fb.data(),
buffers_prefix(clamp(fh_.len), cb_)),
beast::detail::bind_continuation(std::move(*this)));
@@ -269,7 +269,7 @@ operator()(
impl.wr_cont = ! fin_;
// write frame header and some payload
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, buffers_cat(
net::async_write(impl.stream(), buffers_cat(
impl.wr_fb.data(),
net::buffer(impl.wr_buf.get(), n)),
beast::detail::bind_continuation(std::move(*this)));
@@ -289,7 +289,7 @@ operator()(
remain_ -= n;
// write more payload
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream,
net::async_write(impl.stream(),
net::buffer(impl.wr_buf.get(), n),
beast::detail::bind_continuation(std::move(*this)));
bytes_transferred_ += bytes_transferred;
@@ -322,7 +322,7 @@ operator()(
impl.wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, buffers_cat(
net::async_write(impl.stream(), buffers_cat(
impl.wr_fb.data(),
net::buffer(impl.wr_buf.get(), n)),
beast::detail::bind_continuation(std::move(*this)));
@@ -386,7 +386,7 @@ operator()(
impl.wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, buffers_cat(
net::async_write(impl.stream(), buffers_cat(
impl.wr_fb.data(), b),
beast::detail::bind_continuation(std::move(*this)));
bytes_transferred_ += in_;
@@ -555,7 +555,7 @@ write_some(bool fin,
detail::write<
flat_static_buffer_base>(fh_buf, fh);
impl.wr_cont = ! fin;
net::write(impl.stream,
net::write(impl.stream(),
buffers_cat(fh_buf.data(), b), ec);
if(impl.check_stop_now(ec))
return bytes_transferred;
@@ -578,7 +578,7 @@ write_some(bool fin,
detail::write<
flat_static_buffer_base>(fh_buf, fh);
impl.wr_cont = ! fin;
net::write(impl.stream,
net::write(impl.stream(),
buffers_cat(fh_buf.data(), buffers), ec);
if(impl.check_stop_now(ec))
return bytes_transferred;
@@ -600,7 +600,7 @@ write_some(bool fin,
detail::write<
flat_static_buffer_base>(fh_buf, fh);
impl.wr_cont = ! fin;
net::write(impl.stream,
net::write(impl.stream(),
beast::buffers_cat(fh_buf.data(),
beast::buffers_prefix(n, cb)), ec);
bytes_transferred += n;
@@ -636,7 +636,7 @@ write_some(bool fin,
remain -= n;
detail::mask_inplace(b, key);
impl.wr_cont = ! fin;
net::write(impl.stream,
net::write(impl.stream(),
buffers_cat(fh_buf.data(), b), ec);
bytes_transferred += n;
if(impl.check_stop_now(ec))
@@ -652,7 +652,7 @@ write_some(bool fin,
cb.consume(n);
remain -= n;
detail::mask_inplace(b, key);
net::write(impl.stream, b, ec);
net::write(impl.stream(), b, ec);
bytes_transferred += n;
if(impl.check_stop_now(ec))
return bytes_transferred;
@@ -682,7 +682,7 @@ write_some(bool fin,
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
net::write(impl.stream,
net::write(impl.stream(),
buffers_cat(fh_buf.data(), b), ec);
bytes_transferred += n;
if(impl.check_stop_now(ec))

View File

@@ -143,7 +143,6 @@ struct stream_base
}
return opt;
}
};
protected: