Fix write loop in advanced server examples

Fixes #2739
This commit is contained in:
Mohammad Nejati
2024-01-01 16:35:56 +00:00
committed by Mohammad Nejati
parent 0d1e2d64f8
commit 5998feda44
3 changed files with 40 additions and 71 deletions

View File

@@ -38,8 +38,9 @@
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <memory>
#include <list> #include <list>
#include <memory>
#include <queue>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector> #include <vector>
@@ -499,7 +500,7 @@ class http_session
} }
static constexpr std::size_t queue_limit = 8; // max responses static constexpr std::size_t queue_limit = 8; // max responses
std::vector<http::message_generator> response_queue_; std::queue<http::message_generator> response_queue_;
// The parser is stored in an optional container so we can // The parser is stored in an optional container so we can
// construct it from scratch it at the beginning of each new message. // construct it from scratch it at the beginning of each new message.
@@ -580,42 +581,30 @@ public:
queue_write(http::message_generator response) queue_write(http::message_generator response)
{ {
// Allocate and store the work // Allocate and store the work
response_queue_.push_back(std::move(response)); response_queue_.push(std::move(response));
// If there was no previous work, start the write // If there was no previous work, start the write loop
// loop
if (response_queue_.size() == 1) if (response_queue_.size() == 1)
do_write(); do_write();
} }
// Called to start/continue the write-loop. Should not be called when // Called to start/continue the write-loop. Should not be called when
// write_loop is already active. // write_loop is already active.
// void
// Returns `true` if the caller may initiate a new read
bool
do_write() do_write()
{ {
bool const was_full =
response_queue_.size() == queue_limit;
if(! response_queue_.empty()) if(! response_queue_.empty())
{ {
http::message_generator msg = bool keep_alive = response_queue_.front().keep_alive();
std::move(response_queue_.front());
response_queue_.erase(response_queue_.begin());
bool keep_alive = msg.keep_alive();
beast::async_write( beast::async_write(
derived().stream(), derived().stream(),
std::move(msg), std::move(response_queue_.front()),
beast::bind_front_handler( beast::bind_front_handler(
&http_session::on_write, &http_session::on_write,
derived().shared_from_this(), derived().shared_from_this(),
keep_alive)); keep_alive));
} }
return was_full;
} }
void void
@@ -636,12 +625,13 @@ public:
return derived().do_eof(); return derived().do_eof();
} }
// Inform the queue that a write completed // Resume the read if it has been paused
if(do_write()) if(response_queue_.size() == queue_limit)
{
// Read another request
do_read(); do_read();
}
response_queue_.pop();
do_write();
} }
}; };

View File

@@ -31,6 +31,7 @@
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <queue>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector> #include <vector>
@@ -452,7 +453,7 @@ class http_session
} }
static constexpr std::size_t queue_limit = 8; // max responses static constexpr std::size_t queue_limit = 8; // max responses
std::vector<http::message_generator> response_queue_; std::queue<http::message_generator> response_queue_;
// The parser is stored in an optional container so we can // The parser is stored in an optional container so we can
// construct it from scratch it at the beginning of each new message. // construct it from scratch it at the beginning of each new message.
@@ -533,42 +534,30 @@ public:
queue_write(http::message_generator response) queue_write(http::message_generator response)
{ {
// Allocate and store the work // Allocate and store the work
response_queue_.push_back(std::move(response)); response_queue_.push(std::move(response));
// If there was no previous work, start the write // If there was no previous work, start the write loop
// loop
if (response_queue_.size() == 1) if (response_queue_.size() == 1)
do_write(); do_write();
} }
// Called to start/continue the write-loop. Should not be called when // Called to start/continue the write-loop. Should not be called when
// write_loop is already active. // write_loop is already active.
// void
// Returns `true` if the caller may initiate a new read
bool
do_write() do_write()
{ {
bool const was_full =
response_queue_.size() == queue_limit;
if(! response_queue_.empty()) if(! response_queue_.empty())
{ {
http::message_generator msg = bool keep_alive = response_queue_.front().keep_alive();
std::move(response_queue_.front());
response_queue_.erase(response_queue_.begin());
bool keep_alive = msg.keep_alive();
beast::async_write( beast::async_write(
derived().stream(), derived().stream(),
std::move(msg), std::move(response_queue_.front()),
beast::bind_front_handler( beast::bind_front_handler(
&http_session::on_write, &http_session::on_write,
derived().shared_from_this(), derived().shared_from_this(),
keep_alive)); keep_alive));
} }
return was_full;
} }
void void
@@ -589,12 +578,13 @@ public:
return derived().do_eof(); return derived().do_eof();
} }
// Inform the queue that a write completed // Resume the read if it has been paused
if(do_write()) if(response_queue_.size() == queue_limit)
{
// Read another request
do_read(); do_read();
}
response_queue_.pop();
do_write();
} }
}; };

View File

@@ -28,6 +28,7 @@
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <queue>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector> #include <vector>
@@ -328,7 +329,7 @@ class http_session : public std::enable_shared_from_this<http_session>
std::shared_ptr<std::string const> doc_root_; std::shared_ptr<std::string const> doc_root_;
static constexpr std::size_t queue_limit = 8; // max responses static constexpr std::size_t queue_limit = 8; // max responses
std::vector<http::message_generator> response_queue_; std::queue<http::message_generator> response_queue_;
// The parser is stored in an optional container so we can // The parser is stored in an optional container so we can
// construct it from scratch it at the beginning of each new message. // construct it from scratch it at the beginning of each new message.
@@ -344,7 +345,6 @@ public:
{ {
static_assert(queue_limit > 0, static_assert(queue_limit > 0,
"queue limit must be positive"); "queue limit must be positive");
response_queue_.reserve(queue_limit);
} }
// Start the session // Start the session
@@ -420,42 +420,30 @@ private:
queue_write(http::message_generator response) queue_write(http::message_generator response)
{ {
// Allocate and store the work // Allocate and store the work
response_queue_.push_back(std::move(response)); response_queue_.push(std::move(response));
// If there was no previous work, start the write // If there was no previous work, start the write loop
// loop
if (response_queue_.size() == 1) if (response_queue_.size() == 1)
do_write(); do_write();
} }
// Called to start/continue the write-loop. Should not be called when // Called to start/continue the write-loop. Should not be called when
// write_loop is already active. // write_loop is already active.
// void
// Returns `true` if the caller may initiate a new read
bool
do_write() do_write()
{ {
bool const was_full =
response_queue_.size() == queue_limit;
if(! response_queue_.empty()) if(! response_queue_.empty())
{ {
http::message_generator msg = bool keep_alive = response_queue_.front().keep_alive();
std::move(response_queue_.front());
response_queue_.erase(response_queue_.begin());
bool keep_alive = msg.keep_alive();
beast::async_write( beast::async_write(
stream_, stream_,
std::move(msg), std::move(response_queue_.front()),
beast::bind_front_handler( beast::bind_front_handler(
&http_session::on_write, &http_session::on_write,
shared_from_this(), shared_from_this(),
keep_alive)); keep_alive));
} }
return was_full;
} }
void void
@@ -476,12 +464,13 @@ private:
return do_close(); return do_close();
} }
// Inform the queue that a write completed // Resume the read if it has been paused
if(do_write()) if(response_queue_.size() == queue_limit)
{
// Read another request
do_read(); do_read();
}
response_queue_.pop();
do_write();
} }
void void