Refactor HTTP async read composed operations

fix #810
This commit is contained in:
Vinnie Falco
2017-11-18 15:04:17 -08:00
parent 5ae15432b8
commit 75fcea69ee
3 changed files with 122 additions and 128 deletions

View File

@ -2,6 +2,7 @@ Version 145:
* Rename some detail functions * Rename some detail functions
* Refactor basic_fields allocator internals * Refactor basic_fields allocator internals
* Refactor HTTP async read composed operations
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------

View File

@ -20,6 +20,7 @@
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/asio/associated_allocator.hpp> #include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp> #include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_continuation_hook.hpp> #include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
@ -38,15 +39,14 @@ namespace detail {
template<class Stream, class DynamicBuffer, template<class Stream, class DynamicBuffer,
bool isRequest, class Derived, class Handler> bool isRequest, class Derived, class Handler>
class read_some_op class read_some_op
: public boost::asio::coroutine
{ {
int state_ = 0;
Stream& s_; Stream& s_;
DynamicBuffer& b_; DynamicBuffer& b_;
basic_parser<isRequest, Derived>& p_; basic_parser<isRequest, Derived>& p_;
boost::optional<typename
DynamicBuffer::mutable_buffers_type> mb_;
std::size_t bytes_transferred_ = 0; std::size_t bytes_transferred_ = 0;
Handler h_; Handler h_;
bool cont_ = false;
public: public:
read_some_op(read_some_op&&) = default; read_some_op(read_some_op&&) = default;
@ -83,14 +83,15 @@ public:
void void
operator()( operator()(
error_code ec = {}, error_code ec,
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0,
bool cont = true);
friend friend
bool asio_handler_is_continuation(read_some_op* op) bool asio_handler_is_continuation(read_some_op* op)
{ {
using boost::asio::asio_handler_is_continuation; using boost::asio::asio_handler_is_continuation;
return op->state_ >= 2 ? true : return op->cont_ ? true :
asio_handler_is_continuation( asio_handler_is_continuation(
std::addressof(op->h_)); std::addressof(op->h_));
} }
@ -103,75 +104,69 @@ read_some_op<Stream, DynamicBuffer,
isRequest, Derived, Handler>:: isRequest, Derived, Handler>::
operator()( operator()(
error_code ec, error_code ec,
std::size_t bytes_transferred) std::size_t bytes_transferred,
bool cont)
{ {
switch(state_) cont_ = cont;
boost::optional<typename
DynamicBuffer::mutable_buffers_type> mb;
BOOST_ASIO_CORO_REENTER(*this)
{ {
case 0:
state_ = 1;
if(b_.size() == 0) if(b_.size() == 0)
goto do_read; goto do_read;
goto do_parse; for(;;)
case 1:
state_ = 2;
case 2:
if(ec == boost::asio::error::eof)
{ {
BOOST_ASSERT(bytes_transferred == 0); // parse
if(p_.got_some())
{ {
// caller sees EOF on next read auto const used = p_.put(b_.data(), ec);
ec.assign(0, ec.category()); bytes_transferred_ += used;
p_.put_eof(ec); b_.consume(used);
if(ec)
goto upcall;
BOOST_ASSERT(p_.is_done());
goto upcall;
} }
ec = error::end_of_stream; if(ec != http::error::need_more)
goto upcall; break;
}
if(ec)
goto upcall;
b_.commit(bytes_transferred);
do_parse: do_read:
{ try
auto const used = p_.put(b_.data(), ec); {
bytes_transferred_ += used; mb.emplace(b_.prepare(
b_.consume(used); read_size_or_throw(b_, 65536)));
if(! ec || ec != http::error::need_more) }
goto do_upcall; catch(std::length_error const&)
ec.assign(0, ec.category()); {
ec = error::buffer_overflow;
break;
}
BOOST_ASIO_CORO_YIELD
s_.async_read_some(*mb, std::move(*this));
if(ec == boost::asio::error::eof)
{
BOOST_ASSERT(bytes_transferred == 0);
if(p_.got_some())
{
// caller sees EOF on next read
ec.assign(0, ec.category());
p_.put_eof(ec);
if(ec)
goto upcall;
BOOST_ASSERT(p_.is_done());
goto upcall;
}
ec = error::end_of_stream;
break;
}
if(ec)
break;
b_.commit(bytes_transferred);
}
upcall:
if(! cont_)
return boost::asio::post(
s_.get_executor(),
bind_handler(std::move(h_),
ec, bytes_transferred_));
h_(ec, bytes_transferred_);
} }
do_read:
try
{
mb_.emplace(b_.prepare(
read_size_or_throw(b_, 65536)));
}
catch(std::length_error const&)
{
ec = error::buffer_overflow;
goto do_upcall;
}
return s_.async_read_some(*mb_, std::move(*this));
do_upcall:
if(state_ >= 2)
goto upcall;
state_ = 3;
return boost::asio::post(
s_.get_executor(),
bind_handler(std::move(*this), ec, 0));
case 3:
break;
}
upcall:
h_(ec, bytes_transferred_);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -202,13 +197,14 @@ template<class Stream, class DynamicBuffer,
bool isRequest, class Derived, class Condition, bool isRequest, class Derived, class Condition,
class Handler> class Handler>
class read_op class read_op
: public boost::asio::coroutine
{ {
int state_ = 0;
Stream& s_; Stream& s_;
DynamicBuffer& b_; DynamicBuffer& b_;
basic_parser<isRequest, Derived>& p_; basic_parser<isRequest, Derived>& p_;
std::size_t bytes_transferred_ = 0; std::size_t bytes_transferred_ = 0;
Handler h_; Handler h_;
bool cont_ = false;
public: public:
read_op(read_op&&) = default; read_op(read_op&&) = default;
@ -246,14 +242,15 @@ public:
void void
operator()( operator()(
error_code ec = {}, error_code ec,
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0,
bool cont = true);
friend friend
bool asio_handler_is_continuation(read_op* op) bool asio_handler_is_continuation(read_op* op)
{ {
using boost::asio::asio_handler_is_continuation; using boost::asio::asio_handler_is_continuation;
return op->state_ >= 3 ? true : return op->cont_ ? true :
asio_handler_is_continuation( asio_handler_is_continuation(
std::addressof(op->h_)); std::addressof(op->h_));
} }
@ -267,39 +264,33 @@ read_op<Stream, DynamicBuffer,
isRequest, Derived, Condition, Handler>:: isRequest, Derived, Condition, Handler>::
operator()( operator()(
error_code ec, error_code ec,
std::size_t bytes_transferred) std::size_t bytes_transferred,
bool cont)
{ {
switch(state_) cont_ = cont;
BOOST_ASIO_CORO_REENTER(*this)
{ {
case 0:
if(Condition{}(p_)) if(Condition{}(p_))
{ {
state_ = 1; BOOST_ASIO_CORO_YIELD
return boost::asio::post( boost::asio::post(s_.get_executor(),
s_.get_executor(),
bind_handler(std::move(*this), ec)); bind_handler(std::move(*this), ec));
goto upcall;
} }
state_ = 2; for(;;)
{
do_read: BOOST_ASIO_CORO_YIELD
return async_read_some( async_read_some(
s_, b_, p_, std::move(*this)); s_, b_, p_, std::move(*this));
if(ec)
case 1: goto upcall;
goto upcall; bytes_transferred_ += bytes_transferred;
if(Condition{}(p_))
case 2: goto upcall;
case 3: }
if(ec) upcall:
goto upcall; h_(ec, bytes_transferred_);
bytes_transferred_ += bytes_transferred;
if(Condition{}(p_))
goto upcall;
state_ = 3;
goto do_read;
} }
upcall:
h_(ec, bytes_transferred_);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -308,6 +299,7 @@ template<class Stream, class DynamicBuffer,
bool isRequest, class Body, class Allocator, bool isRequest, class Body, class Allocator,
class Handler> class Handler>
class read_msg_op class read_msg_op
: public boost::asio::coroutine
{ {
using parser_type = using parser_type =
parser<isRequest, Body, Allocator>; parser<isRequest, Body, Allocator>;
@ -317,12 +309,12 @@ class read_msg_op
struct data struct data
{ {
int state = 0;
Stream& s; Stream& s;
DynamicBuffer& b; DynamicBuffer& b;
message_type& m; message_type& m;
parser_type p; parser_type p;
std::size_t bytes_transferred = 0; std::size_t bytes_transferred = 0;
bool cont = false;
data(Handler&, Stream& s_, data(Handler&, Stream& s_,
DynamicBuffer& b_, message_type& m_) DynamicBuffer& b_, message_type& m_)
@ -369,14 +361,15 @@ public:
void void
operator()( operator()(
error_code ec = {}, error_code ec,
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0,
bool cont = true);
friend friend
bool asio_handler_is_continuation(read_msg_op* op) bool asio_handler_is_continuation(read_msg_op* op)
{ {
using boost::asio::asio_handler_is_continuation; using boost::asio::asio_handler_is_continuation;
return op->d_->state >= 2 ? true : return op->d_->cont ? true :
asio_handler_is_continuation( asio_handler_is_continuation(
std::addressof(op->d_.handler())); std::addressof(op->d_.handler()));
} }
@ -390,35 +383,32 @@ read_msg_op<Stream, DynamicBuffer,
isRequest, Body, Allocator, Handler>:: isRequest, Body, Allocator, Handler>::
operator()( operator()(
error_code ec, error_code ec,
std::size_t bytes_transferred) std::size_t bytes_transferred,
bool cont)
{ {
auto& d = *d_; auto& d = *d_;
switch(d.state) d.cont = cont;
BOOST_ASIO_CORO_REENTER(*this)
{ {
case 0: for(;;)
d.state = 1;
do_read:
return async_read_some(
d.s, d.b, d.p, std::move(*this));
case 1:
case 2:
if(ec)
goto upcall;
d.bytes_transferred +=
bytes_transferred;
if(d.p.is_done())
{ {
d.m = d.p.release(); BOOST_ASIO_CORO_YIELD
goto upcall; async_read_some(
d.s, d.b, d.p, std::move(*this));
if(ec)
goto upcall;
d.bytes_transferred +=
bytes_transferred;
if(d.p.is_done())
{
d.m = d.p.release();
goto upcall;
}
} }
d.state = 2; upcall:
goto do_read; bytes_transferred = d.bytes_transferred;
d_.invoke(ec, bytes_transferred);
} }
upcall:
bytes_transferred = d.bytes_transferred;
d_.invoke(ec, bytes_transferred);
} }
} // detail } // detail
@ -541,7 +531,8 @@ async_read_some(
detail::read_some_op<AsyncReadStream, detail::read_some_op<AsyncReadStream,
DynamicBuffer, isRequest, Derived, BOOST_ASIO_HANDLER_TYPE( DynamicBuffer, isRequest, Derived, BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{ ReadHandler, void(error_code, std::size_t))>{
init.completion_handler, stream, buffer, parser}(); init.completion_handler, stream, buffer, parser}(
{}, 0, false);
return init.result.get(); return init.result.get();
} }
@ -628,7 +619,8 @@ async_read_header(
detail::read_op<AsyncReadStream, DynamicBuffer, detail::read_op<AsyncReadStream, DynamicBuffer,
isRequest, Derived, detail::parser_is_header_done, isRequest, Derived, detail::parser_is_header_done,
BOOST_ASIO_HANDLER_TYPE(ReadHandler, void(error_code, std::size_t))>{ BOOST_ASIO_HANDLER_TYPE(ReadHandler, void(error_code, std::size_t))>{
init.completion_handler, stream, buffer, parser}(); init.completion_handler, stream, buffer, parser}(
{}, 0, false);
return init.result.get(); return init.result.get();
} }
@ -716,7 +708,8 @@ async_read(
detail::read_op<AsyncReadStream, DynamicBuffer, detail::read_op<AsyncReadStream, DynamicBuffer,
isRequest, Derived, detail::parser_is_done, isRequest, Derived, detail::parser_is_done,
BOOST_ASIO_HANDLER_TYPE(ReadHandler, void(error_code, std::size_t))>{ BOOST_ASIO_HANDLER_TYPE(ReadHandler, void(error_code, std::size_t))>{
init.completion_handler, stream, buffer, parser}(); init.completion_handler, stream, buffer, parser}(
{}, 0, false);
return init.result.get(); return init.result.get();
} }
@ -810,7 +803,8 @@ async_read(
isRequest, Body, Allocator, isRequest, Body, Allocator,
BOOST_ASIO_HANDLER_TYPE( BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{ ReadHandler, void(error_code, std::size_t))>{
init.completion_handler, stream, buffer, msg}(); init.completion_handler, stream, buffer, msg}(
{}, 0, false);
return init.result.get(); return init.result.get();
} }

View File

@ -14,7 +14,6 @@
#include <boost/beast/core/buffers_cat.hpp> #include <boost/beast/core/buffers_cat.hpp>
#include <boost/beast/core/buffers_prefix.hpp> #include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/buffers_suffix.hpp> #include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/flat_static_buffer.hpp> #include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>