Refactor stream open state variable

This commit is contained in:
Vinnie Falco
2017-08-26 14:58:20 -07:00
parent 526ecc5246
commit 4c335a64cf
8 changed files with 96 additions and 89 deletions

View File

@@ -1,3 +1,9 @@
Version 110:
* Refactor stream open state variable
--------------------------------------------------------------------------------
Version 109:
* refactor test::stream

View File

@@ -134,7 +134,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
d.ws.wr_block_ = d.tok;
// Make sure the stream is open
if(d.ws.failed_)
if(! d.ws.open_)
{
BOOST_ASIO_CORO_YIELD
d.ws.get_io_service().post(
@@ -160,7 +160,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
// Make sure the stream is open
if(d.ws.failed_)
if(! d.ws.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
@@ -175,7 +175,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
d.fb.data(), std::move(*this));
if(ec)
{
d.ws.failed_ = true;
d.ws.open_ = false;
goto upcall;
}
@@ -214,7 +214,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
BOOST_ASSERT(d.ws.rd_block_ == d.tok);
// Handle the stream closing while suspended
if(d.ws.failed_)
if(! d.ws.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
@@ -242,7 +242,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
std::move(*this));
if(ec)
{
d.ws.failed_ = true;
d.ws.open_ = false;
goto upcall;
}
d.ws.rd_.buf.commit(bytes_transferred);
@@ -285,7 +285,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
std::move(*this));
if(ec)
{
d.ws.failed_ = true;
d.ws.open_ = false;
goto upcall;
}
d.ws.rd_.buf.commit(bytes_transferred);
@@ -312,7 +312,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
}
if(! ec)
ec = d.ev;
d.ws.failed_ = true;
d.ws.open_ = false;
upcall:
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
@@ -354,7 +354,7 @@ close(close_reason const& cr, error_code& ec)
"SyncStream requirements not met");
using beast::detail::clamp;
// Make sure the stream is open
if(failed_)
if(! open_)
{
ec = boost::asio::error::operation_aborted;
return;
@@ -368,8 +368,8 @@ close(close_reason const& cr, error_code& ec)
write_close<flat_static_buffer_base>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
}
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
// Drain the connection
close_code code{};
@@ -387,8 +387,8 @@ close(close_reason const& cr, error_code& ec)
stream_.read_some(
rd_.buf.prepare(read_size(rd_.buf,
rd_.buf.max_size())), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
rd_.buf.commit(bytes_transferred);
}
@@ -425,8 +425,8 @@ close(close_reason const& cr, error_code& ec)
stream_.read_some(
rd_.buf.prepare(read_size(rd_.buf,
rd_.buf.max_size())), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
rd_.buf.commit(bytes_transferred);
}

View File

@@ -136,7 +136,7 @@ operator()(error_code ec, std::size_t)
d.ws.wr_block_ = d.tok;
// Make sure the stream is open
if(d.ws.failed_)
if(! d.ws.open_)
{
BOOST_ASIO_CORO_YIELD
d.ws.get_io_service().post(
@@ -162,7 +162,7 @@ operator()(error_code ec, std::size_t)
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
// Make sure the stream is open
if(d.ws.failed_)
if(! d.ws.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
@@ -180,8 +180,8 @@ operator()(error_code ec, std::size_t)
d.ws.stream_, d.fb.data(),
std::move(*this));
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.failed_ = !!ec;
if(d.ws.failed_)
d.ws.open_ = ! ec;
if(! d.ws.open_)
goto upcall;
}
// Teardown
@@ -199,7 +199,7 @@ operator()(error_code ec, std::size_t)
}
if(! ec)
ec = d.ev;
d.ws.failed_ = true;
d.ws.open_ = false;
upcall:
if(d.ws.wr_block_ == d.tok)
d.ws.wr_block_.reset();
@@ -227,8 +227,8 @@ do_fail(
write_close<
flat_static_buffer_base>(fb, code);
boost::asio::write(stream_, fb.data(), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
}
using beast::websocket::teardown;
@@ -239,11 +239,11 @@ do_fail(
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
ec = ev;
failed_ = true;
open_ = false;
}
/* _Fail the WebSocket Connection_

View File

@@ -131,7 +131,7 @@ operator()(error_code ec, std::size_t)
d.ws.wr_block_ = d.tok;
// Make sure the stream is open
if(d.ws.failed_)
if(! d.ws.open_)
{
BOOST_ASIO_CORO_YIELD
d.ws.get_io_service().post(
@@ -157,7 +157,7 @@ operator()(error_code ec, std::size_t)
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
// Make sure the stream is open
if(d.ws.failed_)
if(! d.ws.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
@@ -169,7 +169,7 @@ operator()(error_code ec, std::size_t)
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
if(ec)
d.ws.failed_ = true;
d.ws.open_ = false;
upcall:
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
@@ -200,7 +200,7 @@ stream<NextLayer>::
ping(ping_data const& payload, error_code& ec)
{
// Make sure the stream is open
if(failed_)
if(! open_)
{
ec = boost::asio::error::operation_aborted;
return;
@@ -228,7 +228,7 @@ stream<NextLayer>::
pong(ping_data const& payload, error_code& ec)
{
// Make sure the stream is open
if(failed_)
if(! open_)
{
ec = boost::asio::error::operation_aborted;
return;

View File

@@ -140,7 +140,7 @@ operator()(
ws_.rd_block_ = tok_;
// Make sure the stream is open
if(ws_.failed_)
if(! ws_.open_)
{
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(
@@ -167,7 +167,7 @@ operator()(
dispatched_ = true;
// Handle the stream closing while suspended
if(ws_.failed_)
if(! ws_.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
@@ -197,8 +197,8 @@ operator()(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
ws_.rd_.buf.commit(bytes_transferred);
}
@@ -261,7 +261,7 @@ operator()(
dispatched_ = true;
// Make sure the stream is open
if(ws_.failed_)
if(! ws_.open_)
{
ws_.wr_block_.reset();
ec = boost::asio::error::operation_aborted;
@@ -284,8 +284,8 @@ operator()(
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
ws_.wr_block_.reset();
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
goto loop;
}
@@ -365,8 +365,8 @@ operator()(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
@@ -408,8 +408,8 @@ operator()(
ws_.stream_.async_read_some(buffer_prefix(
clamp(ws_.rd_.remain), cb_), std::move(*this));
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
@@ -453,8 +453,8 @@ operator()(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
ws_.rd_.buf.commit(bytes_transferred);
@@ -497,8 +497,8 @@ operator()(
zs.avail_in = sizeof(empty_block);
ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(! ec);
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
break;
// VFALCO See:
// https://github.com/madler/zlib/issues/280
@@ -521,8 +521,8 @@ operator()(
}
ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(ec != zlib::error::end_of_stream);
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
break;
if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
ws_.rd_.size, zs.total_out, ws_.rd_msg_max_))
@@ -893,7 +893,7 @@ read_some(
close_code code{};
std::size_t bytes_written = 0;
// Make sure the stream is open
if(failed_)
if(! open_)
{
ec = boost::asio::error::operation_aborted;
return 0;
@@ -919,8 +919,8 @@ loop:
rd_.buf.prepare(read_size(
rd_.buf, rd_.buf.max_size())),
ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
rd_.buf.commit(bytes_transferred);
}
@@ -959,8 +959,8 @@ loop:
write_ping<flat_static_buffer_base>(fb,
detail::opcode::pong, payload);
boost::asio::write(stream_, fb.data(), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
goto loop;
}
@@ -1024,8 +1024,8 @@ loop:
rd_.buf.commit(stream_.read_some(
rd_.buf.prepare(read_size(rd_.buf,
rd_.buf.max_size())), ec));
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
if(rd_.fh.mask)
detail::mask_inplace(
@@ -1068,8 +1068,8 @@ loop:
auto const bytes_transferred =
stream_.read_some(buffer_prefix(
clamp(rd_.remain), buffers), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
@@ -1131,8 +1131,8 @@ loop:
rd_.buf.prepare(read_size(
rd_.buf, rd_.buf.max_size())),
ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
rd_.buf.commit(bytes_transferred);
@@ -1162,8 +1162,8 @@ loop:
zs.avail_in = sizeof(empty_block);
pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(! ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
// VFALCO See:
// https://github.com/madler/zlib/issues/280
@@ -1186,8 +1186,8 @@ loop:
}
pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(ec != zlib::error::end_of_stream);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return bytes_written;
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_.size, zs.total_out, rd_msg_max_))

View File

@@ -134,7 +134,7 @@ open(role_type role)
{
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
failed_ = false;
open_ = true;
rd_.remain = 0;
rd_.cont = false;
rd_.done = true;
@@ -194,7 +194,8 @@ void
stream<NextLayer>::
reset()
{
failed_ = false;
BOOST_ASSERT(! open_);
open_ = false; // VFALCO is this needed?
rd_.remain = 0;
rd_.cont = false;
rd_.done = true;

View File

@@ -211,7 +211,7 @@ operator()(error_code ec,
ws_.wr_block_ = tok_;
// Make sure the stream is open
if(ws_.failed_)
if(! ws_.open_)
{
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(
@@ -237,7 +237,7 @@ operator()(error_code ec,
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(ws_.failed_)
if(! ws_.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
@@ -262,7 +262,7 @@ operator()(error_code ec,
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
ws_.failed_ = true;
ws_.open_ = false;
goto upcall;
}
@@ -290,7 +290,7 @@ operator()(error_code ec,
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
{
ws_.failed_ = true;
ws_.open_ = false;
goto upcall;
}
if(remain_ == 0)
@@ -343,7 +343,7 @@ operator()(error_code ec,
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
{
ws_.failed_ = true;
ws_.open_ = false;
goto upcall;
}
while(remain_ > 0)
@@ -364,7 +364,7 @@ operator()(error_code ec,
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
{
ws_.failed_ = true;
ws_.open_ = false;
goto upcall;
}
}
@@ -401,7 +401,7 @@ operator()(error_code ec,
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
{
ws_.failed_ = true;
ws_.open_ = false;
goto upcall;
}
if(remain_ == 0)
@@ -435,8 +435,8 @@ operator()(error_code ec,
ws_.wr_.buf_size);
more_ = detail::deflate(
ws_.pmd_->zo, b, cb_, fin_, ec);
ws_.failed_ = !!ec;
if(ws_.failed_)
ws_.open_ = ! ec;
if(! ws_.open_)
{
// Always dispatching is easiest
BOOST_ASIO_CORO_YIELD
@@ -484,7 +484,7 @@ operator()(error_code ec,
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
{
ws_.failed_ = true;
ws_.open_ = false;
goto upcall;
}
if(more_)
@@ -567,7 +567,7 @@ write_some(bool fin,
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
// Make sure the stream is open
if(failed_)
if(! open_)
{
ec = boost::asio::error::operation_aborted;
return;
@@ -598,8 +598,8 @@ write_some(bool fin,
wr_.buf.get(), wr_.buf_size);
auto const more = detail::deflate(
pmd_->zo, b, cb, fin, ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
auto const n = buffer_size(b);
if(n == 0)
@@ -627,8 +627,8 @@ write_some(bool fin,
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
if(! more)
break;
@@ -656,8 +656,8 @@ write_some(bool fin,
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), buffers), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
}
else
@@ -679,8 +679,8 @@ write_some(bool fin,
boost::asio::write(stream_,
buffer_cat(fh_buf.data(),
buffer_prefix(n, cb)), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
if(remain == 0)
break;
@@ -713,8 +713,8 @@ write_some(bool fin,
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
}
while(remain > 0)
@@ -726,8 +726,8 @@ write_some(bool fin,
remain -= n;
detail::mask_inplace(b, key);
boost::asio::write(stream_, b, ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
}
return;
@@ -755,8 +755,8 @@ write_some(bool fin,
flat_static_buffer_base>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
failed_ = !!ec;
if(failed_)
open_ = ! ec;
if(! open_)
return;
if(remain == 0)
break;

View File

@@ -228,7 +228,7 @@ class stream
detail::opcode::text; // outgoing message type
control_cb_type ctrl_cb_; // control callback
role_type role_; // server or client
bool failed_ = true; // the connection failed
bool open_ = false; // `true` if established
bool rd_close_; // read close frame
bool wr_close_; // sent close frame