Always go through write_some:

HTTP stream serialization algorithms always go
through write_some or async_write some.
This commit is contained in:
Vinnie Falco
2017-07-07 07:07:50 -07:00
parent ae7270463d
commit f65a611b85
3 changed files with 159 additions and 166 deletions

View File

@ -1,3 +1,9 @@
Version 76:
* Always go through write_some
--------------------------------------------------------------------------------
Version 75: Version 75:
* Use file_body for valid requests, string_body otherwise. * Use file_body for valid requests, string_body otherwise.

View File

@ -28,14 +28,13 @@ namespace beast {
namespace http { namespace http {
namespace detail { namespace detail {
template<class Stream, bool isRequest, template<class Stream, class Handler,
class Body, class Fields, class Decorator, bool isRequest, class Body,
class Handler> class Fields, class Decorator>
class write_some_op class write_some_op
{ {
Stream& s_; Stream& s_;
serializer<isRequest, serializer<isRequest,Body, Fields, Decorator>& sr_;
Body, Fields, Decorator>& sr_;
Handler h_; Handler h_;
class lambda class lambda
@ -56,8 +55,8 @@ class write_some_op
operator()(error_code& ec, operator()(error_code& ec,
ConstBufferSequence const& buffers) ConstBufferSequence const& buffers)
{ {
ec.assign(0, ec.category());
invoked = true; invoked = true;
ec.assign(0, ec.category());
return op_.s_.async_write_some( return op_.s_.async_write_some(
buffers, std::move(op_)); buffers, std::move(op_));
} }
@ -119,42 +118,54 @@ public:
} }
}; };
template<class Stream, bool isRequest,class Body, template<class Stream, class Handler,
class Fields, class Decorator, class Handler> bool isRequest, class Body,
class Fields, class Decorator>
void void
write_some_op<Stream, isRequest, Body, write_some_op<Stream, Handler,
Fields, Decorator, Handler>:: isRequest, Body, Fields, Decorator>::
operator()() operator()()
{ {
error_code ec; error_code ec;
if(sr_.is_done()) if(! sr_.is_done())
return s_.get_io_service().post(
bind_handler(std::move(*this), ec, 0));
lambda f{*this};
sr_.next(ec, f);
if(ec)
{ {
BOOST_ASSERT(! f.invoked); lambda f{*this};
return s_.get_io_service().post( sr_.next(ec, f);
bind_handler(std::move(*this), ec, 0)); if(ec)
{
BOOST_ASSERT(! f.invoked);
return s_.get_io_service().post(
bind_handler(std::move(*this), ec, 0));
}
if(f.invoked)
{
// *this has been moved from,
// cannot access members here.
return;
}
// What else could it be?
BOOST_ASSERT(sr_.is_done());
} }
if(f.invoked)
// *this has been moved from,
// cannot access members here.
return;
return s_.get_io_service().post( return s_.get_io_service().post(
bind_handler(std::move(*this), ec, 0)); bind_handler(std::move(*this), ec, 0));
} }
template<class Stream, bool isRequest, class Body, template<class Stream, class Handler,
class Fields, class Decorator, class Handler> bool isRequest, class Body,
class Fields, class Decorator>
void void
write_some_op<Stream, isRequest, Body, write_some_op<Stream, Handler,
Fields, Decorator, Handler>:: isRequest, Body, Fields, Decorator>::
operator()(error_code ec, std::size_t bytes_transferred) operator()(
error_code ec, std::size_t bytes_transferred)
{ {
if(! ec) if(! ec)
{
sr_.consume(bytes_transferred); sr_.consume(bytes_transferred);
if(sr_.is_done())
if(sr_.need_close())
ec = error::end_of_stream;
}
h_(ec); h_(ec);
} }
@ -184,9 +195,12 @@ struct serializer_is_done
} }
}; };
template<class Stream, bool isRequest, //------------------------------------------------------------------------------
class Body, class Fields, class Decorator,
class Predicate, class Handler> template<
class Stream, class Handler, class Predicate,
bool isRequest, class Body,
class Fields, class Decorator>
class write_op class write_op
{ {
int state_ = 0; int state_ = 0;
@ -195,31 +209,6 @@ class write_op
Body, Fields, Decorator>& sr_; Body, Fields, Decorator>& sr_;
Handler h_; Handler h_;
class lambda
{
write_op& op_;
public:
bool invoked = false;
explicit
lambda(write_op& op)
: op_(op)
{
}
template<class ConstBufferSequence>
void
operator()(error_code& ec,
ConstBufferSequence const& buffers)
{
ec.assign(0, ec.category());
invoked = true;
return op_.s_.async_write_some(
buffers, std::move(op_));
}
};
public: public:
write_op(write_op&&) = default; write_op(write_op&&) = default;
write_op(write_op const&) = default; write_op(write_op const&) = default;
@ -235,8 +224,7 @@ public:
} }
void void
operator()(error_code ec, operator()(error_code ec);
std::size_t bytes_transferred);
friend friend
void* asio_handler_allocate( void* asio_handler_allocate(
@ -275,14 +263,14 @@ public:
} }
}; };
template<class Stream, bool isRequest, class Body, template<
class Fields, class Decorator, class Predicate, class Stream, class Handler, class Predicate,
class Handler> bool isRequest, class Body,
class Fields, class Decorator>
void void
write_op<Stream, isRequest, Body, Fields, write_op<Stream, Handler, Predicate,
Decorator, Predicate, Handler>:: isRequest, Body, Fields, Decorator>::
operator()(error_code ec, operator()(error_code ec)
std::size_t bytes_transferred)
{ {
if(ec) if(ec)
goto upcall; goto upcall;
@ -294,25 +282,10 @@ operator()(error_code ec,
{ {
state_ = 1; state_ = 1;
return s_.get_io_service().post( return s_.get_io_service().post(
bind_handler(std::move(*this), ec, 0)); bind_handler(std::move(*this), ec));
} }
lambda f{*this};
state_ = 2; state_ = 2;
sr_.next(ec, f); return async_write_some(s_, sr_, std::move(*this));
if(ec)
{
BOOST_ASSERT(! f.invoked);
return s_.get_io_service().post(
bind_handler(std::move(*this), ec, 0));
}
if(f.invoked)
// *this has been moved from,
// cannot access members here.
return;
BOOST_ASSERT(Predicate{}(sr_));
state_ = 1;
return s_.get_io_service().post(
bind_handler(std::move(*this), ec, 0));
} }
case 1: case 1:
@ -324,22 +297,9 @@ operator()(error_code ec,
case 3: case 3:
{ {
sr_.consume(bytes_transferred);
if(Predicate{}(sr_)) if(Predicate{}(sr_))
goto upcall; goto upcall;
lambda f{*this}; return async_write_some(s_, sr_, std::move(*this));
sr_.next(ec, f);
if(ec)
{
BOOST_ASSERT(! f.invoked);
goto upcall;
}
if(f.invoked)
// *this has been moved from,
// cannot access members here.
return;
BOOST_ASSERT(Predicate{}(sr_));
goto upcall;
} }
} }
upcall: upcall:
@ -424,7 +384,8 @@ public:
template<class Stream, class Handler, template<class Stream, class Handler,
bool isRequest, class Body, class Fields> bool isRequest, class Body, class Fields>
void void
write_msg_op<Stream, Handler, isRequest, Body, Fields>:: write_msg_op<
Stream, Handler, isRequest, Body, Fields>::
operator()() operator()()
{ {
auto& d = *d_; auto& d = *d_;
@ -434,13 +395,10 @@ operator()()
template<class Stream, class Handler, template<class Stream, class Handler,
bool isRequest, class Body, class Fields> bool isRequest, class Body, class Fields>
void void
write_msg_op<Stream, Handler, isRequest, Body, Fields>:: write_msg_op<
Stream, Handler, isRequest, Body, Fields>::
operator()(error_code ec) operator()(error_code ec)
{ {
auto& d = *d_;
if(! ec)
if(d.sr.need_close())
ec = error::end_of_stream;
d_.invoke(ec); d_.invoke(ec);
} }
@ -502,6 +460,38 @@ public:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
namespace detail {
template<
class SyncWriteStream,
bool isRequest, class Body, class Fields, class Decorator>
void
write_some(
SyncWriteStream& stream, serializer<
isRequest, Body, Fields, Decorator>& sr,
error_code& ec)
{
if(! sr.is_done())
{
write_some_lambda<SyncWriteStream> f{stream};
sr.next(ec, f);
if(ec)
return;
if(f.invoked)
sr.consume(f.bytes_transferred);
if(sr.is_done())
if(sr.need_close())
ec = error::end_of_stream;
return;
}
if(sr.need_close())
ec = error::end_of_stream;
else
ec.assign(0, ec.category());
}
} // detail
template<class SyncWriteStream, bool isRequest, template<class SyncWriteStream, bool isRequest,
class Body, class Fields, class Decorator> class Body, class Fields, class Decorator>
void void
@ -534,17 +524,7 @@ write_some(SyncWriteStream& stream, serializer<
"Body requirements not met"); "Body requirements not met");
static_assert(is_body_reader<Body>::value, static_assert(is_body_reader<Body>::value,
"BodyReader requirements not met"); "BodyReader requirements not met");
detail::write_some_lambda<SyncWriteStream> f{stream}; detail::write_some(stream, sr, ec);
if(sr.is_done())
{
ec.assign(0, ec.category());
return;
}
sr.next(ec, f);
if(ec)
return;
if(f.invoked)
sr.consume(f.bytes_transferred);
} }
template<class AsyncWriteStream, template<class AsyncWriteStream,
@ -564,9 +544,9 @@ async_write_some(AsyncWriteStream& stream, serializer<
"BodyReader requirements not met"); "BodyReader requirements not met");
async_completion<WriteHandler, async_completion<WriteHandler,
void(error_code)> init{handler}; void(error_code)> init{handler};
detail::write_some_op<AsyncWriteStream, isRequest, detail::write_some_op<AsyncWriteStream,
Body, Fields, Decorator, handler_type< handler_type<WriteHandler, void(error_code)>,
WriteHandler, void(error_code)>>{ isRequest, Body, Fields, Decorator>{
init.completion_handler, stream, sr}(); init.completion_handler, stream, sr}();
return init.result.get(); return init.result.get();
} }
@ -607,21 +587,23 @@ write_header(SyncWriteStream& stream, serializer<
static_assert(is_body_reader<Body>::value, static_assert(is_body_reader<Body>::value,
"BodyReader requirements not met"); "BodyReader requirements not met");
sr.split(true); sr.split(true);
if(sr.is_header_done()) if(! sr.is_header_done())
{
detail::write_lambda<SyncWriteStream> f{stream};
do
{
sr.next(ec, f);
if(ec)
return;
BOOST_ASSERT(f.invoked);
sr.consume(f.bytes_transferred);
}
while(! sr.is_header_done());
}
else
{ {
ec.assign(0, ec.category()); ec.assign(0, ec.category());
return;
} }
detail::write_lambda<SyncWriteStream> f{stream};
do
{
sr.next(ec, f);
if(ec)
return;
BOOST_ASSERT(f.invoked);
sr.consume(f.bytes_transferred);
}
while(! sr.is_header_done());
} }
template<class AsyncWriteStream, template<class AsyncWriteStream,
@ -642,22 +624,25 @@ async_write_header(AsyncWriteStream& stream, serializer<
sr.split(true); sr.split(true);
async_completion<WriteHandler, async_completion<WriteHandler,
void(error_code)> init{handler}; void(error_code)> init{handler};
detail::write_op<AsyncWriteStream, isRequest, Body, Fields, detail::write_op<AsyncWriteStream, handler_type<
Decorator, detail::serializer_is_header_done, WriteHandler, void(error_code)>,
handler_type<WriteHandler, void(error_code)>>{ detail::serializer_is_header_done,
init.completion_handler, stream, sr}( isRequest, Body, Fields, Decorator>{
error_code{}, 0); init.completion_handler, stream, sr}(
error_code{}, 0);
return init.result.get(); return init.result.get();
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template<class SyncWriteStream, template<
bool isRequest, class Body, class Fields, class SyncWriteStream,
class Decorator> bool isRequest, class Body,
class Fields, class Decorator>
void void
write(SyncWriteStream& stream, serializer< write(
isRequest, Body, Fields, Decorator>& sr) SyncWriteStream& stream,
serializer<isRequest, Body, Fields, Decorator>& sr)
{ {
static_assert(is_sync_write_stream<SyncWriteStream>::value, static_assert(is_sync_write_stream<SyncWriteStream>::value,
"SyncWriteStream requirements not met"); "SyncWriteStream requirements not met");
@ -667,34 +652,27 @@ write(SyncWriteStream& stream, serializer<
BOOST_THROW_EXCEPTION(system_error{ec}); BOOST_THROW_EXCEPTION(system_error{ec});
} }
template<class SyncWriteStream, template<
bool isRequest, class Body, class Fields, class SyncWriteStream,
class Decorator> bool isRequest, class Body,
class Fields, class Decorator>
void void
write(SyncWriteStream& stream, serializer< write(
isRequest, Body, Fields, Decorator>& sr, SyncWriteStream& stream,
error_code& ec) serializer<isRequest, Body, Fields, Decorator>& sr,
error_code& ec)
{ {
static_assert(is_sync_write_stream<SyncWriteStream>::value, static_assert(is_sync_write_stream<SyncWriteStream>::value,
"SyncWriteStream requirements not met"); "SyncWriteStream requirements not met");
sr.split(false); sr.split(false);
if(sr.is_done()) for(;;)
{ {
ec.assign(0, ec.category()); write_some(stream, sr, ec);
return;
}
detail::write_lambda<SyncWriteStream> f{stream};
do
{
sr.next(ec, f);
if(ec) if(ec)
return; return;
if(f.invoked) if(sr.is_done())
sr.consume(f.bytes_transferred); break;
} }
while(! sr.is_done());
if(sr.need_close())
ec = error::end_of_stream;
} }
template<class AsyncWriteStream, template<class AsyncWriteStream,
@ -715,11 +693,12 @@ async_write(AsyncWriteStream& stream, serializer<
sr.split(false); sr.split(false);
async_completion<WriteHandler, async_completion<WriteHandler,
void(error_code)> init{handler}; void(error_code)> init{handler};
detail::write_op<AsyncWriteStream, isRequest, Body, detail::write_op<AsyncWriteStream, handler_type<
Fields, Decorator, detail::serializer_is_done, WriteHandler, void(error_code)>,
handler_type<WriteHandler, void(error_code)>>{ detail::serializer_is_done,
isRequest, Body, Fields, Decorator>{
init.completion_handler, stream, sr}( init.completion_handler, stream, sr}(
error_code{}, 0); error_code{});
return init.result.get(); return init.result.get();
} }
@ -779,9 +758,9 @@ async_write(AsyncWriteStream& stream,
async_completion<WriteHandler, async_completion<WriteHandler,
void(error_code)> init{handler}; void(error_code)> init{handler};
detail::write_msg_op<AsyncWriteStream, handler_type< detail::write_msg_op<AsyncWriteStream, handler_type<
WriteHandler, void(error_code)>, WriteHandler, void(error_code)>, isRequest,
isRequest, Body, Fields>{ Body, Fields>{init.completion_handler,
init.completion_handler, stream, msg}(); stream, msg}();
return init.result.get(); return init.result.get();
} }

View File

@ -811,8 +811,16 @@ public:
void run() override void run() override
{ {
yield_to([&](yield_context yield){ testAsyncWrite(yield); }); yield_to(
yield_to([&](yield_context yield){ testFailures(yield); }); [&](yield_context yield)
{
testAsyncWrite(yield);
});
yield_to(
[&](yield_context yield)
{
testFailures(yield);
});
testOutput(); testOutput();
test_std_ostream(); test_std_ostream();
testIoService(); testIoService();