Frame processing routines are member functions

This commit is contained in:
Vinnie Falco
2016-06-10 09:43:57 -04:00
parent fbc8ddbc81
commit 74f9211366
7 changed files with 322 additions and 307 deletions

View File

@@ -1,6 +1,7 @@
1.0.0-b16
* Make value optional in param-list
* Frame processing routines are member functions
--------------------------------------------------------------------------------

View File

@@ -18,6 +18,7 @@ Core:
* Complete allocator testing in basic_streambuf
WebSocket:
* Move check for message size limit to account for compression
* more invokable unit test coverage
* More control over the HTTP request and response during handshakes
* optimized versions of key/masking, choose prepared_key size

View File

@@ -23,16 +23,6 @@ namespace beast {
namespace websocket {
namespace detail {
/// Identifies the role of a WebSockets stream.
enum class role_type
{
/// Stream is operating as a client.
client,
/// Stream is operating as a server.
server
};
// Contents of a WebSocket frame header
struct frame_header
{
@@ -163,134 +153,6 @@ write(DynamicBuffer& db, frame_header const& fh)
db.prepare(n), buffer(b)));
}
// Read fixed frame header
// Requires at least 2 bytes
//
template<class DynamicBuffer>
std::size_t
read_fh1(frame_header& fh, DynamicBuffer& db,
role_type role, close_code::value& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
std::uint8_t b[2];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
std::size_t need;
fh.len = b[1] & 0x7f;
switch(fh.len)
{
case 126: need = 2; break;
case 127: need = 8; break;
default:
need = 0;
}
fh.mask = (b[1] & 0x80) != 0;
if(fh.mask)
need += 4;
fh.op = static_cast<opcode>(b[0] & 0x0f);
fh.fin = (b[0] & 0x80) != 0;
fh.rsv1 = (b[0] & 0x40) != 0;
fh.rsv2 = (b[0] & 0x20) != 0;
fh.rsv3 = (b[0] & 0x10) != 0;
// invalid length for control message
if(is_control(fh.op) && fh.len > 125)
{
code = close_code::protocol_error;
return 0;
}
// reserved bits not cleared
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
code = close_code::protocol_error;
return 0;
}
// reserved opcode
if(is_reserved(fh.op))
{
code = close_code::protocol_error;
return 0;
}
// fragmented control message
if(is_control(fh.op) && ! fh.fin)
{
code = close_code::protocol_error;
return 0;
}
// unmasked frame from client
if(role == role_type::server && ! fh.mask)
{
code = close_code::protocol_error;
return 0;
}
// masked frame from server
if(role == role_type::client && fh.mask)
{
code = close_code::protocol_error;
return 0;
}
code = close_code::none;
return need;
}
// Decode variable frame header from stream
//
template<class DynamicBuffer>
void
read_fh2(frame_header& fh, DynamicBuffer& db,
role_type role, close_code::value& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
using namespace boost::endian;
switch(fh.len)
{
case 126:
{
std::uint8_t b[2];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.len = big_uint16_to_native(&b[0]);
// length not canonical
if(fh.len < 126)
{
code = close_code::protocol_error;
return;
}
break;
}
case 127:
{
std::uint8_t b[8];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.len = big_uint64_to_native(&b[0]);
// length not canonical
if(fh.len < 65536)
{
code = close_code::protocol_error;
return;
}
break;
}
}
if(fh.mask)
{
std::uint8_t b[4];
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
fh.key = little_uint32_to_native(&b[0]);
}
else
{
// initialize this otherwise operator== breaks
fh.key = 0;
}
code = close_code::none;
}
// Read data from buffers
// This is for ping and pong payloads
//

View File

@@ -51,11 +51,23 @@ clamp(UInt x, std::size_t limit)
using pong_cb = std::function<void(ping_data const&)>;
/// Identifies the role of a WebSockets stream.
enum class role_type
{
/// Stream is operating as a client.
client,
/// Stream is operating as a server.
server
};
//------------------------------------------------------------------------------
struct stream_base
{
protected:
friend class frame_test;
struct op {};
detail::maskgen maskgen_; // source of mask keys
@@ -102,9 +114,13 @@ protected:
void
open(role_type role);
template<class = void>
template<class DynamicBuffer>
std::size_t
read_fh1(DynamicBuffer& db, close_code::value& code);
template<class DynamicBuffer>
void
prepare_fh(close_code::value& code);
read_fh2(DynamicBuffer& db, close_code::value& code);
template<class DynamicBuffer>
void
@@ -115,6 +131,287 @@ protected:
write_ping(DynamicBuffer& db, opcode op, ping_data const& data);
};
template<class _>
void
stream_base::open(role_type role)
{
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
failed_ = false;
rd_need_ = 0;
rd_cont_ = false;
wr_close_ = false;
wr_cont_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
pong_data_ = nullptr; // should be nullptr on close anyway
}
// Read fixed frame header
// Requires at least 2 bytes
//
template<class DynamicBuffer>
std::size_t
stream_base::
read_fh1(DynamicBuffer& db, close_code::value& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
auto const err =
[&](close_code::value cv)
{
code = cv;
return 0;
};
std::uint8_t b[2];
assert(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
std::size_t need;
rd_fh_.len = b[1] & 0x7f;
switch(rd_fh_.len)
{
case 126: need = 2; break;
case 127: need = 8; break;
default:
need = 0;
}
rd_fh_.mask = (b[1] & 0x80) != 0;
if(rd_fh_.mask)
need += 4;
rd_fh_.op = static_cast<opcode>(b[0] & 0x0f);
rd_fh_.fin = (b[0] & 0x80) != 0;
rd_fh_.rsv1 = (b[0] & 0x40) != 0;
rd_fh_.rsv2 = (b[0] & 0x20) != 0;
rd_fh_.rsv3 = (b[0] & 0x10) != 0;
switch(rd_fh_.op)
{
case opcode::binary:
case opcode::text:
if(rd_cont_)
{
// new data frame when continuation expected
return err(close_code::protocol_error);
}
if(rd_fh_.rsv1 || rd_fh_.rsv2 || rd_fh_.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
case opcode::cont:
if(! rd_cont_)
{
// continuation without an active message
return err(close_code::protocol_error);
}
if(rd_fh_.rsv1 || rd_fh_.rsv2 || rd_fh_.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
default:
if(is_reserved(rd_fh_.op))
{
// reserved opcode
return err(close_code::protocol_error);
}
if(! rd_fh_.fin)
{
// fragmented control message
return err(close_code::protocol_error);
}
if(rd_fh_.len > 125)
{
// invalid length for control message
return err(close_code::protocol_error);
}
if(rd_fh_.rsv1 || rd_fh_.rsv2 || rd_fh_.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
}
// unmasked frame from client
if(role_ == role_type::server && ! rd_fh_.mask)
{
code = close_code::protocol_error;
return 0;
}
// masked frame from server
if(role_ == role_type::client && rd_fh_.mask)
{
code = close_code::protocol_error;
return 0;
}
code = close_code::none;
return need;
}
// Decode variable frame header from stream
//
template<class DynamicBuffer>
void
stream_base::
read_fh2(DynamicBuffer& db, close_code::value& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
using namespace boost::endian;
switch(rd_fh_.len)
{
case 126:
{
std::uint8_t b[2];
assert(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
rd_fh_.len = big_uint16_to_native(&b[0]);
// length not canonical
if(rd_fh_.len < 126)
{
code = close_code::protocol_error;
return;
}
break;
}
case 127:
{
std::uint8_t b[8];
assert(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
rd_fh_.len = big_uint64_to_native(&b[0]);
// length not canonical
if(rd_fh_.len < 65536)
{
code = close_code::protocol_error;
return;
}
break;
}
}
if(rd_fh_.mask)
{
std::uint8_t b[4];
assert(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
rd_fh_.key = little_uint32_to_native(&b[0]);
}
else
{
// initialize this otherwise operator== breaks
rd_fh_.key = 0;
}
if(rd_fh_.mask)
prepare_key(rd_key_, rd_fh_.key);
if(! is_control(rd_fh_.op))
{
if(rd_fh_.op != opcode::cont)
{
rd_size_ = rd_fh_.len;
rd_opcode_ = rd_fh_.op;
}
else
{
if(rd_size_ > std::numeric_limits<
std::uint64_t>::max() - rd_fh_.len)
{
code = close_code::too_big;
return;
}
rd_size_ += rd_fh_.len;
}
if(rd_msg_max_ && rd_size_ > rd_msg_max_)
{
code = close_code::too_big;
return;
}
rd_need_ = rd_fh_.len;
rd_cont_ = ! rd_fh_.fin;
}
code = close_code::none;
}
template<class DynamicBuffer>
void
stream_base::write_close(
DynamicBuffer& db, close_reason const& cr)
{
using namespace boost::endian;
frame_header fh;
fh.op = opcode::close;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(cr.code != close_code::none)
{
detail::prepared_key_type key;
if(fh.mask)
detail::prepare_key(key, fh.key);
{
std::uint8_t b[2];
::new(&b[0]) big_uint16_buf_t{
(std::uint16_t)cr.code};
auto d = db.prepare(2);
boost::asio::buffer_copy(d,
boost::asio::buffer(b));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(2);
}
if(! cr.reason.empty())
{
auto d = db.prepare(cr.reason.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffer(
cr.reason.data(), cr.reason.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(cr.reason.size());
}
}
}
template<class DynamicBuffer>
void
stream_base::write_ping(DynamicBuffer& db,
opcode op, ping_data const& data)
{
frame_header fh;
fh.op = op;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = data.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(data.empty())
return;
detail::prepared_key_type key;
if(fh.mask)
detail::prepare_key(key, fh.key);
auto d = db.prepare(data.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffers_1(
data.data(), data.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(data.size());
}
} // detail
} // websocket
} // beast

View File

@@ -243,8 +243,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
{
d.fb.commit(bytes_transferred);
code = close_code::none;
auto const n = detail::read_fh1(
d.ws.rd_fh_, d.fb, d.ws.role_, code);
auto const n = d.ws.read_fh1(d.fb, code);
if(code != close_code::none)
{
// protocol error
@@ -266,10 +265,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
case do_read_fh + 2:
d.fb.commit(bytes_transferred);
code = close_code::none;
detail::read_fh2(d.ws.rd_fh_,
d.fb, d.ws.role_, code);
if(code == close_code::none)
d.ws.prepare_fh(code);
d.ws.read_fh2(d.fb, code);
if(code != close_code::none)
{
// protocol error

View File

@@ -38,142 +38,6 @@
namespace beast {
namespace websocket {
namespace detail {
template<class _>
void
stream_base::open(role_type role)
{
role_ = role;
}
template<class _>
void
stream_base::prepare_fh(close_code::value& code)
{
// continuation without an active message
if(! rd_cont_ && rd_fh_.op == opcode::cont)
{
code = close_code::protocol_error;
return;
}
// new data frame when continuation expected
if(rd_cont_ && ! is_control(rd_fh_.op) &&
rd_fh_.op != opcode::cont)
{
code = close_code::protocol_error;
return;
}
if(rd_fh_.mask)
prepare_key(rd_key_, rd_fh_.key);
if(! is_control(rd_fh_.op))
{
if(rd_fh_.op != opcode::cont)
{
rd_size_ = rd_fh_.len;
rd_opcode_ = rd_fh_.op;
}
else
{
if(rd_size_ > std::numeric_limits<
std::uint64_t>::max() - rd_fh_.len)
{
code = close_code::too_big;
return;
}
rd_size_ += rd_fh_.len;
}
if(rd_msg_max_ && rd_size_ > rd_msg_max_)
{
code = close_code::too_big;
return;
}
rd_need_ = rd_fh_.len;
rd_cont_ = ! rd_fh_.fin;
}
}
template<class DynamicBuffer>
void
stream_base::write_close(
DynamicBuffer& db, close_reason const& cr)
{
using namespace boost::endian;
frame_header fh;
fh.op = opcode::close;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(cr.code != close_code::none)
{
detail::prepared_key_type key;
if(fh.mask)
detail::prepare_key(key, fh.key);
{
std::uint8_t b[2];
::new(&b[0]) big_uint16_buf_t{
(std::uint16_t)cr.code};
auto d = db.prepare(2);
boost::asio::buffer_copy(d,
boost::asio::buffer(b));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(2);
}
if(! cr.reason.empty())
{
auto d = db.prepare(cr.reason.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffer(
cr.reason.data(), cr.reason.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(cr.reason.size());
}
}
}
template<class DynamicBuffer>
void
stream_base::write_ping(DynamicBuffer& db,
opcode op, ping_data const& data)
{
frame_header fh;
fh.op = op;
fh.fin = true;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = data.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(db, fh);
if(data.empty())
return;
detail::prepared_key_type key;
if(fh.mask)
detail::prepare_key(key, fh.key);
auto d = db.prepare(data.size());
boost::asio::buffer_copy(d,
boost::asio::const_buffers_1(
data.data(), data.size()));
if(fh.mask)
detail::mask_inplace(d, key);
db.commit(data.size());
}
} // detail
//------------------------------------------------------------------------------
template<class NextLayer>
template<class... Args>
stream<NextLayer>::
@@ -1037,8 +901,7 @@ do_read_fh(detail::frame_streambuf& fb,
stream_, fb.prepare(2), ec));
if(ec)
return;
auto const n = detail::read_fh1(
rd_fh_, fb, role_, code);
auto const n = read_fh1(fb, code);
if(code != close_code::none)
return;
if(n > 0)
@@ -1048,11 +911,7 @@ do_read_fh(detail::frame_streambuf& fb,
if(ec)
return;
}
detail::read_fh2(
rd_fh_, fb, role_, code);
if(code != close_code::none)
return;
prepare_fh(code);
read_fh2(fb, code);
}
} // websocket

View File

@@ -6,6 +6,7 @@
//
#include <beast/websocket/detail/frame.hpp>
#include <beast/websocket/detail/stream_base.hpp>
#include <beast/unit_test/suite.hpp>
#include <initializer_list>
#include <climits>
@@ -76,20 +77,20 @@ public:
{
fh_streambuf sb;
write(sb, fh);
frame_header fh1;
close_code::value code;
auto const n = read_fh1(
fh1, sb, role, code);
stream_base stream;
stream.open(role);
auto const n = stream.read_fh1(sb, code);
if(! BEAST_EXPECT(! code))
return;
if(! BEAST_EXPECT(sb.size() == n))
return;
read_fh2(fh1, sb, role, code);
stream.read_fh2(sb, code);
if(! BEAST_EXPECT(! code))
return;
if(! BEAST_EXPECT(sb.size() == 0))
return;
BEAST_EXPECT(fh1 == fh);
BEAST_EXPECT(stream.rd_fh_ == fh);
};
test_fh fh;
@@ -113,7 +114,7 @@ public:
fh.len = 65536;
check(fh);
fh.len = std::numeric_limits<std::uint64_t>::max();
fh.len = 65537;
check(fh);
}
@@ -126,10 +127,10 @@ public:
{
fh_streambuf sb;
write(sb, fh);
frame_header fh1;
close_code::value code;
auto const n = read_fh1(
fh1, sb, role, code);
stream_base stream;
stream.open(role);
auto const n = stream.read_fh1(sb, code);
if(code)
{
pass();
@@ -137,7 +138,7 @@ public:
}
if(! BEAST_EXPECT(sb.size() == n))
return;
read_fh2(fh1, sb, role, code);
stream.read_fh2(sb, code);
if(! BEAST_EXPECT(code))
return;
if(! BEAST_EXPECT(sb.size() == 0))
@@ -186,15 +187,14 @@ public:
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
static role_type constexpr role =
role_type::client;
static role_type constexpr role = role_type::client;
std::vector<std::uint8_t> v{bs};
fh_streambuf sb;
sb.commit(buffer_copy(
sb.prepare(v.size()), buffer(v)));
frame_header fh;
sb.commit(buffer_copy(sb.prepare(v.size()), buffer(v)));
stream_base stream;
stream.open(role);
close_code::value code;
auto const n = read_fh1(fh, sb, role, code);
auto const n = stream.read_fh1(sb, code);
if(code)
{
pass();
@@ -202,7 +202,7 @@ public:
}
if(! BEAST_EXPECT(sb.size() == n))
return;
read_fh2(fh, sb, role, code);
stream.read_fh2(sb, code);
if(! BEAST_EXPECT(code))
return;
if(! BEAST_EXPECT(sb.size() == 0))
@@ -233,4 +233,3 @@ BEAST_DEFINE_TESTSUITE(frame,websocket,beast);
} // detail
} // websocket
} // beast