// // Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BEAST_WEBSOCKET_IMPL_STREAM_IPP #define BEAST_WEBSOCKET_IMPL_STREAM_IPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace beast { namespace websocket { namespace detail { template void stream_base::open(role_type role) { role_ = role; } template void stream_base::prepare_fh(close_code::value& code) { // continuation without an active message if(! rd_cont_ && rd_fh_.op == opcode::cont) { code = close_code::protocol_error; return; } // new data frame when continuation expected if(rd_cont_ && ! is_control(rd_fh_.op) && rd_fh_.op != opcode::cont) { code = close_code::protocol_error; return; } if(rd_fh_.mask) prepare_key(rd_key_, rd_fh_.key); if(! is_control(rd_fh_.op)) { if(rd_fh_.op != opcode::cont) { rd_size_ = rd_fh_.len; rd_opcode_ = rd_fh_.op; } else { if(rd_size_ > std::numeric_limits< std::uint64_t>::max() - rd_fh_.len) { code = close_code::too_big; return; } rd_size_ += rd_fh_.len; } if(rd_msg_max_ && rd_size_ > rd_msg_max_) { code = close_code::too_big; return; } rd_need_ = rd_fh_.len; rd_cont_ = ! rd_fh_.fin; } } template void stream_base::write_close( Streambuf& sb, close_reason const& cr) { using namespace boost::endian; frame_header fh; fh.op = opcode::close; fh.fin = true; fh.rsv1 = false; fh.rsv2 = false; fh.rsv3 = false; fh.len = cr.code == close_code::none ? 0 : 2 + cr.reason.size(); fh.mask = role_ == detail::role_type::client; if(fh.mask) fh.key = maskgen_(); detail::write(sb, fh); if(cr.code != close_code::none) { detail::prepared_key_type key; if(fh.mask) detail::prepare_key(key, fh.key); { std::uint8_t b[2]; ::new(&b[0]) big_uint16_buf_t{ (std::uint16_t)cr.code}; auto d = sb.prepare(2); boost::asio::buffer_copy(d, boost::asio::buffer(b)); if(fh.mask) detail::mask_inplace(d, key); sb.commit(2); } if(! cr.reason.empty()) { auto d = sb.prepare(cr.reason.size()); boost::asio::buffer_copy(d, boost::asio::const_buffer( cr.reason.data(), cr.reason.size())); if(fh.mask) detail::mask_inplace(d, key); sb.commit(cr.reason.size()); } } } template void stream_base::write_ping(Streambuf& sb, opcode op, ping_data const& data) { frame_header fh; fh.op = op; fh.fin = true; fh.rsv1 = false; fh.rsv2 = false; fh.rsv3 = false; fh.len = data.size(); fh.mask = role_ == role_type::client; if(fh.mask) fh.key = maskgen_(); detail::write(sb, fh); if(data.empty()) return; detail::prepared_key_type key; if(fh.mask) detail::prepare_key(key, fh.key); auto d = sb.prepare(data.size()); boost::asio::buffer_copy(d, boost::asio::const_buffers_1( data.data(), data.size())); if(fh.mask) detail::mask_inplace(d, key); sb.commit(data.size()); } } // detail //------------------------------------------------------------------------------ template template stream:: stream(Args&&... args) : stream_(std::forward(args)...) { } template void stream:: accept() { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; accept(boost::asio::null_buffers{}, ec); if(ec) throw system_error{ec}; } template void stream:: accept(error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); accept(boost::asio::null_buffers{}, ec); } template template typename async_completion< AcceptHandler, void(error_code)>::result_type stream:: async_accept(AcceptHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements requirements not met"); return async_accept(boost::asio::null_buffers{}, std::forward(handler)); } template template void stream:: accept(ConstBufferSequence const& buffers) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); static_assert(is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); error_code ec; accept(buffers, ec); if(ec) throw system_error{ec}; } template template void stream:: accept(ConstBufferSequence const& buffers, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); static_assert(beast::is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); using boost::asio::buffer_copy; using boost::asio::buffer_size; reset(); stream_.buffer().commit(buffer_copy( stream_.buffer().prepare( buffer_size(buffers)), buffers)); http::request_v1 m; http::read(next_layer(), stream_.buffer(), m, ec); if(ec) return; accept(m, ec); } template template typename async_completion< AcceptHandler, void(error_code)>::result_type stream:: async_accept(ConstBufferSequence const& bs, AcceptHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements requirements not met"); static_assert(beast::is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); beast::async_completion< AcceptHandler, void(error_code) > completion(handler); accept_op{ completion.handler, *this, bs}; return completion.result.get(); } template template void stream:: accept(http::request_v1 const& request) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; accept(request, ec); if(ec) throw system_error{ec}; } template template void stream:: accept(http::request_v1 const& req, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); reset(); auto const res = build_response(req); http::write(stream_, res, ec); if(ec) return; if(res.status != 101) { ec = error::handshake_failed; // VFALCO TODO Respect keep alive setting, perform // teardown if Connection: close. return; } open(detail::role_type::server); } template template typename async_completion< AcceptHandler, void(error_code)>::result_type stream:: async_accept(http::request_v1 const& req, AcceptHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements requirements not met"); beast::async_completion< AcceptHandler, void(error_code) > completion(handler); reset(); response_op{ completion.handler, *this, req, boost_asio_handler_cont_helpers:: is_continuation(completion.handler)}; return completion.result.get(); } template void stream:: handshake(boost::string_ref const& host, boost::string_ref const& resource) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; handshake(host, resource, ec); if(ec) throw system_error{ec}; } template void stream:: handshake(boost::string_ref const& host, boost::string_ref const& resource, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); reset(); std::string key; http::write(stream_, build_request(host, resource, key), ec); if(ec) return; http::response_v1 res; http::read(next_layer(), stream_.buffer(), res, ec); if(ec) return; do_response(res, key, ec); } template template typename async_completion< HandshakeHandler, void(error_code)>::result_type stream:: async_handshake(boost::string_ref const& host, boost::string_ref const& resource, HandshakeHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements not met"); beast::async_completion< HandshakeHandler, void(error_code) > completion(handler); handshake_op{ completion.handler, *this, host, resource}; return completion.result.get(); } template void stream:: close(close_reason const& cr) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; close(cr, ec); if(ec) throw system_error{ec}; } template void stream:: close(close_reason const& cr, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); assert(! wr_close_); wr_close_ = true; detail::frame_streambuf fb; write_close(fb, cr); boost::asio::write(stream_, fb.data(), ec); failed_ = ec != 0; } template template typename async_completion< CloseHandler, void(error_code)>::result_type stream:: async_close(close_reason const& cr, CloseHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements not met"); beast::async_completion< CloseHandler, void(error_code) > completion(handler); close_op{ completion.handler, *this, cr}; return completion.result.get(); } template void stream:: ping(ping_data const& payload) { error_code ec; ping(payload, ec); if(ec) throw system_error{ec}; } template void stream:: ping(ping_data const& payload, error_code& ec) { detail::frame_streambuf sb; write_ping( sb, opcode::ping, payload); boost::asio::write(stream_, sb.data(), ec); } template template typename async_completion< PingHandler, void(error_code)>::result_type stream:: async_ping(ping_data const& payload, PingHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements requirements not met"); beast::async_completion< PingHandler, void(error_code) > completion(handler); ping_op{ completion.handler, *this, payload}; return completion.result.get(); } template template void stream:: read(opcode& op, Streambuf& streambuf) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; read(op, streambuf, ec); if(ec) throw system_error{ec}; } template template void stream:: read(opcode& op, Streambuf& streambuf, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); frame_info fi; for(;;) { read_frame(fi, streambuf, ec); if(ec) break; op = fi.op; if(fi.fin) break; } } template template typename async_completion< ReadHandler, void(error_code)>::result_type stream:: async_read(opcode& op, Streambuf& streambuf, ReadHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements requirements not met"); static_assert(beast::is_Streambuf::value, "Streambuf requirements not met"); beast::async_completion< ReadHandler, void(error_code) > completion(handler); read_op{ completion.handler, *this, op, streambuf}; return completion.result.get(); } template template void stream:: read_frame(frame_info& fi, Streambuf& streambuf) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; read_frame(fi, streambuf, ec); if(ec) throw system_error{ec}; } template template void stream:: read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); close_code::value code{}; for(;;) { if(rd_need_ == 0) { // read header detail::frame_streambuf fb; do_read_fh(fb, code, ec); failed_ = ec != 0; if(failed_) return; if(code != close_code::none) break; if(detail::is_control(rd_fh_.op)) { // read control payload if(rd_fh_.len > 0) { auto const mb = fb.prepare( static_cast(rd_fh_.len)); fb.commit(boost::asio::read(stream_, mb, ec)); failed_ = ec != 0; if(failed_) return; if(rd_fh_.mask) detail::mask_inplace(mb, rd_key_); fb.commit(static_cast(rd_fh_.len)); } if(rd_fh_.op == opcode::ping) { ping_data data; detail::read(data, fb.data()); fb.reset(); write_ping( fb, opcode::pong, data); boost::asio::write(stream_, fb.data(), ec); failed_ = ec != 0; if(failed_) return; continue; } else if(rd_fh_.op == opcode::pong) { ping_data payload; detail::read(payload, fb.data()); if(pong_cb_) pong_cb_(payload); continue; } assert(rd_fh_.op == opcode::close); { detail::read(cr_, fb.data(), code); if(code != close_code::none) break; if(! wr_close_) { auto cr = cr_; if(cr.code == close_code::none) cr.code = close_code::normal; cr.reason = ""; fb.reset(); wr_close_ = true; write_close(fb, cr); boost::asio::write(stream_, fb.data(), ec); failed_ = ec != 0; if(failed_) return; } break; } } if(rd_need_ == 0 && ! rd_fh_.fin) { // empty frame continue; } } // read payload auto smb = streambuf.prepare( detail::clamp(rd_need_)); auto const bytes_transferred = stream_.read_some(smb, ec); failed_ = ec != 0; if(failed_) return; rd_need_ -= bytes_transferred; auto const pb = prepare_buffers( bytes_transferred, smb); if(rd_fh_.mask) detail::mask_inplace(pb, rd_key_); if(rd_opcode_ == opcode::text) { if(! rd_utf8_check_.write(pb) || (rd_need_ == 0 && rd_fh_.fin && ! rd_utf8_check_.finish())) { code = close_code::bad_payload; break; } } streambuf.commit(bytes_transferred); fi.op = rd_opcode_; fi.fin = rd_fh_.fin && rd_need_ == 0; return; } if(code != close_code::none) { // Fail the connection (per rfc6455) if(! wr_close_) { wr_close_ = true; detail::frame_streambuf fb; write_close(fb, code); boost::asio::write(stream_, fb.data(), ec); failed_ = ec != 0; if(failed_) return; } websocket_helpers::call_teardown(next_layer(), ec); failed_ = ec != 0; if(failed_) return; ec = error::failed; failed_ = true; return; } if(! ec) websocket_helpers::call_teardown(next_layer(), ec); if(! ec) ec = error::closed; failed_ = ec != 0; } template template typename async_completion< ReadHandler, void(error_code)>::result_type stream:: async_read_frame(frame_info& fi, Streambuf& streambuf, ReadHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements requirements not met"); static_assert(beast::is_Streambuf::value, "Streambuf requirements not met"); beast::async_completion< ReadHandler, void(error_code)> completion(handler); read_frame_op{ completion.handler, *this, fi, streambuf}; return completion.result.get(); } template template void stream:: write(ConstBufferSequence const& buffers) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; write(buffers, ec); if(ec) throw system_error{ec}; } template template void stream:: write(ConstBufferSequence const& bs, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); static_assert(beast::is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); using boost::asio::buffer_size; consuming_buffers cb(bs); auto remain = buffer_size(cb); for(;;) { auto const n = detail::clamp(remain, wr_frag_size_); remain -= n; auto const fin = remain <= 0; write_frame(fin, prepare_buffers(n, cb), ec); cb.consume(n); if(ec) return; if(fin) break; } } template template typename async_completion< WriteHandler, void(error_code)>::result_type stream:: async_write(ConstBufferSequence const& bs, WriteHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements not met"); static_assert(beast::is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); beast::async_completion< WriteHandler, void(error_code)> completion(handler); write_op{ completion.handler, *this, bs}; return completion.result.get(); } template template void stream:: write_frame(bool fin, ConstBufferSequence const& buffers) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); error_code ec; write_frame(fin, buffers, ec); if(ec) throw system_error{ec}; } template template void stream:: write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec) { static_assert(is_SyncStream::value, "SyncStream requirements not met"); static_assert(beast::is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); using boost::asio::buffer_copy; using boost::asio::buffer_size; using boost::asio::mutable_buffers_1; detail::frame_header fh; fh.op = wr_cont_ ? opcode::cont : wr_opcode_; wr_cont_ = ! fin; fh.fin = fin; fh.rsv1 = false; fh.rsv2 = false; fh.rsv3 = false; fh.len = buffer_size(bs); fh.mask = role_ == detail::role_type::client; if(fh.mask) fh.key = maskgen_(); detail::fh_streambuf fh_buf; detail::write(fh_buf, fh); if(! fh.mask) { // send header and payload boost::asio::write(stream_, buffer_cat(fh_buf.data(), bs), ec); failed_ = ec != 0; return; } detail::prepared_key_type key; detail::prepare_key(key, fh.key); auto const tmp_size = detail::clamp(fh.len, mask_buf_size_); std::unique_ptr up( new std::uint8_t[tmp_size]); std::uint64_t remain = fh.len; consuming_buffers cb(bs); { auto const n = detail::clamp(remain, tmp_size); mutable_buffers_1 mb{up.get(), n}; buffer_copy(mb, cb); cb.consume(n); remain -= n; detail::mask_inplace(mb, key); // send header and payload boost::asio::write(stream_, buffer_cat(fh_buf.data(), mb), ec); if(ec) { failed_ = ec != 0; return; } } while(remain > 0) { auto const n = detail::clamp(remain, tmp_size); mutable_buffers_1 mb{up.get(), n}; buffer_copy(mb, cb); cb.consume(n); remain -= n; detail::mask_inplace(mb, key); // send payload boost::asio::write(stream_, mb, ec); if(ec) { failed_ = ec != 0; return; } } } template template typename async_completion< WriteHandler, void(error_code)>::result_type stream:: async_write_frame(bool fin, ConstBufferSequence const& bs, WriteHandler&& handler) { static_assert(is_AsyncStream::value, "AsyncStream requirements not met"); static_assert(beast::is_ConstBufferSequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); beast::async_completion< WriteHandler, void(error_code) > completion(handler); write_frame_op{completion.handler, *this, fin, bs}; return completion.result.get(); } //------------------------------------------------------------------------------ template void stream:: reset() { failed_ = false; rd_need_ = 0; rd_cont_ = false; wr_close_ = false; wr_cont_ = false; wr_block_ = nullptr; // should be nullptr on close anyway pong_data_ = nullptr; // should be nullptr on close anyway stream_.buffer().consume( stream_.buffer().size()); } template http::request_v1 stream:: build_request(boost::string_ref const& host, boost::string_ref const& resource, std::string& key) { http::request_v1 req; req.url = "/"; req.version = 11; req.method = "GET"; req.headers.insert("Host", host); req.headers.insert("Upgrade", "websocket"); key = detail::make_sec_ws_key(maskgen_); req.headers.insert("Sec-WebSocket-Key", key); req.headers.insert("Sec-WebSocket-Version", "13"); (*d_)(req); http::prepare(req, http::connection::upgrade); return req; } template template http::response_v1 stream:: build_response(http::request_v1 const& req) { auto err = [&](std::string const& text) { http::response_v1 res; res.status = 400; res.reason = http::reason_string(res.status); res.version = req.version; res.body = text; (*d_)(res); prepare(res, (is_keep_alive(req) && keep_alive_) ? http::connection::keep_alive : http::connection::close); return res; }; if(req.version < 11) return err("HTTP version 1.1 required"); if(req.method != "GET") return err("Wrong method"); if(! is_upgrade(req)) return err("Expected Upgrade request"); if(! req.headers.exists("Host")) return err("Missing Host"); if(! req.headers.exists("Sec-WebSocket-Key")) return err("Missing Sec-WebSocket-Key"); if(! http::token_list{req.headers["Upgrade"]}.exists("websocket")) return err("Missing websocket Upgrade token"); { auto const version = req.headers["Sec-WebSocket-Version"]; if(version.empty()) return err("Missing Sec-WebSocket-Version"); if(version != "13") { http::response_v1 res; res.status = 426; res.reason = http::reason_string(res.status); res.version = req.version; res.headers.insert("Sec-WebSocket-Version", "13"); prepare(res, (is_keep_alive(req) && keep_alive_) ? http::connection::keep_alive : http::connection::close); return res; } } http::response_v1 res; res.status = 101; res.reason = http::reason_string(res.status); res.version = req.version; res.headers.insert("Upgrade", "websocket"); { auto const key = req.headers["Sec-WebSocket-Key"]; res.headers.insert("Sec-WebSocket-Accept", detail::make_sec_ws_accept(key)); } res.headers.replace("Server", "Beast.WSProto"); (*d_)(res); http::prepare(res, http::connection::upgrade); return res; } template template void stream:: do_response(http::response_v1 const& res, boost::string_ref const& key, error_code& ec) { // VFALCO Review these error codes auto fail = [&]{ ec = error::response_failed; }; if(res.version < 11) return fail(); if(res.status != 101) return fail(); if(! is_upgrade(res)) return fail(); if(! http::token_list{res.headers["Upgrade"]}.exists("websocket")) return fail(); if(! res.headers.exists("Sec-WebSocket-Accept")) return fail(); if(res.headers["Sec-WebSocket-Accept"] != detail::make_sec_ws_accept(key)) return fail(); open(detail::role_type::client); } template void stream:: do_read_fh(detail::frame_streambuf& fb, close_code::value& code, error_code& ec) { fb.commit(boost::asio::read( stream_, fb.prepare(2), ec)); if(ec) return; auto const n = detail::read_fh1( rd_fh_, fb, role_, code); if(code != close_code::none) return; if(n > 0) { fb.commit(boost::asio::read( stream_, fb.prepare(n), ec)); if(ec) return; } detail::read_fh2( rd_fh_, fb, role_, code); if(code != close_code::none) return; prepare_fh(code); } } // websocket } // beast #endif