WebSocket ping, fixes, coverage:

* Improve test coverage
* tests for invokable in composed ops

* Update documentation
* Add License badge to README
* Target Windows 7 SDK and later
* Make role_type private
* Remove extra unused masking functions
* Allow stream reuse / reconnect after failure
* Restructure logic of composed operations
* Allow 0 for read_message_max meaning no limit
* Respect keep alive when building HTTP responses
* Check version in upgrade request
* Response with 426 status on unsupported WebSocket version
* Remove unnecessary Sec-WebSocket-Key in HTTP responses
* Rename to mask_buffer_size

* Remove maybe_throw
* Add ping, async_ping, async_on_pong
* Add ping_op
* Add pong_op
* Fix crash in accept_op
* Fix suspend in close_op
* Fix read_frame_op logic
* Fix crash in read_op
* Fix races in echo sync and async echo servers
This commit is contained in:
Vinnie Falco
2016-05-15 16:22:25 -04:00
parent bfb840fe8e
commit 039244cda4
40 changed files with 2757 additions and 1365 deletions

View File

@@ -106,7 +106,7 @@ project beast
<os>SOLARIS:<define>__EXTENSIONS__
<os>SOLARIS:<library>socket
<os>SOLARIS:<library>nsl
<os>NT:<define>_WIN32_WINNT=0x0501
<os>NT:<define>_WIN32_WINNT=0x0601
<os>NT,<toolset>cw:<library>ws2_32
<os>NT,<toolset>cw:<library>mswsock
<os>NT,<toolset>gcc:<library>ws2_32

View File

@@ -2,7 +2,8 @@
[![Build Status](https://travis-ci.org/vinniefalco/Beast.svg?branch=master)](https://travis-ci.org/vinniefalco/Beast) [![codecov]
(https://codecov.io/gh/vinniefalco/Beast/branch/master/graph/badge.svg)](https://codecov.io/gh/vinniefalco/Beast) [![Documentation]
(https://img.shields.io/badge/documentation-master-brightgreen.svg)](http://vinniefalco.github.io/beast/)
(https://img.shields.io/badge/documentation-master-brightgreen.svg)](http://vinniefalco.github.io/beast/) [![License]
(https://img.shields.io/badge/license-boost-brightgreen.svg)](LICENSE_1_0.txt)
Beast provides implementations of the HTTP and WebSocket protocols
built on top of Boost.Asio and other parts of boost.

View File

@@ -36,6 +36,9 @@ WebSocket:
* Replace stream::error_ with stream::state_, example states: ok, error, abort_io
Need a cancel state so waking up a ping stored in invokable knows to call the
final handler with operation_aborted
* Use close_code::no_code instead of close_code::none
* Make request_type, response_type public APIs,
use in stream member function signatures
HTTP:
* Define Parser concept in HTTP

View File

@@ -22,7 +22,6 @@
[template mdash[] '''&mdash; ''']
[template indexterm1[term1] '''<indexterm><primary>'''[term1]'''</primary></indexterm>''']
[template indexterm2[term1 term2] '''<indexterm><primary>'''[term1]'''</primary><secondary>'''[term2]'''</secondary></indexterm>''']
[template ticket[number]'''<ulink url="https://svn.boost.org/trac/boost/ticket/'''[number]'''">'''#[number]'''</ulink>''']
[def __POSIX__ /POSIX/]
[def __Windows__ /Windows/]
[def __accept__ [@http://www.opengroup.org/onlinepubs/000095399/functions/accept.html `accept()`]]

View File

@@ -73,16 +73,20 @@
<bridgehead renderas="sect3">Classes</bridgehead>
<simplelist type="vert" columns="1">
<member><link linkend="beast.ref.websocket__close_reason">close_reason</link></member>
<member><link linkend="beast.ref.websocket__ping_data">ping_data</link></member>
<member><link linkend="beast.ref.websocket__stream">stream</link></member>
<member><link linkend="beast.ref.websocket__reason_string">reason_string</link></member>
</simplelist>
<bridgehead renderas="sect3">Options</bridgehead>
<simplelist type="vert" columns="1">
<member><link linkend="beast.ref.websocket__auto_fragment_size">auto_fragment_size</link></member>
<member><link linkend="beast.ref.websocket__decorate">decorate</link></member>
<member><link linkend="beast.ref.websocket__keep_alive">keep_alive</link></member>
<member><link linkend="beast.ref.websocket__mask_buffer_size">mask_buffer_size</link></member>
<member><link linkend="beast.ref.websocket__message_type">message_type</link></member>
<member><link linkend="beast.ref.websocket__pong_callback">pong_callback</link></member>
<member><link linkend="beast.ref.websocket__read_buffer_size">read_buffer_size</link></member>
<member><link linkend="beast.ref.websocket__read_message_max">read_message_max</link></member>
<member><link linkend="beast.ref.websocket__write_buffer_size">write_buffer_size</link></member>
</simplelist>
</entry>
<entry valign="top">
@@ -94,6 +98,7 @@
<bridgehead renderas="sect3">Constants</bridgehead>
<simplelist type="vert" columns="1">
<member><link linkend="beast.ref.websocket__close_code">close_code</link></member>
<member><link linkend="beast.ref.websocket__error">error</link></member>
<member><link linkend="beast.ref.websocket__opcode">opcode</link></member>
</simplelist>
</entry>

View File

@@ -9,7 +9,7 @@
A `BufferSequence` is a type meeting either of the following requirements:
* [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/ConstBufferSequence.html [*`ConstBufferSequence`]]
* [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/MutableBufferSequence.html [*`MutableBufferSequence`]]
* [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/ConstBufferSequence.html [*`ConstBufferSequence`]]
* [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/MutableBufferSequence.html [*`MutableBufferSequence`]]
[endsect]

View File

@@ -17,7 +17,7 @@ In this table:
* `a` denotes a value of type `X`.
* `b` is a value meeting the requirements of [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/ConvertibleToConstBuffer.html [*`ConvertibleToConstBuffer`]].
* `b` is a value meeting the requirements of [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/ConvertibleToConstBuffer.html [*`ConvertibleToConstBuffer`]].
* `ec` is a value of type [link beast.ref.error_code `error_code&`].

View File

@@ -40,12 +40,12 @@ In the table below:
[
[`X::const_buffers_type`]
[`T`]
[`T` meets the requirements for [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/ConstBufferSequence.html `ConstBufferSequence`].]
[`T` meets the requirements for [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/ConstBufferSequence.html `ConstBufferSequence`].]
]
[
[`X::mutable_buffers_type`]
[`U`]
[`U` meets the requirements for [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/MutableBufferSequence.html `MutableBufferSequence`].]
[`U` meets the requirements for [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/MutableBufferSequence.html `MutableBufferSequence`].]
]
[
[`a.commit(n)`]

View File

@@ -14,21 +14,21 @@ asynchronous I/O. They are based on concepts from `boost::asio`.
A type modeling [*`Stream`] meets either or both of the following requirements:
* [link beast.types.streams.AsyncStream [*`AsyncStream`]]
* [link beast.types.streams.SyncStream [*`SyncStream`]]
* [*`AsyncStream`]
* [*`SyncStream`]
[heading:AsyncStream AsyncStream]
A type modeling [*`AsyncStream`] meets the following requirements:
* [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/AsyncReadStream.html [*`AsyncReadStream`]]
* [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/AsyncWriteStream.html [*`AsyncWriteStream`]]
* [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/AsyncReadStream.html [*`AsyncReadStream`]]
* [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/AsyncWriteStream.html [*`AsyncWriteStream`]]
[heading:SyncStream SyncStream]
A type modeling [*`SyncStream`] meets the following requirements:
* [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/SyncReadStream.html [*`SyncReadStream`]]
* [@http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/SyncWriteStream.html [*`SyncWriteStream`]]
* [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/SyncReadStream.html [*`SyncReadStream`]]
* [@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/SyncWriteStream.html [*`SyncWriteStream`]]
[endsect]

View File

@@ -46,13 +46,15 @@ model, handler allocation, and handler invocation hooks. Calls to
Beast.WebSocket asynchronous initiation functions allow callers the choice
of using a completion handler, stackful or stackless coroutines, futures,
or user defined customizations (for example, Boost.Fiber). The
implementation uses handler invocation hooks (`asio_handler_invoke`),
implementation uses handler invocation hooks
([@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/asio_handler_invoke.html `asio_handler_invoke`]),
providing execution guarantees on composed operations in a manner
identical to Boost.Asio. The implementation also uses handler allocation
hooks (`asio_handler_allocate`) when allocating memory internally for
composed operations.
identical to Boost.Asio. The implementation also uses handler allocation hooks
([@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/asio_handler_allocate.html `asio_handler_allocate`])
when allocating memory internally for composed operations.
There is no need for inheritance or virtual members in `websocket::stream`.
There is no need for inheritance or virtual members in a
[link beast.ref.websocket__stream `beast::websocket::stream`].
All operations are templated and transparent to the compiler, allowing for
maximum inlining and optimization.
@@ -67,43 +69,47 @@ both Boost.Asio and the WebSocket protocol specification described in
[section:creating Creating the socket]
The interface to Beast's WebSocket implementation is a single template
class [link beast.ref.websocket__stream `websocket::stream`] which wraps a
"next layer" object. The next layer object must meet the requirements of
`SyncReadStream` and `SyncWriteStream` if synchronous operations are performed,
`AsyncReadStream` and `AsyncWriteStream` is asynchronous operations are
performed, or both. Arguments supplied during construction are passed to
next layer's constructor. Here we declare two websockets which have ownership
of the next layer:
class [link beast.ref.websocket__stream `beast::websocket::stream`] which
wraps a "next layer" object. The next layer object must meet the requirements
of [link beast.types.streams.SyncStream [*`SyncReadStream`]] if synchronous
operations are performed, or
[link beast.types.streams.AsyncStream [*`AsyncStream`]] if asynchronous
operations are performed, or both. Arguments supplied during construction are
passed to next layer's constructor. Here we declare two websockets which have
ownership of the next layer:
```
io_service ios;
websocket::stream<ip::tcp::socket> ws(ios);
boost::asio::io_service ios;
beast::websocket::stream<boost::asio::ip::tcp::socket> ws(ios);
ssl::context ctx(ssl::context::sslv23);
websocket::stream<ssl::stream<ip::tcp::socket>> wss(ios, ctx);
boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
beast::websocket::stream<
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> wss(ios, ctx);
```
For servers that can handshake in multiple protocols, it may be desired
to wrap an object that already exists. This socket can be moved in:
```
tcp::socket&& sock;
boost::asio::ip::tcp::socket&& sock;
...
websocket::stream<ip::tcp::socket> ws(std::move(sock));
beast::websocket::stream<
boost::asio::ip::tcp::socket> ws(std::move(sock));
```
Or, the wrapper can be constructed with a non-owning reference. In
this case, the caller is responsible for managing the lifetime of the
underlying socket being wrapped:
```
tcp::socket sock;
boost::asio::ip::tcp::socket sock;
...
websocket::stream<ip::tcp::socket&> ws(sock);
beast::websocket::stream<boost::asio::ip::tcp::socket&> ws(sock);
```
The layer being wrapped can be accessed through the websocket's "next layer",
permitting callers to interact directly with its interface.
```
ssl::context ctx(ssl::context::sslv23);
websocket::stream<ssl::stream<ip::tcp::socket>> ws(ios, ctx);
boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
beast::websocket::stream<
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> ws(ios, ctx);
...
ws.next_layer().shutdown(); // ssl::stream shutdown
```
@@ -122,17 +128,18 @@ Connections are established by using the interfaces which already exist
for the next layer. For example, making an outgoing connection:
```
std::string const host = "mywebapp.com";
io_service ios;
tcp::resolver r(ios);
websocket::stream<ip::tcp::socket> ws(ios);
connect(ws.next_layer(), r.resolve(tcp::resolver::query{host, "ws"}));
boost::asio::io_service ios;
boost::asio::ip::tcp::resolver r(ios);
beast::websocket::stream<boost::asio::ip::tcp::socket> ws(ios);
boost::asio::connect(ws.next_layer(),
r.resolve(boost::asio::ip::tcp::resolver::query{host, "ws"}));
```
Accepting an incoming connection:
```
void do_accept(tcp::acceptor& acceptor)
void do_accept(boost::asio::ip::tcp::acceptor& acceptor)
{
websocket::stream<ip::tcp::socket> ws(acceptor.get_io_service());
beast::websocket::stream<boost::asio::ip::tcp::socket> ws(acceptor.get_io_service());
acceptor.accept(ws.next_layer());
}
```
@@ -149,26 +156,26 @@ A WebSocket session begins when one side sends the HTTP Upgrade request
for websocket, and the other side sends an appropriate HTTP response
indicating that the request was accepted and that the connection has
been upgraded. The HTTP Upgrade request must include the Host HTTP field,
and the URI of the resource to request. `hanshake` is used to send the
and the URI of the resource to request. `handshake` is used to send the
request with the required host and resource strings.
```
websocket::stream<ip::tcp::socket> ws(ios);
beast::websocket::stream<boost::asio::ip::tcp::socket> ws(ios);
...
ws.set_option(websocket::keep_alive(true));
ws.handshake("ws.mywebapp.com:80", "/cgi-bin/bitcoin-prices");
ws.set_option(beast::websocket::keep_alive(true));
ws.handshake("ws.example.com:80", "/cgi-bin/bitcoin-prices");
```
The `websocket::stream` automatically handles receiving and processing
the HTTP response to the handshake request. The call to handshake is
successful if a HTTP response is received with the 101 "Switching Protocols"
status code. On failure, an error is returned or an exception is thrown.
Depending on the keep alive setting, the socket may remain open for a
subsequent handshake attempt
The [link beast.ref.websocket__stream `beast::websocket::stream`] automatically
handles receiving and processing the HTTP response to the handshake request.
The call to handshake is successful if a HTTP response is received with the
101 "Switching Protocols" status code. On failure, an error is returned or an
exception is thrown. Depending on the keep alive setting, the socket may remain
open for a subsequent handshake attempt
Performing a handshake for an incoming websocket upgrade request operates
similarly. If the handshake fails, an error is returned or exception thrown:
```
websocket::stream<ip::tcp::socket> ws(ios);
beast::websocket::stream<boost::asio::ip::tcp::socket> ws(ios);
...
ws.accept();
```
@@ -178,12 +185,12 @@ on the connection, or might have already received an entire HTTP request
containing the upgrade request. Overloads of `accept` allow callers to
pass in this additional buffered handshake data.
```
void do_accept(tcp::socket& sock)
void do_accept(boost::asio::ip::tcp::socket& sock)
{
boost::asio::streambuf sb;
read_until(sock, sb, "\r\n\r\n");
boost::asio::read_until(sock, sb, "\r\n\r\n");
...
websocket::stream<ip::tcp::socket&> ws(sock);
beast::websocket::stream<boost::asio::ip::tcp::socket&> ws(sock);
ws.accept(sb.data());
...
}
@@ -192,12 +199,12 @@ void do_accept(tcp::socket& sock)
Alternatively, the caller can pass an entire HTTP request if it was
obtained elsewhere:
```
void do_accept(tcp::socket& sock)
void do_accept(boost::asio::ip::tcp::socket& sock)
{
boost::asio::streambuf sb;
http::request<http::empty_body> request;
http::read(sock, request);
if(http::is_upgrade(request))
beast::http::request<http::empty_body> request;
beast::http::read(sock, request);
if(beast::http::is_upgrade(request))
{
websocket::stream<ip::tcp::socket&> ws(sock);
ws.accept(request);
@@ -206,8 +213,6 @@ void do_accept(tcp::socket& sock)
}
```
[note Identifiers in the `http` namespace are part of Beast.HTTP. ]
[endsect]
@@ -218,20 +223,21 @@ After the WebSocket handshake is accomplished, callers may send and receive
messages using the message oriented interface. This interface requires that
all of the buffers representing the message are known ahead of time:
```
void echo(websocket::stream<ip::tcp::socket>& ws)
void echo(beast::websocket::stream<boost::asio::ip::tcp::socket>& ws)
{
streambuf sb;
websocket::opcode::value op;
beast::streambuf sb;
beast::websocket::opcode::value op;
ws.read(sb);
ws.set_option(websocket::message_type(op));
websocket::write(ws, sb.data());
ws.set_option(beast::websocket::message_type(op));
ws.write(sb.data());
sb.consume(sb.size());
}
```
[important Calls to `set_option` must be made from the same implicit
or explicit strand as that used to perform other operations. ]
[important Calls to [link beast.ref.websocket__stream.set_option `set_option`]
must be made from the same implicit or explicit strand as that used to perform
other operations. ]
[endsect]
@@ -249,18 +255,19 @@ message ahead of time:
For these cases, the frame oriented interface may be used. This
example reads and echoes a complete message using this interface:
```
void echo(websocket::stream<ip::tcp::socket>& ws)
void echo(beast::websocket::stream<boost::asio::ip::tcp::socket>& ws)
{
streambuf sb;
websocket::frame_info fi;
beast::streambuf sb;
beast::websocket::frame_info fi;
for(;;)
{
ws.read_frame(fi, sb);
if(fi.fin)
break;
}
ws.set_option(websocket::message_type(fi.op));
consuming_buffers<streambuf::const_buffers_type> cb(sb.data());
ws.set_option(beast::websocket::message_type(fi.op));
beast::consuming_buffers<
beast::streambuf::const_buffers_type> cb(sb.data());
for(;;)
{
using boost::asio::buffer_size;
@@ -287,10 +294,11 @@ void echo(websocket::stream<ip::tcp::socket>& ws)
During read operations, the implementation automatically reads and processes
WebSocket control frames such as ping, pong, and close. Pings are replied
to as soon as possible, pongs are noted. The receipt of a close frame
initiates the WebSocket close procedure, eventually resulting in the
error code `websocket::error::closed` being delivered to the caller in
a subsequent read operation, assuming no other error takes place.
to as soon as possible, pongs are delivered to the pong callback. The receipt
of a close frame initiates the WebSocket close procedure, eventually resulting
in the error code [link beast.ref.websocket__error `error::closed`] being
delivered to the caller in a subsequent read operation, assuming no other error
takes place.
To ensure timely delivery of control frames, large messages are broken up
into smaller sized frames. The implementation chooses the size and number
@@ -298,19 +306,49 @@ of the frames making up the message. The automatic fragment size option
gives callers control over the size of these frames:
```
...
ws.set_option(websocket::auto_fragment_size(8192));
ws.set_option(beast::websocket::auto_fragment_size(8192));
```
The WebSocket protocol defines a procedure and control message for initiating
a close of the session. Handling of close initiated by the remote end of the
connection is performed automatically. To manually initiate a close, use
`websocket::stream::close`:
[link beast.ref.websocket__stream.close `close`]:
```
ws.close();
```
[note To receive the `websocket::error::closed` error, a read operation
is required. ]
[note To receive the [link beast.ref.websocket__error `error::closed`]
error, a read operation is required. ]
[endsect]
[section:pongs Pong messages]
To receive pong control frames, callers may register a "pong callback" using
[link beast.ref.websocket__stream.set_option `set_option`]:
the following signature:
```
void on_pong(ping_data const& payload);
...
ws.set_option(pong_callback{&on_pong});
```
When a pong callback is registered, any pongs received through either
synchronous read functions or asynchronous read functions will invoke the
pong callback, passing the payload in the pong message as the argument.
Unlike regular completion handlers used in calls to asynchronous initiation
functions, the pong callback only needs to be set once. The callback is not
reset when a pong is received. The same callback is used for both synchronous
and asynchronous reads. The pong callback is passive; in order to receive
pongs, a synchronous or asynchronous stream read function must be active.
[note When an asynchronous read function receives a pong, the the pong callback
is invoked in the same manner as that used to invoke the final completion
handler of the corresponding read function.]
[endsect]
@@ -320,7 +358,8 @@ is required. ]
Because calls to read data may return a variable amount of bytes, the
interface to calls that read data require an object that meets the requirements
of `Streambuf`. This concept is modeled on `boost::asio::basic_streambuf`.
of [link beast.types.Streambuf [*`Streambuf`]]. This concept is modeled on
[@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/basic_streambuf.html `boost::asio::basic_streambuf`].
The implementation does not perform queueing or buffering of messages. If
desired, these features should be provided by callers. The impact of this
@@ -331,6 +370,7 @@ of the underlying TCP/IP connection.
[endsect]
[section:async Asynchronous interface]
Asynchronous versions are available for all functions:
@@ -361,12 +401,14 @@ void echo(websocket::stream<ip::tcp::socket>& ws,
[section:io_service io_service]
[section:io_service The io_service]
The creation and operation of the `boost::asio::io_service` associated with
the underlying stream is left to the callers, permitting any implementation
strategy including one that does not require threads for environments where
threads are unavailable. Beast.WSProto itself does not use or require threads.
The creation and operation of the
[@http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html `boost::asio::io_service`]
associated with the underlying stream is left to the callers, permitting any
implementation strategy including one that does not require threads for
environments where threads are unavailable. Beast.WebSocket itself does not
use or require threads.
[endsect]
@@ -374,13 +416,13 @@ threads are unavailable. Beast.WSProto itself does not use or require threads.
[section:safety Thread Safety]
Like a regular asio socket, a `websocket::stream` is not thread safe. Callers
are responsible for synchronizing operations on the socket using an implicit
or explicit strand, as per the Asio documentation. The asynchronous interface
supports one active read and one active write simultaneously. Undefined
behavior results if two or more reads or two or more writes are attempted
concurrently. Caller initiated WebSocket ping, pong, and close operations
each count as an active write.
Like a regular asio socket, a [link beast.ref.websocket__stream `stream`] is
not thread safe. Callers are responsible for synchronizing operations on the
socket using an implicit or explicit strand, as per the Asio documentation.
The asynchronous interface supports one active read and one active write
simultaneously. Undefined behavior results if two or more reads or two or
more writes are attempted concurrently. Caller initiated WebSocket ping, pong,
and close operations each count as an active write.
The implementation uses composed asynchronous operations internally; a high
level read can cause both reads and writes to take place on the underlying

View File

@@ -157,25 +157,35 @@ public:
return next_layer_.async_write_some(buffers,
std::forward<WriteHandler>(handler));
}
friend
void
teardown(fail_stream<NextLayer>& stream,
boost::system::error_code& ec)
{
if(stream.pfc_->fail(ec))
return;
websocket_helpers::call_teardown(stream.next_layer(), ec);
}
template<class TeardownHandler>
friend
void
async_teardown(fail_stream<NextLayer>& stream,
TeardownHandler&& handler)
{
error_code ec;
if(stream.pfc_->fail(ec))
{
stream.get_io_service().post(
bind_handler(std::move(handler), ec));
return;
}
websocket_helpers::call_async_teardown(
stream.next_layer(), std::forward<TeardownHandler>(handler));
}
};
template<class NextLayer>
void
teardown(fail_stream<NextLayer>& stream,
boost::system::error_code& ec)
{
websocket_helpers::call_teardown(stream.next_layer(), ec);
}
template<class NextLayer, class TeardownHandler>
void
async_teardown(fail_stream<NextLayer>& stream,
TeardownHandler&& handler)
{
websocket_helpers::call_async_teardown(
stream.next_layer(), std::forward<TeardownHandler>(handler));
}
} // test
} // beast

View File

@@ -105,33 +105,6 @@ prepare_content_length(prepare_info& pi,
} // detail
template<bool isRequest, class Body, class Headers>
void
prepare_connection(
message_v1<isRequest, Body, Headers>& msg)
{
if(msg.version >= 11)
{
if(! msg.headers.exists("Content-Length") &&
! rfc2616::token_in_list(
msg.headers["Transfer-Encoding"], "chunked"))
if(! rfc2616::token_in_list(
msg.headers["Connection"], "close"))
msg.headers.insert("Connection", "close");
}
else
{
if(! msg.headers.exists("Content-Length"))
{
// VFALCO We are erasing the whole header when we
// should be removing just the keep-alive.
if(rfc2616::token_in_list(
msg.headers["Connection"], "keep-alive"))
msg.headers.erase("Connection");
}
}
}
template<
bool isRequest, class Body, class Headers,
class... Options>

View File

@@ -23,6 +23,16 @@ namespace beast {
namespace websocket {
namespace detail {
/// Identifies the role of a WebSockets stream.
enum class role_type
{
/// Stream is operating as a client.
client,
/// Stream is operating as a server.
server
};
// Contents of a WebSocket frame header
struct frame_header
{
@@ -286,8 +296,7 @@ read_fh2(frame_header& fh, Streambuf& sb,
//
template<class Buffers>
void
read(ping_payload_type& data,
Buffers const& bs, close_code::value& code)
read(ping_data& data, Buffers const& bs)
{
using boost::asio::buffer_copy;
using boost::asio::buffer_size;

View File

@@ -71,9 +71,9 @@ using maskgen = maskgen_t<std::mt19937>;
//------------------------------------------------------------------------------
//using prepared_key_type = std::size_t;
using prepared_key_type = std::uint32_t;
//using prepared_key_type = std::uint64_t;
using prepared_key_type =
std::conditional<sizeof(void*) == 8,
std::uint64_t, std::uint32_t>::type;
inline
void
@@ -93,19 +93,6 @@ prepare_key(std::uint64_t& prepared, std::uint32_t key)
template<class T>
inline
typename std::enable_if<std::is_integral<T>::value, T>::type
rol(T t, unsigned n = 1)
{
auto constexpr bits =
static_cast<unsigned>(
sizeof(T) * CHAR_BIT);
n &= bits-1;
return static_cast<T>((t << n) | (
static_cast<typename std::make_unsigned<T>::type>(t) >> (bits - n)));
}
template <class T>
inline
typename std::enable_if<std::is_integral<T>::value, T>::type
ror(T t, unsigned n = 1)
{
auto constexpr bits =
@@ -120,7 +107,7 @@ ror(T t, unsigned n = 1)
//
template<class = void>
void
mask_inplace_safe(
mask_inplace_general(
boost::asio::mutable_buffer const& b,
std::uint32_t& key)
{
@@ -151,7 +138,7 @@ mask_inplace_safe(
//
template<class = void>
void
mask_inplace_safe(
mask_inplace_general(
boost::asio::mutable_buffer const& b,
std::uint64_t& key)
{
@@ -186,164 +173,13 @@ mask_inplace_safe(
}
}
// 32-bit optimized
template<class = void>
void
mask_inplace_32(
boost::asio::mutable_buffer const& b,
std::uint32_t& key)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
auto n = buffer_size(b);
auto p = buffer_cast<std::uint8_t*>(b);
auto m = reinterpret_cast<
uintptr_t>(p) % sizeof(key);
switch(m)
{
case 1: *p ^= key ; ++p; --n;
case 2: *p ^= (key >> 8); ++p; --n;
case 3: *p ^= (key >>16); ++p; --n;
key = ror(key, m * 8);
case 0:
break;
}
for(auto i = n / sizeof(key); i; --i)
{
*reinterpret_cast<
std::uint32_t*>(p) ^= key;
p += sizeof(key);
}
n %= sizeof(key);
switch(n)
{
case 3: p[2] ^= (key >>16);
case 2: p[1] ^= (key >> 8);
case 1: p[0] ^= key;
key = ror(key, n*8);
default:
break;
}
}
// 64-bit optimized
//
template<class = void>
void
mask_inplace_64(
boost::asio::mutable_buffer const& b,
std::uint64_t& key)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
auto n = buffer_size(b);
auto p = buffer_cast<std::uint8_t*>(b);
auto m = reinterpret_cast<
uintptr_t>(p) % sizeof(key);
switch(m)
{
case 1: *p ^= key ; ++p; --n;
case 2: *p ^= (key >> 8); ++p; --n;
case 3: *p ^= (key >>16); ++p; --n;
case 4: *p ^= (key >>24); ++p; --n;
case 5: *p ^= (key >>32); ++p; --n;
case 6: *p ^= (key >>40); ++p; --n;
case 7: *p ^= (key >>48); ++p; --n;
key = ror(key, m * 8);
case 0:
break;
}
for(auto i = n / sizeof(key); i; --i)
{
*reinterpret_cast<
std::uint64_t*>(p) ^= key;
p += sizeof(key);
}
n %= sizeof(key);
switch(n)
{
case 3: p[2] ^= (key >>16);
case 2: p[1] ^= (key >> 8);
case 1: p[0] ^= key;
key = ror(key, n*8);
default:
break;
}
}
// 32-bit x86 optimized
//
template<class = void>
void
mask_inplace_x86(
boost::asio::mutable_buffer const& b,
std::uint32_t& key)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
auto n = buffer_size(b);
auto p = buffer_cast<std::uint8_t*>(b);
for(auto i = n / sizeof(key); i; --i)
{
*reinterpret_cast<
std::uint32_t*>(p) ^= key;
p += sizeof(key);
}
n %= sizeof(key);
switch(n)
{
case 3: p[2] ^= (key >>16);
case 2: p[1] ^= (key >> 8);
case 1: p[0] ^= key;
key = ror(key, n*8);
default:
break;
}
}
// 64-bit amd64 optimized
//
template<class = void>
void
mask_inplace_amd(
boost::asio::mutable_buffer const& b,
std::uint64_t& key)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
auto n = buffer_size(b);
auto p = buffer_cast<std::uint8_t*>(b);
for(auto i = n / sizeof(key); i; --i)
{
*reinterpret_cast<
std::uint64_t*>(p) ^= key;
p += sizeof(key);
}
n %= sizeof(key);
switch(n)
{
case 7: p[6] ^= (key >>16);
case 6: p[5] ^= (key >> 8);
case 5: p[4] ^= key;
case 4: p[3] ^= (key >>24);
case 3: p[2] ^= (key >>16);
case 2: p[1] ^= (key >> 8);
case 1: p[0] ^= key;
key = ror(key, n*8);
default:
break;
}
}
inline
void
mask_inplace(
boost::asio::mutable_buffer const& b,
std::uint32_t& key)
{
mask_inplace_safe(b, key);
//mask_inplace_32(b, key);
//mask_inplace_x86(b, key);
mask_inplace_general(b, key);
}
inline
@@ -352,9 +188,7 @@ mask_inplace(
boost::asio::mutable_buffer const& b,
std::uint64_t& key)
{
mask_inplace_safe(b, key);
//mask_inplace_64(b, key);
//mask_inplace_amd(b, key);
mask_inplace_general(b, key);
}
// Apply mask in place

View File

@@ -30,15 +30,6 @@ namespace beast {
namespace websocket {
namespace detail {
template<class String>
inline
void
maybe_throw(error_code const& ec, String const&)
{
if(ec)
throw system_error{ec};
}
template<class UInt>
static
std::size_t
@@ -59,6 +50,8 @@ clamp(UInt x, std::size_t limit)
return static_cast<std::size_t>(x);
}
using pong_cb = std::function<void(ping_data const&)>;
//------------------------------------------------------------------------------
struct stream_base
@@ -69,42 +62,46 @@ protected:
detail::maskgen maskgen_; // source of mask keys
decorator_type d_; // adorns http messages
bool keep_alive_ = false; // close on failed upgrade
role_type role_; // server or client
bool error_ = false; // non-zero ec was delivered
std::size_t rd_msg_max_ =
16 * 1024 * 1024; // max message size
std::size_t
wr_frag_size_ = 16 * 1024; // size of auto-fragments
std::size_t mask_buf_size_ = 4096; // mask buffer size
opcode wr_opcode_ = opcode::text; // outgoing message type
pong_cb pong_cb_; // pong callback
role_type role_; // server or client
bool failed_; // the connection failed
detail::frame_header rd_fh_; // current frame header
detail::prepared_key_type rd_key_; // prepared masking key
detail::utf8_checker rd_utf8_check_;// for current text msg
std::uint64_t rd_size_; // size of the current message so far
std::uint64_t rd_need_ = 0; // bytes left in msg frame payload
opcode rd_opcode_; // opcode of current msg
bool rd_cont_ = false; // expecting a continuation frame
bool rd_close_ = false; // got close frame
op* rd_block_ = nullptr; // op currently reading
bool rd_cont_; // expecting a continuation frame
std::size_t
wr_frag_size_ = 16 * 1024; // size of auto-fragments
std::size_t wr_buf_size_ = 4096; // write buffer size
opcode wr_opcode_ = opcode::text; // outgoing message type
bool wr_close_ = false; // sent close frame
bool wr_cont_ = false; // next write is continuation frame
op* wr_block_ = nullptr; // op currenly writing
bool wr_close_; // sent close frame
bool wr_cont_; // next write is continuation frame
op* wr_block_; // op currenly writing
ping_data* pong_data_; // where to put pong payload
invokable rd_op_; // invoked after write completes
invokable wr_op_; // invoked after read completes
close_reason cr_; // set from received close frame
stream_base(stream_base&&) = default;
stream_base(stream_base const&) = delete;
stream_base& operator=(stream_base&&) = default;
stream_base& operator=(stream_base const&) = delete;
stream_base()
: d_(new decorator<default_decorator>{})
{
}
stream_base(stream_base&&) = default;
stream_base(stream_base const&) = delete;
stream_base& operator=(stream_base&&) = default;
stream_base& operator=(stream_base const&) = delete;
template<class = void>
void
open(role_type role);
template<class = void>
void
@@ -118,7 +115,7 @@ protected:
template<class Streambuf>
void
write_ping(Streambuf& sb, opcode op,
ping_payload_type const& data);
ping_data const& data);
};
} // detail

View File

@@ -48,6 +48,7 @@ class stream<NextLayer>::accept_op
{
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
ws.reset();
ws.stream_.buffer().commit(buffer_copy(
ws.stream_.buffer().prepare(
buffer_size(buffers)), buffers));
@@ -133,8 +134,14 @@ operator()(error_code const& ec,
// got message
case 1:
// respond to request
#if 1
// VFALCO I have no idea why passing std::move(*this) crashes
d.state = 99;
d.ws.async_accept(d.req, *this);
#else
response_op<Handler>{
std::move(d.h), d.ws, d.req, true};
#endif
return;
}
}

View File

@@ -21,12 +21,9 @@ template<class NextLayer>
template<class Handler>
class stream<NextLayer>::close_op
{
using alloc_type =
handler_alloc<char, Handler>;
using fb_type =
detail::frame_streambuf;
using fmb_type =
typename fb_type::mutable_buffers_type;
using alloc_type = handler_alloc<char, Handler>;
using fb_type = detail::frame_streambuf;
struct data : op
{
@@ -64,16 +61,19 @@ public:
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
{
(*this)(error_code{}, 0, false);
(*this)(error_code{}, false);
}
void operator()()
{
(*this)(error_code{}, 0, true);
(*this)(error_code{});
}
void
operator()(error_code ec, std::size_t, bool again = true);
operator()(error_code ec, std::size_t);
void
operator()(error_code ec, bool again = true);
friend
void* asio_handler_allocate(
@@ -110,11 +110,25 @@ template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::close_op<Handler>::
operator()(error_code ec, std::size_t, bool again)
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
if(ec)
d.ws.failed_ = true;
(*this)(ec);
}
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::close_op<Handler>::
operator()(error_code ec, bool again)
{
auto& d = *d_;
d.cont = d.cont || again;
while(! ec && d.state != 99)
if(ec)
goto upcall;
for(;;)
{
switch(d.state)
{
@@ -122,58 +136,52 @@ operator()(error_code ec, std::size_t, bool again)
if(d.ws.wr_block_)
{
// suspend
d.state = 1;
d.ws.rd_op_.template emplace<
d.state = 2;
d.ws.wr_op_.template emplace<
close_op>(std::move(*this));
return;
}
if(d.ws.error_)
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
d.state = 99;
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted, 0));
boost::asio::error::operation_aborted));
return;
}
d.state = 3;
break;
// fall through
// resume
case 1:
// VFALCO NOTE Should d.cont be `true` or false here?
// Does this count as a continuation of the original call
// to the asynchronous initiation function (async_close)?
d.state = 2;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, 0));
return;
case 2:
if(d.ws.error_)
{
// call handler
d.state = 99;
ec = boost::asio::error::operation_aborted;
break;
}
d.state = 3; // VFALCO fall through?
break;
case 3:
// send close
// send close frame
d.state = 99;
assert(! d.ws.wr_close_);
d.ws.wr_close_ = true;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
case 2:
d.state = 3;
d.ws.get_io_service().post(
bind_handler(std::move(*this), ec));
return;
case 3:
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
ec = boost::asio::error::operation_aborted;
goto upcall;
}
d.state = 1;
break;
case 99:
goto upcall;
}
}
if(ec)
d.ws.error_ = true;
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.rd_op_.maybe_invoke();

View File

@@ -48,6 +48,7 @@ class stream<NextLayer>::handshake_op
, cont(boost_asio_handler_cont_helpers::
is_continuation(h))
{
ws.reset();
}
};

View File

@@ -0,0 +1,193 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BEAST_WEBSOCKET_IMPL_PING_OP_HPP
#define BEAST_WEBSOCKET_IMPL_PING_OP_HPP
#include <beast/core/bind_handler.hpp>
#include <beast/core/handler_alloc.hpp>
#include <beast/websocket/detail/frame.hpp>
#include <memory>
namespace beast {
namespace websocket {
// write a ping frame
//
template<class NextLayer>
template<class Handler>
class stream<NextLayer>::ping_op
{
using alloc_type =
handler_alloc<char, Handler>;
struct data : op
{
stream<NextLayer>& ws;
Handler h;
detail::frame_streambuf fb;
bool cont;
int state = 0;
template<class DeducedHandler>
data(DeducedHandler&& h_, stream<NextLayer>& ws_,
ping_data const& payload)
: ws(ws_)
, h(std::forward<DeducedHandler>(h_))
, cont(boost_asio_handler_cont_helpers::
is_continuation(h))
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
ws.template write_ping<static_streambuf>(
fb, opcode::ping, payload);
}
};
std::shared_ptr<data> d_;
public:
ping_op(ping_op&&) = default;
ping_op(ping_op const&) = default;
template<class DeducedHandler, class... Args>
ping_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(std::make_shared<data>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
{
(*this)(error_code{}, false);
}
void operator()()
{
(*this)(error_code{});
}
void operator()(error_code ec, std::size_t);
void operator()(error_code ec, bool again = true);
friend
void* asio_handler_allocate(
std::size_t size, ping_op* op)
{
return boost_asio_handler_alloc_helpers::
allocate(size, op->d_->h);
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, ping_op* op)
{
return boost_asio_handler_alloc_helpers::
deallocate(p, size, op->d_->h);
}
friend
bool asio_handler_is_continuation(ping_op* op)
{
return op->d_->cont;
}
template <class Function>
friend
void asio_handler_invoke(Function&& f, ping_op* op)
{
return boost_asio_handler_invoke_helpers::
invoke(f, op->d_->h);
}
};
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::ping_op<Handler>::
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
if(ec)
d.ws.failed_ = true;
(*this)(ec);
}
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::
ping_op<Handler>::
operator()(error_code ec, bool again)
{
auto& d = *d_;
d.cont = d.cont || again;
if(ec)
goto upcall;
for(;;)
{
switch(d.state)
{
case 0:
if(d.ws.wr_block_)
{
// suspend
d.state = 2;
d.ws.wr_op_.template emplace<
ping_op>(std::move(*this));
return;
}
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
d.state = 99;
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted));
return;
}
// fall through
case 1:
// send ping frame
d.state = 99;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
case 2:
d.state = 3;
d.ws.get_io_service().post(
bind_handler(std::move(*this), ec));
return;
case 3:
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
ec = boost::asio::error::operation_aborted;
goto upcall;
}
d.state = 1;
break;
case 99:
goto upcall;
}
}
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.rd_op_.maybe_invoke();
d.h(ec);
}
} // websocket
} // beast
#endif

View File

@@ -86,11 +86,14 @@ public:
void operator()(error_code const& ec)
{
(*this)(ec, 0);
(*this)(ec, 0, true);
}
void operator()(error_code ec,
std::size_t bytes_transferred, bool again = true);
std::size_t bytes_transferred);
void operator()(error_code ec,
std::size_t bytes_transferred, bool again);
friend
void* asio_handler_allocate(
@@ -127,393 +130,415 @@ template<class NextLayer>
template<class Buffers, class Handler>
void
stream<NextLayer>::read_frame_op<Buffers, Handler>::
operator()(error_code ec,std::size_t bytes_transferred, bool again)
operator()(error_code ec, std::size_t bytes_transferred)
{
auto& d = *d_;
d.cont = d.cont || again;
close_code::value code = close_code::none;
while(! ec && d.state != 99)
if(ec)
d.ws.failed_ = true;
(*this)(ec, bytes_transferred, true);
}
template<class NextLayer>
template<class Buffers, class Handler>
void
stream<NextLayer>::read_frame_op<Buffers, Handler>::
operator()(error_code ec,std::size_t bytes_transferred, bool again)
{
enum
{
switch(d.state)
do_start = 0,
do_read_payload = 1,
do_frame_done = 3,
do_read_fh = 4,
do_control_payload = 7,
do_control = 8,
do_pong_resume = 9,
do_pong = 11,
do_close_resume = 13,
do_close = 15,
do_fail = 18,
do_call_handler = 99
};
auto& d = *d_;
if(! ec)
{
d.cont = d.cont || again;
close_code::value code = close_code::none;
do
{
case 0:
if(d.ws.error_)
switch(d.state)
{
case do_start:
if(d.ws.failed_)
{
d.state = do_call_handler;
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted, 0));
return;
}
d.state = d.ws.rd_need_ > 0 ?
do_read_payload : do_read_fh;
break;
//------------------------------------------------------------------
case do_read_payload:
d.state = do_read_payload + 1;
d.smb = d.sb.prepare(
detail::clamp(d.ws.rd_need_));
// receive payload data
d.ws.stream_.async_read_some(
*d.smb, std::move(*this));
return;
case do_read_payload + 1:
{
d.ws.rd_need_ -= bytes_transferred;
auto const pb = prepare_buffers(
bytes_transferred, *d.smb);
if(d.ws.rd_fh_.mask)
detail::mask_inplace(pb, d.ws.rd_key_);
if(d.ws.rd_opcode_ == opcode::text)
{
if(! d.ws.rd_utf8_check_.write(pb) ||
(d.ws.rd_need_ == 0 && d.ws.rd_fh_.fin &&
! d.ws.rd_utf8_check_.finish()))
{
// invalid utf8
code = close_code::bad_payload;
d.state = do_fail;
break;
}
}
d.sb.commit(bytes_transferred);
if(d.ws.rd_need_ > 0)
{
d.state = do_read_payload;
break;
}
// fall through
}
//------------------------------------------------------------------
case do_frame_done:
// call handler
d.state = 99;
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted, 0));
d.fi.op = d.ws.rd_opcode_;
d.fi.fin = d.ws.rd_fh_.fin &&
d.ws.rd_need_ == 0;
goto upcall;
//------------------------------------------------------------------
case do_read_fh:
d.state = do_read_fh + 1;
boost::asio::async_read(d.ws.stream_,
d.fb.prepare(2), std::move(*this));
return;
case do_read_fh + 1:
{
d.fb.commit(bytes_transferred);
code = close_code::none;
auto const n = detail::read_fh1(
d.ws.rd_fh_, d.fb, d.ws.role_, code);
if(code != close_code::none)
{
// protocol error
d.state = do_fail;
break;
}
d.state = do_read_fh + 2;
if (n == 0)
{
bytes_transferred = 0;
break;
}
// read variable header
boost::asio::async_read(d.ws.stream_,
d.fb.prepare(n), std::move(*this));
return;
}
if(d.ws.rd_need_ > 0)
{
d.state = 1;
break;
}
d.state = 2;
break;
case 1:
// read payload
d.state = 3;
d.smb = d.sb.prepare(
detail::clamp(d.ws.rd_need_));
d.ws.stream_.async_read_some(
*d.smb, std::move(*this));
return;
case 2:
// read fixed header
d.state = 5;
boost::asio::async_read(d.ws.stream_,
d.fb.prepare(2), std::move(*this));
return;
// got payload
case 3:
{
d.ws.rd_need_ -= bytes_transferred;
auto const pb = prepare_buffers(
bytes_transferred, *d.smb);
if(d.ws.rd_fh_.mask)
detail::mask_inplace(pb, d.ws.rd_key_);
if(d.ws.rd_opcode_ == opcode::text)
{
if(! d.ws.rd_utf8_check_.write(pb) ||
(d.ws.rd_need_ == 0 && d.ws.rd_fh_.fin &&
! d.ws.rd_utf8_check_.finish()))
{
// invalid utf8
d.state = 18;
code = close_code::bad_payload;
break;
}
}
d.sb.commit(bytes_transferred);
d.state = 4;
break;
}
// call handler
case 4:
d.state = 99;
d.fi.op = d.ws.rd_opcode_;
d.fi.fin = d.ws.rd_fh_.fin &&
d.ws.rd_need_ == 0;
break;
// got fixed header
case 5:
{
d.fb.commit(bytes_transferred);
code = close_code::none;
auto const n = detail::read_fh1(
d.ws.rd_fh_, d.fb, d.ws.role_, code);
if(code != close_code::none)
{
// protocol error
d.state = 18;
break;
}
d.state = 6;
if (n == 0)
{
bytes_transferred = 0;
break;
}
// read variable header
boost::asio::async_read(d.ws.stream_,
d.fb.prepare(n), std::move(*this));
return;
}
// got variable header
case 6:
d.fb.commit(bytes_transferred);
code = close_code::none;
detail::read_fh2(d.ws.rd_fh_,
d.fb, d.ws.role_, code);
if(code == close_code::none)
d.ws.prepare_fh(code);
if(code != close_code::none)
{
// protocol error
d.state = 18;
break;
}
if(detail::is_control(d.ws.rd_fh_.op))
{
if(d.ws.rd_fh_.len > 0)
{
// read control payload
d.state = 7;
d.fmb = d.fb.prepare(static_cast<
std::size_t>(d.ws.rd_fh_.len));
boost::asio::async_read(d.ws.stream_,
*d.fmb, std::move(*this));
return;
}
d.state = 8;
break;
}
if(d.ws.rd_need_ > 0)
{
d.state = 1;
break;
}
if(! d.ws.rd_fh_.fin)
{
d.state = 2;
break;
}
// empty frame with fin
d.state = 4;
break;
// got control payload
case 7:
if(d.ws.rd_fh_.mask)
detail::mask_inplace(
*d.fmb, d.ws.rd_key_);
d.fb.commit(bytes_transferred);
d.state = 8;
break;
// do control
case 8:
if(d.ws.rd_fh_.op == opcode::ping)
{
case do_read_fh + 2:
d.fb.commit(bytes_transferred);
code = close_code::none;
ping_payload_type data;
detail::read(data, d.fb.data(), code);
detail::read_fh2(d.ws.rd_fh_,
d.fb, d.ws.role_, code);
if(code == close_code::none)
d.ws.prepare_fh(code);
if(code != close_code::none)
{
// protocol error
d.state = 18;
d.state = do_fail;
break;
}
d.fb.reset();
if(d.ws.wr_close_)
if(detail::is_control(d.ws.rd_fh_.op))
{
d.state = 2;
if(d.ws.rd_fh_.len > 0)
{
// read control payload
d.state = do_control_payload;
d.fmb = d.fb.prepare(static_cast<
std::size_t>(d.ws.rd_fh_.len));
boost::asio::async_read(d.ws.stream_,
*d.fmb, std::move(*this));
return;
}
d.state = do_control;
break;
}
d.ws.template write_ping<static_streambuf>(
d.fb, opcode::pong, data);
if(d.ws.wr_block_)
if(d.ws.rd_need_ > 0)
{
assert(d.ws.wr_block_ != &d);
// suspend
d.state = 13;
d.ws.rd_op_.template emplace<
read_frame_op>(std::move(*this));
return;
d.state = do_read_payload;
break;
}
d.state = 14;
// empty frame
d.state = do_frame_done;
break;
}
else if(d.ws.rd_fh_.op == opcode::pong)
{
code = close_code::none;
ping_payload_type data;
detail::read(data, d.fb.data(), code);
if(code != close_code::none)
{
// protocol error
d.state = 18;
break;
}
d.fb.reset();
// VFALCO TODO maybe_invoke an async pong handler
// For now just ignore the pong.
d.state = 2;
//------------------------------------------------------------------
case do_control_payload:
if(d.ws.rd_fh_.mask)
detail::mask_inplace(
*d.fmb, d.ws.rd_key_);
d.fb.commit(bytes_transferred);
d.state = do_control; // VFALCO fall through?
break;
}
assert(d.ws.rd_fh_.op == opcode::close);
{
detail::read(d.ws.cr_, d.fb.data(), code);
if(code != close_code::none)
//------------------------------------------------------------------
case do_control:
if(d.ws.rd_fh_.op == opcode::ping)
{
d.state = 18;
break;
}
if(! d.ws.wr_close_)
{
auto cr = d.ws.cr_;
if(cr.code == close_code::none)
cr.code = close_code::normal;
cr.reason = "";
ping_data data;
detail::read(data, d.fb.data());
d.fb.reset();
d.ws.template write_close<
static_streambuf>(d.fb, cr);
if(d.ws.wr_close_)
{
// ignore ping when closing
d.state = do_read_fh;
break;
}
d.ws.template write_ping<static_streambuf>(
d.fb, opcode::pong, data);
if(d.ws.wr_block_)
{
// suspend
d.state = 9;
d.state = do_pong_resume;
assert(d.ws.wr_block_ != &d);
d.ws.rd_op_.template emplace<
read_frame_op>(std::move(*this));
return;
}
d.state = 11;
d.state = do_pong;
break;
}
// call handler;
d.state = 99;
ec = error::closed;
break;
}
else if(d.ws.rd_fh_.op == opcode::pong)
{
code = close_code::none;
ping_data payload;
detail::read(payload, d.fb.data());
if(d.ws.pong_cb_)
d.ws.pong_cb_(payload);
d.fb.reset();
d.state = do_read_fh;
break;
}
assert(d.ws.rd_fh_.op == opcode::close);
{
detail::read(d.ws.cr_, d.fb.data(), code);
if(code != close_code::none)
{
// protocol error
d.state = do_fail;
break;
}
if(! d.ws.wr_close_)
{
auto cr = d.ws.cr_;
if(cr.code == close_code::none)
cr.code = close_code::normal;
cr.reason = "";
d.fb.reset();
d.ws.template write_close<
static_streambuf>(d.fb, cr);
if(d.ws.wr_block_)
{
// suspend
d.state = do_close_resume;
d.ws.rd_op_.template emplace<
read_frame_op>(std::move(*this));
return;
}
d.state = do_close;
break;
}
// call handler;
ec = error::closed;
goto upcall;
}
// resume
case 9:
d.state = 10;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
//------------------------------------------------------------------
case 10:
if(d.ws.error_)
{
// call handler
d.state = 99;
ec = boost::asio::error::operation_aborted;
break;
}
if(d.ws.wr_close_)
{
// call handler
d.state = 99;
ec = error::closed;
break;
}
d.state = 11;
break;
case do_pong_resume:
d.state = do_pong_resume + 1;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
// send close
case 11:
d.state = 12;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;;
case do_pong_resume + 1:
if(d.ws.failed_)
{
// call handler
ec = boost::asio::error::operation_aborted;
goto upcall;
}
d.state = do_pong;
break; // VFALCO fall through?
// teardown
case 12:
d.state = 13;
websocket_helpers::call_async_teardown(
d.ws.next_layer(), std::move(*this));
return;
//------------------------------------------------------------------
case 13:
// call handler
d.state = 99;
ec = error::closed;
break;
case do_pong:
if(d.ws.wr_close_)
{
// ignore ping when closing
d.fb.reset();
d.state = do_read_fh;
break;
}
// send pong
d.state = do_pong + 1;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
// resume
case 14:
d.state = 15;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 15:
if(d.ws.error_)
{
// call handler
d.state = 99;
ec = boost::asio::error::operation_aborted;
break;
}
if(d.ws.wr_close_)
{
case do_pong + 1:
d.fb.reset();
d.state = 2;
d.state = do_read_fh;
d.ws.wr_block_ = nullptr;
break;
}
d.state = 16;
break;
case 16:
// write ping/pong
d.state = 17;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
//------------------------------------------------------------------
// sent ping/pong
case 17:
d.fb.reset();
d.state = 2;
d.ws.wr_block_ = nullptr;
break;
case do_close_resume:
d.state = do_close_resume + 1;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
// fail the connection
case 18:
if(! d.ws.wr_close_)
{
case do_close_resume + 1:
if(d.ws.failed_)
{
// call handler
d.state = do_call_handler;
ec = boost::asio::error::operation_aborted;
break;
}
if(d.ws.wr_close_)
{
// call handler
ec = error::closed;
goto upcall;
}
d.state = do_close;
break;
//------------------------------------------------------------------
case do_close:
d.state = do_close + 1;
d.ws.wr_close_ = true;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
case do_close + 1:
d.state = do_close + 2;
websocket_helpers::call_async_teardown(
d.ws.next_layer(), std::move(*this));
return;
case do_close + 2:
// call handler
ec = error::closed;
goto upcall;
//------------------------------------------------------------------
case do_fail:
if(d.ws.wr_close_)
{
d.state = do_fail + 4;
break;
}
d.fb.reset();
d.ws.template write_close<
static_streambuf>(d.fb, code);
if(d.ws.wr_block_)
{
// suspend
d.state = 19;
d.state = do_fail + 2;
d.ws.rd_op_.template emplace<
read_frame_op>(std::move(*this));
return;
}
d.state = 21;
// fall through
case do_fail + 1:
d.ws.failed_ = true;
// send close frame
d.state = do_fail + 4;
d.ws.wr_close_ = true;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
case do_fail + 2:
d.state = do_fail + 3;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case do_fail + 3:
if(d.ws.failed_)
{
d.state = do_fail + 5;
break;
}
d.state = do_fail + 1;
break;
case do_fail + 4:
d.state = do_fail + 5;
websocket_helpers::call_async_teardown(
d.ws.next_layer(), std::move(*this));
return;
case do_fail + 5:
// call handler
ec = error::failed;
goto upcall;
//------------------------------------------------------------------
case do_call_handler:
goto upcall;
}
d.state = 22;
break;
// resume
case 19:
d.state = 20;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 20:
if(d.ws.wr_close_)
{
d.state = 22;
break;
}
d.state = 21;
break;
case 21:
// send close
d.state = 22;
d.ws.wr_close_ = true;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
return;
// teardown
case 22:
d.state = 23;
websocket_helpers::call_async_teardown(
d.ws.next_layer(), std::move(*this));
return;
case 23:
// call handler
d.state = 99;
ec = error::failed;
break;
}
while(! ec);
}
if(ec)
d.ws.error_ = true;
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.wr_op_.maybe_invoke();

View File

@@ -105,24 +105,34 @@ operator()(error_code const& ec, bool again)
{
auto& d = *d_;
d.cont = d.cont || again;
while(! ec && d.state != 99)
while(! ec)
{
switch(d.state)
{
case 0:
// read payload
d.state = 1;
#if 0
// VFALCO This causes dereference of null, because
// the handler is moved from the data block
// before asio_handler_deallocate is called.
d.ws.async_read_frame(
d.fi, d.sb, std::move(*this));
#else
d.ws.async_read_frame(d.fi, d.sb, *this);
#endif
return;
// got payload
case 1:
d.op = d.fi.op;
d.state = d.fi.fin ? 99 : 0;
if(d.fi.fin)
goto upcall;
d.state = 0;
break;
}
}
upcall:
d.h(ec);
}

View File

@@ -44,6 +44,9 @@ class stream<NextLayer>::response_op
, h(std::forward<DeducedHandler>(h_))
, cont(cont_)
{
// can't call stream::reset() here
// otherwise accept_op will malfunction
//
if(resp.status != 101)
final_ec = error::handshake_failed;
}
@@ -123,7 +126,7 @@ operator()(error_code ec, bool again)
d.state = 99;
ec = d.final_ec;
if(! ec)
d.ws.role_ = role_type::server;
d.ws.open(detail::role_type::server);
break;
}
}

View File

@@ -13,6 +13,7 @@
#include <beast/websocket/impl/accept_op.ipp>
#include <beast/websocket/impl/close_op.ipp>
#include <beast/websocket/impl/handshake_op.ipp>
#include <beast/websocket/impl/ping_op.ipp>
#include <beast/websocket/impl/read_op.ipp>
#include <beast/websocket/impl/read_frame_op.ipp>
#include <beast/websocket/impl/response_op.ipp>
@@ -40,6 +41,13 @@ namespace websocket {
namespace detail {
template<class _>
void
stream_base::open(role_type role)
{
role_ = role;
}
template<class _>
void
stream_base::prepare_fh(close_code::value& code)
@@ -76,7 +84,7 @@ stream_base::prepare_fh(close_code::value& code)
}
rd_size_ += rd_fh_.len;
}
if(rd_size_ > rd_msg_max_)
if(rd_msg_max_ && rd_size_ > rd_msg_max_)
{
code = close_code::too_big;
return;
@@ -100,7 +108,7 @@ stream_base::write_close(
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
fh.mask = role_ == role_type::client;
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(sb, fh);
@@ -136,7 +144,7 @@ stream_base::write_close(
template<class Streambuf>
void
stream_base::write_ping(Streambuf& sb,
opcode op, ping_payload_type const& data)
opcode op, ping_data const& data)
{
frame_header fh;
fh.op = op;
@@ -184,7 +192,8 @@ accept()
"SyncStream requirements not met");
error_code ec;
accept(boost::asio::null_buffers{}, ec);
detail::maybe_throw(ec, "accept");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -223,7 +232,8 @@ accept(ConstBufferSequence const& buffers)
"ConstBufferSequence requirements not met");
error_code ec;
accept(buffers, ec);
detail::maybe_throw(ec, "accept");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -239,6 +249,7 @@ accept(ConstBufferSequence const& buffers, error_code& ec)
"ConstBufferSequence requirements not met");
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
reset();
stream_.buffer().commit(buffer_copy(
stream_.buffer().prepare(
buffer_size(buffers)), buffers));
@@ -279,7 +290,8 @@ accept(http::request_v1<Body, Headers> const& request)
"SyncStream requirements not met");
error_code ec;
accept(request, ec);
detail::maybe_throw(ec, "accept");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -291,8 +303,11 @@ accept(http::request_v1<Body, Headers> const& req,
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
reset();
auto const res = build_response(req);
http::write(stream_, res, ec);
if(ec)
return;
if(res.status != 101)
{
ec = error::handshake_failed;
@@ -300,7 +315,7 @@ accept(http::request_v1<Body, Headers> const& req,
// teardown if Connection: close.
return;
}
role_ = role_type::server;
open(detail::role_type::server);
}
template<class NextLayer>
@@ -316,6 +331,7 @@ async_accept(http::request_v1<Body, Headers> const& req,
beast::async_completion<
AcceptHandler, void(error_code)
> completion(handler);
reset();
response_op<decltype(completion.handler)>{
completion.handler, *this, req,
boost_asio_handler_cont_helpers::
@@ -333,7 +349,8 @@ handshake(boost::string_ref const& host,
"SyncStream requirements not met");
error_code ec;
handshake(host, resource, ec);
detail::maybe_throw(ec, "upgrade");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -344,6 +361,7 @@ handshake(boost::string_ref const& host,
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
reset();
std::string key;
http::write(stream_,
build_request(host, resource, key), ec);
@@ -383,7 +401,8 @@ close(close_reason const& cr)
"SyncStream requirements not met");
error_code ec;
close(cr, ec);
detail::maybe_throw(ec, "close");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -398,7 +417,7 @@ close(close_reason const& cr, error_code& ec)
detail::frame_streambuf fb;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
failed_ = ec != 0;
}
template<class NextLayer>
@@ -418,6 +437,45 @@ async_close(close_reason const& cr, CloseHandler&& handler)
return completion.result.get();
}
template<class NextLayer>
void
stream<NextLayer>::
ping(ping_data const& payload)
{
error_code ec;
ping(payload, ec);
if(ec)
throw system_error{ec};
}
template<class NextLayer>
void
stream<NextLayer>::
ping(ping_data const& payload, error_code& ec)
{
detail::frame_streambuf sb;
write_ping<static_streambuf>(
sb, opcode::ping, payload);
boost::asio::write(stream_, sb.data(), ec);
}
template<class NextLayer>
template<class PingHandler>
typename async_completion<
PingHandler, void(error_code)>::result_type
stream<NextLayer>::
async_ping(ping_data const& payload, PingHandler&& handler)
{
static_assert(is_AsyncStream<next_layer_type>::value,
"AsyncStream requirements requirements not met");
beast::async_completion<
PingHandler, void(error_code)
> completion(handler);
ping_op<decltype(completion.handler)>{
completion.handler, *this, payload};
return completion.result.get();
}
template<class NextLayer>
template<class Streambuf>
void
@@ -428,7 +486,8 @@ read(opcode& op, Streambuf& streambuf)
"SyncStream requirements not met");
error_code ec;
read(op, streambuf, ec);
detail::maybe_throw(ec, "read");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -481,7 +540,8 @@ read_frame(frame_info& fi, Streambuf& streambuf)
"SyncStream requirements not met");
error_code ec;
read_frame(fi, streambuf, ec);
detail::maybe_throw(ec, "read_some");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -500,8 +560,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
// read header
detail::frame_streambuf fb;
do_read_fh(fb, code, ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
if(code != close_code::none)
break;
@@ -513,8 +573,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
auto const mb = fb.prepare(
static_cast<std::size_t>(rd_fh_.len));
fb.commit(boost::asio::read(stream_, mb, ec));
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
if(rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
@@ -522,27 +582,23 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
}
if(rd_fh_.op == opcode::ping)
{
ping_payload_type data;
detail::read(data, fb.data(), code);
if(code != close_code::none)
break;
ping_data data;
detail::read(data, fb.data());
fb.reset();
write_ping<static_streambuf>(
fb, opcode::pong, data);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
continue;
}
else if(rd_fh_.op == opcode::pong)
{
ping_payload_type data;
detail::read(data, fb.data(), code);
if(code != close_code::none)
break;
// VFALCO How to notify callers using
// the synchronous interface?
ping_data payload;
detail::read(payload, fb.data());
if(pong_cb_)
pong_cb_(payload);
continue;
}
assert(rd_fh_.op == opcode::close);
@@ -560,8 +616,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
wr_close_ = true;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
}
break;
@@ -578,8 +634,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
detail::clamp(rd_need_));
auto const bytes_transferred =
stream_.read_some(smb, ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
rd_need_ -= bytes_transferred;
auto const pb = prepare_buffers(
@@ -610,23 +666,23 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
detail::frame_streambuf fb;
write_close<static_streambuf>(fb, code);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
}
websocket_helpers::call_teardown(next_layer(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
ec = error::failed;
error_ = true;
failed_ = true;
return;
}
if(! ec)
websocket_helpers::call_teardown(next_layer(), ec);
if(! ec)
ec = error::closed;
error_ = ec != 0;
failed_ = ec != 0;
}
template<class NextLayer>
@@ -658,7 +714,8 @@ write(ConstBufferSequence const& buffers)
"SyncStream requirements not met");
error_code ec;
write(buffers, ec);
detail::maybe_throw(ec, "write");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -719,7 +776,8 @@ write_frame(bool fin, ConstBufferSequence const& buffers)
"SyncStream requirements not met");
error_code ec;
write_frame(fin, buffers, ec);
detail::maybe_throw(ec, "write");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -744,7 +802,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = buffer_size(bs);
fh.mask = role_ == role_type::client;
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::fh_streambuf fh_buf;
@@ -754,22 +812,21 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
// send header and payload
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), bs), ec);
error_ = ec != 0;
failed_ = ec != 0;
return;
}
detail::prepared_key_type key;
detail::prepare_key(key, fh.key);
auto const tmp_size = detail::clamp(
fh.len, wr_buf_size_);
auto const tmp_size =
detail::clamp(fh.len, mask_buf_size_);
std::unique_ptr<std::uint8_t[]> up(
new std::uint8_t[tmp_size]);
auto const tmp = up.get();
std::uint64_t remain = fh.len;
consuming_buffers<ConstBufferSequence> cb(bs);
{
auto const n =
detail::clamp(remain, tmp_size);
mutable_buffers_1 mb{tmp, n};
mutable_buffers_1 mb{up.get(), n};
buffer_copy(mb, cb);
cb.consume(n);
remain -= n;
@@ -779,7 +836,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
buffer_cat(fh_buf.data(), mb), ec);
if(ec)
{
error_ = ec != 0;
failed_ = ec != 0;
return;
}
}
@@ -787,7 +844,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
{
auto const n =
detail::clamp(remain, tmp_size);
mutable_buffers_1 mb{tmp, n};
mutable_buffers_1 mb{up.get(), n};
buffer_copy(mb, cb);
cb.consume(n);
remain -= n;
@@ -796,7 +853,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
boost::asio::write(stream_, mb, ec);
if(ec)
{
error_ = ec != 0;
failed_ = ec != 0;
return;
}
}
@@ -826,6 +883,23 @@ async_write_frame(bool fin,
//------------------------------------------------------------------------------
template<class NextLayer>
void
stream<NextLayer>::
reset()
{
failed_ = false;
rd_need_ = 0;
rd_cont_ = false;
wr_close_ = false;
wr_cont_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
pong_data_ = nullptr; // should be nullptr on close anyway
stream_.buffer().consume(
stream_.buffer().size());
}
template<class NextLayer>
http::request_v1<http::empty_body>
stream<NextLayer>::
@@ -860,8 +934,11 @@ build_response(http::request_v1<Body, Headers> const& req)
res.reason = http::reason_string(res.status);
res.version = req.version;
res.body = text;
// VFALCO TODO respect keep-alive here
prepare(res);
(*d_)(res);
prepare(res,
(is_keep_alive(req) && keep_alive_) ?
http::connection::keep_alive :
http::connection::close);
return res;
};
if(req.version < 11)
@@ -874,17 +951,28 @@ build_response(http::request_v1<Body, Headers> const& req)
return err("Missing Host");
if(! req.headers.exists("Sec-WebSocket-Key"))
return err("Missing Sec-WebSocket-Key");
if(! rfc2616::token_in_list(
req.headers["Upgrade"], "websocket"))
return err("Missing websocket Upgrade token");
{
auto const version =
req.headers["Sec-WebSocket-Version"];
if(version.empty())
return err("Missing Sec-WebSocket-Version");
if(version != "13")
return err("Unsupported Sec-WebSocket-Version");
{
http::response_v1<http::string_body> res;
res.status = 426;
res.reason = http::reason_string(res.status);
res.version = req.version;
res.headers.insert("Sec-WebSocket-Version", "13");
prepare(res,
(is_keep_alive(req) && keep_alive_) ?
http::connection::keep_alive :
http::connection::close);
return res;
}
}
if(! rfc2616::token_in_list(
req.headers["Upgrade"], "websocket"))
return err("Missing websocket Upgrade token");
http::response_v1<http::string_body> res;
res.status = 101;
res.reason = http::reason_string(res.status);
@@ -893,7 +981,6 @@ build_response(http::request_v1<Body, Headers> const& req)
{
auto const key =
req.headers["Sec-WebSocket-Key"];
res.headers.insert("Sec-WebSocket-Key", key);
res.headers.insert("Sec-WebSocket-Accept",
detail::make_sec_ws_accept(key));
}
@@ -912,6 +999,8 @@ do_response(http::response_v1<Body, Headers> const& res,
{
// VFALCO Review these error codes
auto fail = [&]{ ec = error::response_failed; };
if(res.version < 11)
return fail();
if(res.status != 101)
return fail();
if(! is_upgrade(res))
@@ -924,7 +1013,7 @@ do_response(http::response_v1<Body, Headers> const& res,
if(res.headers["Sec-WebSocket-Accept"] !=
detail::make_sec_ws_accept(key))
return fail();
role_ = role_type::client;
open(detail::role_type::client);
}
template<class NextLayer>

View File

@@ -61,13 +61,13 @@ class stream<NextLayer>::write_frame_op
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = boost::asio::buffer_size(cb);
fh.mask = ws.role_ == role_type::client;
fh.mask = ws.role_ == detail::role_type::client;
if(fh.mask)
{
fh.key = ws.maskgen_();
detail::prepare_key(key, fh.key);
tmp_size = detail::clamp(
fh.len, ws.wr_buf_size_);
fh.len, ws.mask_buf_size_);
tmp = boost_asio_handler_alloc_helpers::
allocate(tmp_size, h);
remain = fh.len;
@@ -100,16 +100,17 @@ public:
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
{
(*this)(error_code{}, 0, false);
(*this)(error_code{}, false);
}
void operator()()
{
(*this)(error_code{}, 0, true);
(*this)(error_code{});
}
void operator()(error_code ec,
std::size_t bytes_transferred, bool again = true);
void operator()(error_code ec, std::size_t);
void operator()(error_code ec, bool again = true);
friend
void* asio_handler_allocate(
@@ -142,19 +143,33 @@ public:
}
};
template<class NextLayer>
template<class Buffers, class Handler>
void
stream<NextLayer>::
write_frame_op<Buffers, Handler>::
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
if(ec)
d.ws.failed_ = true;
(*this)(ec);
}
template<class NextLayer>
template<class Buffers, class Handler>
void
stream<NextLayer>::
write_frame_op<Buffers, Handler>::
operator()(
error_code ec, std::size_t bytes_transferred, bool again)
operator()(error_code ec, bool again)
{
using boost::asio::buffer_copy;
using boost::asio::mutable_buffers_1;
auto& d = *d_;
d.cont = d.cont || again;
while(! ec && d.state != 99)
if(ec)
goto upcall;
for(;;)
{
switch(d.state)
{
@@ -162,47 +177,27 @@ operator()(
if(d.ws.wr_block_)
{
// suspend
d.state = 1;
d.state = 3;
d.ws.wr_op_.template emplace<
write_frame_op>(std::move(*this));
return;
}
if(d.ws.error_)
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
d.state = 99;
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted, 0));
boost::asio::error::operation_aborted));
return;
}
assert(! d.ws.wr_close_);
d.state = 3;
break;
// fall through
// resume
case 1:
d.state = 2;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec, bytes_transferred));
return;
case 2:
if(d.ws.error_)
{
// call handler
d.state = 99;
ec = boost::asio::error::operation_aborted;
break;
}
d.state = 3;
break;
case 3:
{
if(! d.fh.mask)
{
// send header and payload
// send header and entire payload
d.state = 99;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
@@ -219,7 +214,7 @@ operator()(
d.remain -= n;
detail::mask_inplace(mb, d.key);
// send header and payload
d.state = d.remain > 0 ? 4 : 99;
d.state = d.remain > 0 ? 2 : 99;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
@@ -229,7 +224,7 @@ operator()(
}
// sent masked payload
case 4:
case 2:
{
auto const n =
detail::clamp(d.remain, d.tmp_size);
@@ -242,24 +237,41 @@ operator()(
// send payload
if(d.remain == 0)
d.state = 99;
assert(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
assert(d.ws.wr_block_ == &d);
boost::asio::async_write(
d.ws.stream_, mb, std::move(*this));
return;
}
case 3:
d.state = 4;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec));
return;
case 4:
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
ec = boost::asio::error::operation_aborted;
goto upcall;
}
d.state = 1;
break;
case 99:
goto upcall;
}
}
if(ec)
d.ws.error_ = true;
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
upcall:
if(d.tmp)
{
boost_asio_handler_alloc_helpers::
deallocate(d.tmp, d.tmp_size, d.h);
d.tmp = nullptr;
}
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.rd_op_.maybe_invoke();
d.h(ec);
}

View File

@@ -109,7 +109,7 @@ operator()(error_code ec, bool again)
{
auto& d = *d_;
d.cont = d.cont || again;
while(! ec && d.state != 99)
if(! ec)
{
switch(d.state)
{
@@ -126,6 +126,9 @@ operator()(error_code ec, bool again)
d.ws.async_write_frame(fin, pb, std::move(*this));
return;
}
case 99:
break;
}
}
d.h(ec);

View File

@@ -8,11 +8,13 @@
#ifndef BEAST_WEBSOCKET_OPTION_HPP
#define BEAST_WEBSOCKET_OPTION_HPP
#include <beast/websocket/rfc6455.hpp>
#include <beast/websocket/detail/stream_base.hpp>
#include <algorithm>
#include <cstdint>
#include <stdexcept>
#include <type_traits>
#include <utility>
namespace beast {
namespace websocket {
@@ -147,6 +149,45 @@ struct keep_alive
};
#endif
/** Mask buffer size option.
Sets the size of the buffer allocated when the implementation
must allocate memory to apply the mask to a payload. Only affects
streams operating in the client role, since only clients send
masked frames. Lowering the size of the buffer can decrease the
memory requirements for each connection, while increasing the size
of the buffer can reduce the number of calls made to the next
layer to write masked data.
The default setting is 4096. The minimum value is 1.
@note Objects of this type are passed to @ref stream::set_option.
@par Example
Setting the write buffer size.
@code
...
websocket::stream<ip::tcp::socket> ws(ios);
ws.set_option(mask_buffer_size{8192});
@endcode
*/
#if GENERATING_DOCS
using mask_buffer_size = implementation_defined;
#else
struct mask_buffer_size
{
std::size_t value;
explicit
mask_buffer_size(std::size_t n)
: value(n)
{
if(n == 0)
throw std::domain_error("invalid mask buffer size");
}
};
#endif
/** Message type option.
This controls the opcode set for outgoing messages. Valid
@@ -184,6 +225,50 @@ struct message_type
};
#endif
/** Pong callback option.
Sets the callback to be invoked whenever a pong is received
during a call to @ref read, @ref read_frame, @ref async_read,
or @ref async_read_frame.
Unlike completion handlers, the callback will be invoked for
each received pong during a call to any synchronous or
asynchronous read function. The operation is passive, with
no associated error code, and triggered by reads.
The signature of the callback must be:
@code
void callback(
ping_data const& payload // Payload of the pong frame
);
@endcode
If the read operation receiving a pong frame is an asynchronous
operation, the callback will be invoked using the same method as
that used to invoke the final handler.
@note To remove the pong callback, construct the option with
no parameters: `set_option(pong_callback{})`
*/
#if GENERATING_DOCS
using pong_callback = implementation_defined;
#else
struct pong_callback
{
detail::pong_cb value;
pong_callback() = default;
pong_callback(pong_callback&&) = default;
pong_callback(pong_callback const&) = default;
explicit
pong_callback(detail::pong_cb f)
: value(std::move(f))
{
}
};
#endif
/** Read buffer size option.
Sets the number of bytes allocated to the socket's read buffer.
@@ -224,7 +309,8 @@ struct read_buffer_size
frame headers indicating a size that would bring the total
message size over this limit will cause a protocol failure.
The default setting is 16 megabytes.
The default setting is 16 megabytes. A value of zero indicates
a limit of `std::numeric_limits<std::uint64_t>::max()`.
@note Objects of this type are passed to @ref stream::set_option.
@@ -251,44 +337,6 @@ struct read_message_max
};
#endif
/** Write buffer size option.
Sets the number of bytes allocated to the socket's write buffer.
This buffer is used to hold masked frame payload data. Lowering
the size of the buffer can decrease the memory requirements for
each connection, at the cost of an increased number of calls to
perform socket writes.
This setting does not affect connections operating in the server
role, since servers do not apply a masking key to frame payloads.
The default setting is 4096. The minimum value is 1024.
@note Objects of this type are passed to @ref stream::set_option.
@par Example
Setting the write buffer size.
@code
...
websocket::stream<ip::tcp::socket> ws(ios);
ws.set_option(write_buffer_size{8192});
@endcode
*/
#if GENERATING_DOCS
using write_buffer_size = implementation_defined;
#else
struct write_buffer_size
{
std::size_t value;
explicit
write_buffer_size(std::size_t n)
: value(n)
{
}
};
#endif
} // websocket
} // beast

View File

@@ -60,6 +60,13 @@ enum
protocol_error = 1002,
unknown_data = 1003,
/// Indicates a received close frame has no close code
//no_code = 1005, // TODO
/// Indicates the connection was closed without receiving a close frame
no_close = 1006,
bad_payload = 1007,
policy_error = 1008,
too_big = 1009,
@@ -80,12 +87,11 @@ enum
} // close_code
#endif
#if ! GENERATING_DOCS
using reason_string_type =
static_string<123, char>;
using ping_payload_type =
static_string<125, char>;
#endif
/// The type representing the reason string in a close frame.
using reason_string = static_string<123, char>;
/// The type representing the payload of ping and pong messages.
using ping_data = static_string<125, char>;
/** Description of the close reason.
@@ -98,7 +104,7 @@ struct close_reason
close_code::value code = close_code::none;
/// The optional utf8-encoded reason string.
reason_string_type reason;
reason_string reason;
/** Default constructor.
@@ -114,17 +120,17 @@ struct close_reason
}
/// Construct from a reason. code is close_code::normal.
template<class CharT>
close_reason(CharT const* reason_)
template<std::size_t N>
close_reason(char const (&reason_)[N])
: code(close_code::normal)
, reason(reason_)
{
}
/// Construct from a code and reason.
template<class CharT>
template<std::size_t N>
close_reason(close_code::value code_,
CharT const* reason_)
char const (&reason_)[N])
: code(code_)
, reason(reason_)
{
@@ -137,20 +143,6 @@ struct close_reason
}
};
#if ! GENERATING_DOCS
/// Identifies the role of a WebSockets stream.
enum class role_type
{
/// Stream is operating as a client.
client,
/// Stream is operating as a server.
server
};
#endif
} // websocket
} // beast

View File

@@ -69,8 +69,8 @@ struct frame_info
@tparam NextLayer The type representing the next layer, to which
data will be read and written during operations. For synchronous
operations, the type must support the @b `SyncStream` concept.
For asynchronous operations, the type must support the @b `AsyncStream`
concept.
For asynchronous operations, the type must support the
@b `AsyncStream` concept.
@note A stream object must not be destroyed while there are
pending asynchronous operations associated with it.
@@ -84,7 +84,7 @@ struct frame_info
template<class NextLayer>
class stream : public detail::stream_base
{
friend class ws_test;
friend class stream_test;
streambuf_readstream<NextLayer, streambuf> stream_;
@@ -124,12 +124,12 @@ public:
/** Construct a WebSocket stream.
This constructor creates a websocket stream and initialises
This constructor creates a websocket stream and initializes
the next layer object.
@throws Any exceptions thrown by the NextLayer constructor.
@param args The arguments to be passed to initialise the
@param args The arguments to be passed to initialize the
next layer object. The arguments are forwarded to the next
layer's constructor.
*/
@@ -198,6 +198,13 @@ public:
wr_opcode_ = o.value;
}
/// Set the pong callback
void
set_option(pong_callback o)
{
pong_cb_ = std::move(o.value);
}
/// Set the read buffer size
void
set_option(read_buffer_size const& o)
@@ -212,11 +219,11 @@ public:
rd_msg_max_ = o.value;
}
/// Set the size of the write buffer
/// Set the size of the mask buffer
void
set_option(write_buffer_size const& o)
set_option(mask_buffer_size const& o)
{
wr_buf_size_ = std::max<std::size_t>(o.value, 1024);
mask_buf_size_ = o.value;
stream_.capacity(o.value);
}
@@ -788,7 +795,7 @@ public:
/** Send a WebSocket close frame.
This function is used to sycnhronously send a close frame on
This function is used to synchronously send a close frame on
the stream. The call blocks until one of the following is true:
@li The close frame finishes sending.
@@ -817,7 +824,7 @@ public:
/** Send a WebSocket close frame.
This function is used to sycnhronously send a close frame on
This function is used to synchronously send a close frame on
the stream. The call blocks until one of the following is true:
@li The close frame finishes sending.
@@ -844,7 +851,7 @@ public:
void
close(close_reason const& cr, error_code& ec);
/** Start an asycnhronous operation to send a WebSocket close frame.
/** Start an asynchronous operation to send a WebSocket close frame.
This function is used to asynchronously send a close frame on
the stream. This function call always returns immediately. The
@@ -858,7 +865,7 @@ public:
This operation is implemented in terms of one or more calls to the
next layer's `async_write_some` functions, and is known as a
<em>composed operation</em>. The program must ensure that the
stream performs no other write operations (such as
stream performs no other write operations (such as @ref async_ping,
@ref stream::async_write, @ref stream::async_write_frame, or
@ref stream::async_close) until this operation completes.
@@ -896,6 +903,84 @@ public:
#endif
async_close(close_reason const& cr, CloseHandler&& handler);
/** Send a WebSocket ping frame.
This function is used to synchronously send a ping frame on
the stream. The call blocks until one of the following is true:
@li The ping frame finishes sending.
@li An error occurs on the stream.
This function is implemented in terms of one or more calls to the
next layer's `write_some` functions.
@param payload The payload of the ping message, which may be empty.
@throws boost::system::system_error Thrown on failure.
*/
void
ping(ping_data const& payload);
/** Send a WebSocket ping frame.
This function is used to synchronously send a ping frame on
the stream. The call blocks until one of the following is true:
@li The ping frame finishes sending.
@li An error occurs on the stream.
This function is implemented in terms of one or more calls to the
next layer's `write_some` functions.
@param payload The payload of the ping message, which may be empty.
@param ec Set to indicate what error occurred, if any.
*/
void
ping(ping_data const& payload, error_code& ec);
/** Start an asynchronous operation to send a WebSocket ping frame.
This function is used to asynchronously send a ping frame to
the stream. The function call always returns immediately. The
asynchronous operation will continue until one of the following
is true:
@li The entire ping frame is sent.
@li An error occurs on the stream.
This operation is implemented in terms of one or more calls to the
next layer's `async_write_some` functions, and is known as a
<em>composed operation</em>. The program must ensure that the
stream performs no other writes until this operation completes.
@param payload The payload of the ping message, which may be empty.
@param handler The handler to be called when the read operation
completes. Copies will be made of the handler as required. The
function signature of the handler must be:
@code
void handler(
error_code const& error // Result of operation
);
@endcode
Regardless of whether the asynchronous operation completes
immediately or not, the handler will not be invoked from within
this function. Invocation of the handler will be performed in a
manner equivalent to using `boost::asio::io_service::post`.
*/
template<class PingHandler>
#if GENERATING_DOCS
void_or_deduced
#else
typename async_completion<
PingHandler, void(error_code)>::result_type
#endif
async_ping(ping_data const& payload, PingHandler&& handler);
/** Read a message from the stream.
This function is used to synchronously read a message from
@@ -969,7 +1054,7 @@ public:
/** Start an asynchronous operation to read a message from the stream.
This function is used to asychronously read a message from
This function is used to asynchronously read a message from
the stream. The function call always returns immediately. The
asynchronous operation will continue until one of the following
is true:
@@ -981,7 +1066,7 @@ public:
This operation is implemented in terms of one or more calls to the
next layer's `async_read_some` and `async_write_some` functions,
and is known as a <em>composed operation</em>. The program must
ensure that the stream performs no other until this operation
ensure that the stream performs no other reads until this operation
completes.
Upon a success, op is set to either binary or text depending on
@@ -989,15 +1074,14 @@ public:
hold all the message payload bytes (which may be zero in length).
Control frames encountered while reading frame or message data
are handled automatically. Pings are replied to, pongs are noted,
and close frames initiate the WebSocket close procedure. When a
close frame is received, this call will eventually return
@ref error::closed. Because of the need to handle control frames,
read operations can cause writes to take place. These writes are
managed transparently; callers can still have one active
asynchronous read and asynchronous write operation pending
simultaneously (a user initiated call to @ref async_close
counts as a write).
are handled automatically. Pings are replied to, pongs cause
an outstanding call to `async_ping` to complete, and close
frames initiate the WebSocket close procedure. When a close
frame is received, this call will eventually return
@ref error::closed. Because of the need to handle control
frames, these read operations can cause writes to take place.
Despite this, calls to `async_read` and `async_read_frame`
only count as read operations.
@param op A value to receive the message type.
This object must remain valid until the handler is called.
@@ -1104,7 +1188,7 @@ public:
/** Start an asynchronous operation to read a message frame from the stream.
This function is used to asychronously read a single message
This function is used to asynchronously read a single message
frame from the websocket. The function call always returns
immediately. The asynchronous operation will continue until
one of the following conditions is true:
@@ -1116,7 +1200,7 @@ public:
This operation is implemented in terms of one or more calls to the
next layer's `async_read_some` and `async_write_some` functions,
and is known as a <em>composed operation</em>. The program must
ensure that the stream performs no other until this operation
ensure that the stream performs no other reads until this operation
completes.
Upon a successful completion, fi is filled out to reflect the
@@ -1242,7 +1326,7 @@ public:
/** Start an asynchronous operation to write a message to the stream.
This function is used to asychronously write a message to
This function is used to asynchronously write a message to
the stream. The function call always returns immediately.
The asynchronous operation will continue until one of the
following conditions is true:
@@ -1407,11 +1491,15 @@ private:
template<class Handler> class accept_op;
template<class Handler> class close_op;
template<class Handler> class handshake_op;
template<class Handler> class ping_op;
template<class Handler> class response_op;
template<class Streambuf, class Handler> class read_op;
template<class Streambuf, class Handler> class read_frame_op;
template<class Buffers, class Handler> class write_op;
template<class Buffers, class Handler> class write_frame_op;
template<class Streambuf, class Handler> class read_op;
template<class Streambuf, class Handler> class read_frame_op;
void
reset();
http::request_v1<http::empty_body>
build_request(boost::string_ref const& host,

View File

@@ -35,6 +35,7 @@ unit-test core-tests :
core/write_streambuf.cpp
core/detail/base64.cpp
core/detail/empty_base_optimization.cpp
core/detail/get_lowest_layer.cpp
core/detail/sha1.cpp
;
@@ -77,6 +78,7 @@ unit-test websocket-tests :
websocket/teardown.cpp
websocket/detail/frame.cpp
websocket/detail/mask.cpp
websocket/detail/stream_base.cpp
websocket/detail/utf8_checker.cpp
;

View File

@@ -29,6 +29,7 @@ add_executable (core-tests
write_streambuf.cpp
detail/base64.cpp
detail/empty_base_optimization.cpp
detail/get_lowest_layer.cpp
detail/sha1.cpp
)

View File

@@ -0,0 +1,88 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Test that header file is self-contained.
#include <beast/core/detail/get_lowest_layer.hpp>
#include <beast/unit_test/suite.hpp>
#include <type_traits>
namespace beast {
namespace detail {
class get_lowest_layer_test
: public beast::unit_test::suite
{
public:
struct F1
{
};
struct F2
{
};
template<class F>
struct F3
{
using next_layer_type =
typename std::remove_reference<F>::type;
using lowest_layer_type = typename
get_lowest_layer<next_layer_type>::type;
};
template<class F>
struct F4
{
using next_layer_type =
typename std::remove_reference<F>::type;
using lowest_layer_type = typename
get_lowest_layer<next_layer_type>::type;
};
void
run()
{
static_assert(! has_lowest_layer<F1>::value, "");
static_assert(! has_lowest_layer<F2>::value, "");
static_assert(has_lowest_layer<F3<F1>>::value, "");
static_assert(has_lowest_layer<F4<F3<F2>>>::value, "");
static_assert(std::is_same<
get_lowest_layer<F1>::type, F1>::value, "");
static_assert(std::is_same<
get_lowest_layer<F2>::type, F2>::value, "");
static_assert(std::is_same<
get_lowest_layer<F3<F1>>::type, F1>::value, "");
static_assert(std::is_same<
get_lowest_layer<F3<F2>>::type, F2>::value, "");
static_assert(std::is_same<
get_lowest_layer<F4<F1>>::type, F1>::value, "");
static_assert(std::is_same<
get_lowest_layer<F4<F2>>::type, F2>::value, "");
static_assert(std::is_same<
get_lowest_layer<F4<F3<F1>>>::type, F1>::value, "");
static_assert(std::is_same<
get_lowest_layer<F4<F3<F2>>>::type, F2>::value, "");
pass();
}
};
BEAST_DEFINE_TESTSUITE(get_lowest_layer,core,beast);
} // detail
} // beast

View File

@@ -89,6 +89,7 @@ public:
void testIterator()
{
using boost::asio::buffer_size;
using boost::asio::const_buffer;
char b[3];
std::array<const_buffer, 3> bs{{
@@ -98,7 +99,12 @@ public:
auto pb = prepare_buffers(2, bs);
std::size_t n = 0;
for(auto it = pb.end(); it != pb.begin(); --it)
{
decltype(pb)::const_iterator it2(std::move(it));
expect(buffer_size(*it2) == 1);
it = std::move(it2);
++n;
}
expect(n == 2);
}

View File

@@ -59,6 +59,7 @@ public:
}
catch(std::exception const&)
{
pass();
}
m.headers.erase("Content-Length");
m.headers.insert("Connection", "keep-alive");
@@ -69,7 +70,12 @@ public:
}
catch(std::exception const&)
{
pass();
}
m.version = 11;
m.headers.erase("Connection");
m.headers.insert("Connection", "close");
expect(! is_keep_alive(m));
}
void run() override

View File

@@ -16,6 +16,7 @@ add_executable (websocket-tests
teardown.cpp
detail/frame.cpp
detail/mask.cpp
detail/stream_base.cpp
detail/utf8_checker.cpp
)

View File

@@ -225,7 +225,6 @@ public:
testCloseCodes();
testFrameHeader();
testBadFrameHeaders();
pass();
}
};

View File

@@ -5,7 +5,9 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Test that header file is self-contained.
#include <beast/websocket/detail/mask.hpp>
#include <beast/unit_test/suite.hpp>
namespace beast {

View File

@@ -0,0 +1,40 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Test that header file is self-contained.
#include <beast/websocket/detail/stream_base.hpp>
#include <beast/unit_test/suite.hpp>
#include <initializer_list>
#include <climits>
namespace beast {
namespace websocket {
namespace detail {
class stream_base_test : public beast::unit_test::suite
{
public:
void testClamp()
{
expect(detail::clamp(
std::numeric_limits<std::uint64_t>::max()) ==
std::numeric_limits<std::size_t>::max());
}
void run() override
{
testClamp();
}
};
BEAST_DEFINE_TESTSUITE(stream_base,websocket,beast);
} // detail
} // websocket
} // beast

File diff suppressed because it is too large Load Diff

View File

@@ -91,14 +91,16 @@ private:
bool log;
int state = 0;
boost::optional<endpoint_type> ep;
websocket::stream<socket_type> ws;
websocket::opcode op;
stream<socket_type> ws;
boost::asio::io_service::strand strand;
opcode op;
beast::streambuf sb;
int id;
data(bool log_, socket_type&& sock_)
: log(log_)
, ws(std::move(sock_))
, strand(ws.get_io_service())
, id([]
{
static int n = 0;
@@ -112,6 +114,7 @@ private:
: log(log_)
, ep(ep_)
, ws(std::move(sock_))
, strand(ws.get_io_service())
, id([]
{
static int n = 0;
@@ -174,8 +177,34 @@ private:
}
}
template<class Streambuf, std::size_t N>
static
bool
match(Streambuf& sb, char const(&s)[N])
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
if(sb.size() < N-1)
return false;
static_string<N-1> t;
t.resize(N-1);
buffer_copy(buffer(t.data(), t.size()),
sb.data());
if(t != s)
return false;
sb.consume(N-1);
return true;
}
void operator()(error_code ec, std::size_t)
{
(*this)(ec);
}
void operator()(error_code ec)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
auto& d = *d_;
switch(d.state)
{
@@ -191,19 +220,54 @@ private:
d.sb.consume(d.sb.size());
// read message
d.state = 2;
d.ws.async_read(d.op, d.sb, std::move(*this));
d.ws.async_read(d.op, d.sb,
d.strand.wrap(std::move(*this)));
return;
// got message
case 2:
if(ec == websocket::error::closed)
if(ec == error::closed)
return;
if(ec)
return fail(ec, "async_read");
if(match(d.sb, "RAW"))
{
d.state = 1;
boost::asio::async_write(d.ws.next_layer(),
d.sb.data(), d.strand.wrap(std::move(*this)));
return;
}
else if(match(d.sb, "TEXT"))
{
d.state = 1;
d.ws.set_option(message_type{opcode::text});
d.ws.async_write(
d.sb.data(), d.strand.wrap(std::move(*this)));
return;
}
else if(match(d.sb, "PING"))
{
ping_data payload;
d.sb.consume(buffer_copy(
buffer(payload.data(), payload.size()),
d.sb.data()));
d.state = 1;
d.ws.async_ping(payload,
d.strand.wrap(std::move(*this)));
return;
}
else if(match(d.sb, "CLOSE"))
{
d.state = 1;
d.ws.async_close({},
d.strand.wrap(std::move(*this)));
return;
}
// write message
d.state = 1;
d.ws.set_option(websocket::message_type(d.op));
d.ws.async_write(d.sb.data(), std::move(*this));
d.ws.set_option(message_type(d.op));
d.ws.async_write(d.sb.data(),
d.strand.wrap(std::move(*this)));
return;
// connected
@@ -214,7 +278,7 @@ private:
d.ws.async_handshake(
d.ep->address().to_string() + ":" +
std::to_string(d.ep->port()),
"/", std::move(*this));
"/", d.strand.wrap(std::move(*this)));
return;
}
}
@@ -226,7 +290,7 @@ private:
auto& d = *d_;
if(d.log)
{
if(ec != websocket::error::closed)
if(ec != error::closed)
std::cerr << "#" << d_->id << " " <<
what << ": " << ec.message() << std::endl;
}
@@ -256,6 +320,8 @@ private:
{
if(! acceptor_.is_open())
return;
if(ec == boost::asio::error::operation_aborted)
return;
maybe_throw(ec, "accept");
socket_type sock(std::move(sock_));
acceptor_.async_accept(sock_,

View File

@@ -101,15 +101,17 @@ private:
{
int id;
sync_echo_peer& self;
socket_type sock;
boost::asio::io_service::work work;
// Must be destroyed before work otherwise the
// io_service could be destroyed before the socket.
socket_type sock;
lambda(int id_, sync_echo_peer& self_,
socket_type&& sock_)
: id(id_)
, self(self_)
, work(sock_.get_io_service())
, sock(std::move(sock_))
, work(sock.get_io_service())
{
}
@@ -149,10 +151,31 @@ private:
}
};
template<class Streambuf, std::size_t N>
static
bool
match(Streambuf& sb, char const(&s)[N])
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
if(sb.size() < N-1)
return false;
static_string<N-1> t;
t.resize(N-1);
buffer_copy(buffer(t.data(), t.size()),
sb.data());
if(t != s)
return false;
sb.consume(N-1);
return true;
}
void
do_peer(int id, socket_type&& sock)
{
websocket::stream<socket_type> ws(std::move(sock));
using boost::asio::buffer;
using boost::asio::buffer_copy;
stream<socket_type> ws(std::move(sock));
ws.set_option(decorate(identity{}));
ws.set_option(read_message_max(64 * 1024 * 1024));
error_code ec;
@@ -164,17 +187,45 @@ private:
}
for(;;)
{
websocket::opcode op;
opcode op;
beast::streambuf sb;
ws.read(op, sb, ec);
if(ec)
{
auto const s = ec.message();
break;
ws.set_option(websocket::message_type(op));
ws.write(sb.data(), ec);
}
ws.set_option(message_type(op));
if(match(sb, "RAW"))
{
boost::asio::write(
ws.next_layer(), sb.data(), ec);
}
else if(match(sb, "TEXT"))
{
ws.set_option(message_type{opcode::text});
ws.write(sb.data(), ec);
}
else if(match(sb, "PING"))
{
ping_data payload;
sb.consume(buffer_copy(
buffer(payload.data(), payload.size()),
sb.data()));
ws.ping(payload, ec);
}
else if(match(sb, "CLOSE"))
{
ws.close({}, ec);
}
else
{
ws.write(sb.data(), ec);
}
if(ec)
break;
}
if(ec && ec != websocket::error::closed)
if(ec && ec != error::closed)
{
fail(id, ec, "read");
}