Fix race in http-crawl

This commit is contained in:
Richard Hodges
2020-07-07 12:31:45 +02:00
parent 03d43aa397
commit f036077547
2 changed files with 31 additions and 35 deletions

View File

@ -1,3 +1,7 @@
* Fix race in http-crawl example.
--------------------------------------------------------------------------------
Version 298: Version 298:
* Support BOOST_ASIO_NO_TS_EXECUTORS. * Support BOOST_ASIO_NO_TS_EXECUTORS.

View File

@ -46,7 +46,6 @@ using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// This structure aggregates statistics on all the sites // This structure aggregates statistics on all the sites
class crawl_report class crawl_report
{ {
net::io_context& ioc_;
net::strand< net::strand<
net::io_context::executor_type> strand_; net::io_context::executor_type> strand_;
std::atomic<std::size_t> index_; std::atomic<std::size_t> index_;
@ -55,8 +54,7 @@ class crawl_report
public: public:
crawl_report(net::io_context& ioc) crawl_report(net::io_context& ioc)
: ioc_(ioc) : strand_(ioc.get_executor())
, strand_(ioc_.get_executor())
, index_(0) , index_(0)
, hosts_(urls_large_data()) , hosts_(urls_large_data())
{ {
@ -150,6 +148,7 @@ class worker : public std::enable_shared_from_this<worker>
}; };
crawl_report& report_; crawl_report& report_;
net::strand<net::io_context::executor_type> ex_;
tcp::resolver resolver_; tcp::resolver resolver_;
beast::tcp_stream stream_; beast::tcp_stream stream_;
beast::flat_buffer buffer_; // (Must persist between reads) beast::flat_buffer buffer_; // (Must persist between reads)
@ -164,8 +163,9 @@ public:
crawl_report& report, crawl_report& report,
net::io_context& ioc) net::io_context& ioc)
: report_(report) : report_(report)
, resolver_(net::make_strand(ioc)) , ex_(net::make_strand(ioc.get_executor()))
, stream_(net::make_strand(ioc)) , resolver_(ex_)
, stream_(ex_)
{ {
// Set up the common fields of the request // Set up the common fields of the request
req_.version(11); req_.version(11);
@ -347,21 +347,14 @@ int main(int argc, char* argv[])
std::cerr << std::cerr <<
"Usage: http-crawl <threads>\n" << "Usage: http-crawl <threads>\n" <<
"Example:\n" << "Example:\n" <<
" http-crawl 100 1\n"; " http-crawl 100\n";
return EXIT_FAILURE; return EXIT_FAILURE;
} }
auto const threads = std::max<int>(1, std::atoi(argv[1])); auto const threads = std::max<int>(1, std::atoi(argv[1]));
// The io_context is required for all I/O // The io_context is used to aggregate the statistics
net::io_context ioc; net::io_context ioc;
// Building a tracked executor ensures that the underlying context's
// run() function will not return until the tracked executor is destroyed
net::any_io_executor work =
net::require(
ioc.get_executor(),
net::execution::outstanding_work.tracked);
// The report holds the aggregated statistics // The report holds the aggregated statistics
crawl_report report{ioc}; crawl_report report{ioc};
@ -371,17 +364,26 @@ int main(int argc, char* argv[])
std::vector<std::thread> workers; std::vector<std::thread> workers;
workers.reserve(threads + 1); workers.reserve(threads + 1);
for(int i = 0; i < threads; ++i) for(int i = 0; i < threads; ++i)
{
// Each worker will eventually add some data to the aggregated
// report. Outstanding work is tracked in each worker to
// represent the forthcoming delivery of this data by that
// worker.
auto reporting_work = net::require(
ioc.get_executor(),
net::execution::outstanding_work.tracked);
workers.emplace_back( workers.emplace_back(
[&report] [&report, reporting_work] {
{ // We use a separate io_context for each worker because
// We use a separate io_context for each worker because // the asio resolver simulates asynchronous operation using
// the asio resolver simulates asynchronous operation using // a dedicated worker thread per io_context, and we want to
// a dedicated worker thread per io_context, and we want to // do a lot of name resolutions in parallel.
// do a lot of name resolutions in parallel. net::io_context ioc;
net::io_context ioc{1}; std::make_shared<worker>(report, ioc)->run();
std::make_shared<worker>(report, ioc)->run(); ioc.run();
ioc.run(); });
}); }
// Add another thread to run the main io_context which // Add another thread to run the main io_context which
// is used to aggregate the statistics // is used to aggregate the statistics
@ -393,17 +395,7 @@ int main(int argc, char* argv[])
// Now block until all threads exit // Now block until all threads exit
for(std::size_t i = 0; i < workers.size(); ++i) for(std::size_t i = 0; i < workers.size(); ++i)
{ workers[i].join();
auto& thread = workers[i];
// If this is the last thread, reset the
// work object so that it can return from run.
if(i == workers.size() - 1)
work = {};
// Wait for the thread to exit
thread.join();
}
std::cout << std::cout <<
"Elapsed time: " << chrono::duration_cast<chrono::seconds>(t.elapsed()).count() << " seconds\n"; "Elapsed time: " << chrono::duration_cast<chrono::seconds>(t.elapsed()).count() << " seconds\n";