Fix resume of invokables in websocket composed operations:

When a suspended composed operation is resumed, the operation
now posts to the io_service to get in the correct context. Previously,
invokables resumed in the context of a different completion handler.

* asio_handler_invoke for any resumed invokable will return `true`.
This commit is contained in:
Vinnie Falco
2016-05-11 07:56:36 -04:00
parent 6f38d4ea21
commit 0d69868051
6 changed files with 97 additions and 77 deletions

View File

@@ -32,6 +32,10 @@ WebSocket:
* Don't try to read requests into empty_body * Don't try to read requests into empty_body
* Give callers control over the http request/response used during handshake * Give callers control over the http request/response used during handshake
* Investigate poor autobahn results in Debug builds * Investigate poor autobahn results in Debug builds
* Fall through composed operation switch cases
* Replace stream::error_ with stream::state_, example states: ok, error, abort_io
Need a cancel state so waking up a ping stored in invokable knows to call the
final handler with operation_aborted
HTTP: HTTP:
* Define Parser concept in HTTP * Define Parser concept in HTTP

View File

@@ -69,19 +69,11 @@ public:
void operator()() void operator()()
{ {
auto& d = *d_; (*this)(error_code{}, 0, true);
d.cont = false;
(*this)(error_code{}, 0, false);
}
void operator()(error_code const& ec)
{
(*this)(ec, 0);
} }
void void
operator()(error_code ec, operator()(error_code ec, std::size_t, bool again = true);
std::size_t bytes_transferred, bool again = true);
friend friend
void* asio_handler_allocate( void* asio_handler_allocate(
@@ -117,8 +109,8 @@ public:
template<class NextLayer> template<class NextLayer>
template<class Handler> template<class Handler>
void void
stream<NextLayer>::close_op<Handler>::operator()( stream<NextLayer>::close_op<Handler>::
error_code ec, std::size_t bytes_transferred, bool again) operator()(error_code ec, std::size_t, bool again)
{ {
auto& d = *d_; auto& d = *d_;
d.cont = d.cont || again; d.cont = d.cont || again;
@@ -144,11 +136,20 @@ stream<NextLayer>::close_op<Handler>::operator()(
boost::asio::error::operation_aborted, 0)); boost::asio::error::operation_aborted, 0));
return; return;
} }
d.state = 2; d.state = 3;
break; break;
// resume // resume
case 1: case 1:
// VFALCO NOTE Should d.cont be `true` or false here?
// Does this count as a continuation of the original call
// to the asynchronous initiation function (async_close)?
d.state = 2;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, 0));
return;
case 2:
if(d.ws.error_) if(d.ws.error_)
{ {
// call handler // call handler
@@ -156,10 +157,10 @@ stream<NextLayer>::close_op<Handler>::operator()(
ec = boost::asio::error::operation_aborted; ec = boost::asio::error::operation_aborted;
break; break;
} }
d.state = 2; d.state = 3; // VFALCO fall through?
break; break;
case 2: case 3:
// send close // send close
d.state = 99; d.state = 99;
assert(! d.ws.wr_close_); assert(! d.ws.wr_close_);

View File

@@ -64,16 +64,11 @@ public:
std::forward<DeducedHandler>(h), ws, std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...)) std::forward<Args>(args)...))
{ {
(*this)(error_code{}, 0, false); (*this)(error_code{}, false);
} }
void operator()(error_code const& ec) void
{ operator()(error_code ec, bool again = true);
(*this)(ec, 0);
}
void operator()(error_code ec,
std::size_t bytes_transferred, bool again = true);
friend friend
void* asio_handler_allocate( void* asio_handler_allocate(
@@ -109,9 +104,8 @@ public:
template<class NextLayer> template<class NextLayer>
template<class Handler> template<class Handler>
void void
stream<NextLayer>::handshake_op< stream<NextLayer>::handshake_op<Handler>::
Handler>::operator()(error_code ec, operator()(error_code ec, bool again)
std::size_t bytes_transferred, bool again)
{ {
auto& d = *d_; auto& d = *d_;
d.cont = d.cont || again; d.cont = d.cont || again;

View File

@@ -81,9 +81,7 @@ public:
void operator()() void operator()()
{ {
auto& d = *d_; (*this)(error_code{}, 0, true);
d.cont = false;
(*this)(error_code{}, 0, false);
} }
void operator()(error_code const& ec) void operator()(error_code const& ec)
@@ -187,7 +185,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
! d.ws.rd_utf8_check_.finish())) ! d.ws.rd_utf8_check_.finish()))
{ {
// invalid utf8 // invalid utf8
d.state = 16; d.state = 18;
code = close_code::bad_payload; code = close_code::bad_payload;
break; break;
} }
@@ -215,7 +213,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
if(code != close_code::none) if(code != close_code::none)
{ {
// protocol error // protocol error
d.state = 16; d.state = 18;
break; break;
} }
d.state = 6; d.state = 6;
@@ -241,7 +239,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
if(code != close_code::none) if(code != close_code::none)
{ {
// protocol error // protocol error
d.state = 16; d.state = 18;
break; break;
} }
if(detail::is_control(d.ws.rd_fh_.op)) if(detail::is_control(d.ws.rd_fh_.op))
@@ -292,7 +290,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
if(code != close_code::none) if(code != close_code::none)
{ {
// protocol error // protocol error
d.state = 16; d.state = 18;
break; break;
} }
d.fb.reset(); d.fb.reset();
@@ -323,7 +321,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
if(code != close_code::none) if(code != close_code::none)
{ {
// protocol error // protocol error
d.state = 16; d.state = 18;
break; break;
} }
d.fb.reset(); d.fb.reset();
@@ -337,7 +335,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
detail::read(d.ws.cr_, d.fb.data(), code); detail::read(d.ws.cr_, d.fb.data(), code);
if(code != close_code::none) if(code != close_code::none)
{ {
d.state = 16; d.state = 18;
break; break;
} }
if(! d.ws.wr_close_) if(! d.ws.wr_close_)
@@ -357,7 +355,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
read_frame_op>(std::move(*this)); read_frame_op>(std::move(*this));
return; return;
} }
d.state = 10; d.state = 11;
break; break;
} }
// call handler; // call handler;
@@ -368,6 +366,12 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
// resume // resume
case 9: case 9:
d.state = 10;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 10:
if(d.ws.error_) if(d.ws.error_)
{ {
// call handler // call handler
@@ -382,12 +386,12 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
ec = error::closed; ec = error::closed;
break; break;
} }
d.state = 10; d.state = 11;
break; break;
// send close // send close
case 10: case 11:
d.state = 11; d.state = 12;
assert(! d.ws.wr_block_); assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d; d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_, boost::asio::async_write(d.ws.stream_,
@@ -395,20 +399,26 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
return;; return;;
// teardown // teardown
case 11: case 12:
d.state = 12; d.state = 13;
websocket_helpers::call_async_teardown( websocket_helpers::call_async_teardown(
d.ws.next_layer(), std::move(*this)); d.ws.next_layer(), std::move(*this));
return; return;
case 12: case 13:
// call handler // call handler
d.state = 99; d.state = 99;
ec = error::closed; ec = error::closed;
break; break;
// resume // resume
case 13: case 14:
d.state = 15;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 15:
if(d.ws.error_) if(d.ws.error_)
{ {
// call handler // call handler
@@ -422,12 +432,12 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
d.state = 2; d.state = 2;
break; break;
} }
d.state = 14; d.state = 16;
break; break;
case 14: case 16:
// write ping/pong // write ping/pong
d.state = 15; d.state = 17;
assert(! d.ws.wr_block_); assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d; d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_, boost::asio::async_write(d.ws.stream_,
@@ -435,14 +445,14 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
return; return;
// sent ping/pong // sent ping/pong
case 15: case 17:
d.fb.reset(); d.fb.reset();
d.state = 2; d.state = 2;
d.ws.wr_block_ = nullptr; d.ws.wr_block_ = nullptr;
break; break;
// fail the connection // fail the connection
case 16: case 18:
if(! d.ws.wr_close_) if(! d.ws.wr_close_)
{ {
d.fb.reset(); d.fb.reset();
@@ -451,28 +461,36 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
if(d.ws.wr_block_) if(d.ws.wr_block_)
{ {
// suspend // suspend
d.state = 17; d.state = 19;
d.ws.rd_op_.template emplace< d.ws.rd_op_.template emplace<
read_frame_op>(std::move(*this)); read_frame_op>(std::move(*this));
return; return;
} }
d.state = 18; d.state = 21;
break; break;
} }
d.state = 22;
// resume
case 17:
if(d.ws.wr_close_)
{
d.state = 19;
break;
}
d.state = 18;
break; break;
case 18: // resume
case 19:
d.state = 20;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 20:
if(d.ws.wr_close_)
{
d.state = 22;
break;
}
d.state = 21;
break;
case 21:
// send close // send close
d.state = 19; d.state = 22;
d.ws.wr_close_ = true; d.ws.wr_close_ = true;
assert(! d.ws.wr_block_); assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d; d.ws.wr_block_ = &d;
@@ -481,13 +499,13 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
return; return;
// teardown // teardown
case 19: case 22:
d.state = 20; d.state = 23;
websocket_helpers::call_async_teardown( websocket_helpers::call_async_teardown(
d.ws.next_layer(), std::move(*this)); d.ws.next_layer(), std::move(*this));
return; return;
case 20: case 23:
// call handler // call handler
d.state = 99; d.state = 99;
ec = error::failed; ec = error::failed;

View File

@@ -56,8 +56,7 @@ public:
} }
void void
operator()( operator()(error_code ec, std::size_t, bool again = true);
error_code ec, std::size_t, bool again = true);
friend friend
void* asio_handler_allocate(std::size_t size, void* asio_handler_allocate(std::size_t size,

View File

@@ -57,9 +57,9 @@ class stream<NextLayer>::write_frame_op
opcode::cont : ws.wr_opcode_; opcode::cont : ws.wr_opcode_;
ws.wr_cont_ = ! fin; ws.wr_cont_ = ! fin;
fh.fin = fin; fh.fin = fin;
fh.rsv1 = 0; fh.rsv1 = false;
fh.rsv2 = 0; fh.rsv2 = false;
fh.rsv3 = 0; fh.rsv3 = false;
fh.len = boost::asio::buffer_size(cb); fh.len = boost::asio::buffer_size(cb);
fh.mask = ws.role_ == role_type::client; fh.mask = ws.role_ == role_type::client;
if(fh.mask) if(fh.mask)
@@ -105,9 +105,7 @@ public:
void operator()() void operator()()
{ {
auto& d = *d_; (*this)(error_code{}, 0, true);
d.cont = false;
(*this)(error_code{}, 0, false);
} }
void operator()(error_code ec, void operator()(error_code ec,
@@ -179,11 +177,17 @@ operator()(
return; return;
} }
assert(! d.ws.wr_close_); assert(! d.ws.wr_close_);
d.state = 2; d.state = 3;
break; break;
// resume // resume
case 1: case 1:
d.state = 2;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 2:
if(d.ws.error_) if(d.ws.error_)
{ {
// call handler // call handler
@@ -191,10 +195,10 @@ operator()(
ec = boost::asio::error::operation_aborted; ec = boost::asio::error::operation_aborted;
break; break;
} }
d.state = 2; d.state = 3;
break; break;
case 2: case 3:
{ {
if(! d.fh.mask) if(! d.fh.mask)
{ {
@@ -215,7 +219,7 @@ operator()(
d.remain -= n; d.remain -= n;
detail::mask_inplace(mb, d.key); detail::mask_inplace(mb, d.key);
// send header and payload // send header and payload
d.state = d.remain > 0 ? 3 : 99; d.state = d.remain > 0 ? 4 : 99;
assert(! d.ws.wr_block_); assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d; d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_, boost::asio::async_write(d.ws.stream_,
@@ -225,7 +229,7 @@ operator()(
} }
// sent masked payload // sent masked payload
case 3: case 4:
{ {
auto const n = auto const n =
detail::clamp(d.remain, d.tmp_size); detail::clamp(d.remain, d.tmp_size);