From 901d652fd72c94136cc06fd9a973c88fb26d0007 Mon Sep 17 00:00:00 2001
From: joaquintides
Date: Mon, 11 Dec 2023 17:37:35 +0100
Subject: [PATCH] implemented garbage collection

---
 .../unordered/detail/foa/concurrent_table.hpp | 164 +++++++++++++++---
 1 file changed, 139 insertions(+), 25 deletions(-)

diff --git a/include/boost/unordered/detail/foa/concurrent_table.hpp b/include/boost/unordered/detail/foa/concurrent_table.hpp
index 8e41d185..47994fd6 100644
--- a/include/boost/unordered/detail/foa/concurrent_table.hpp
+++ b/include/boost/unordered/detail/foa/concurrent_table.hpp
@@ -50,6 +50,13 @@
 #include <execution>
 #endif
 
+#if defined(BOOST_UNORDERED_LATCH_FREE)
+#include <array>
+#include <vector>
+#endif
+
+std::atomic<std::size_t> garbage_collected=0;
+
 namespace boost{
 namespace unordered{
 namespace detail{
@@ -481,7 +488,14 @@ public:
     std::size_t n=default_bucket_count,const Hash& h_=Hash(),
     const Pred& pred_=Pred(),const Allocator& al_=Allocator()):
     super{n,h_,pred_,al_}
-  {}
+  {
+#if defined(BOOST_UNORDERED_LATCH_FREE)
+    for(std::size_t i=0;i<garbage_vectors.size();++i){
+      garbage_vectors[i].retired_elements.resize(garbage_vector::N);
+    }
+#endif
+  }
+
@@ ... @@ private:
     auto hash=this->hash_for(x);
     std::size_t res=0;
     unprotected_internal_visit(
-      group_shared{},x,this->position_for(hash),hash,
+      group_shared{},x,this->position_for(hash),hash, // NB: shared access
       [&,this](group_type* pg,unsigned int n,element_type* p)
       {
-        auto expected=group_type::reduced_hash(hash);
-        if(reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
-          compare_exchange_strong(expected,1)){
-          if(f(cast_for(group_shared{},type_policy::value_from(*p)))){
-            wait_for_epochs();
-            super::erase(pg,n,p);
+        if(f(cast_for(group_shared{},type_policy::value_from(*p)))){
+          auto expected=group_type::reduced_hash(hash); // TODO: prove no ABA
+          auto pc=reinterpret_cast<unsigned char*>(pg)+n;
+          auto mco=group_type::maybe_caused_overflow(pc);
+          if(reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
+            compare_exchange_strong(expected,1)){
+            retire_element(static_cast<std::size_t>(p-this->arrays.elements()),mco);
             res=1;
           }
-          else{
-            pg->set(n,expected);
-          }
         }
       });
     return res;
@@ -1002,14 +1014,13 @@ private:
 
 #if defined(BOOST_UNORDERED_LATCH_FREE)
   using epoch_type=std::atomic<std::size_t>;
-  using epoch_array=cache_aligned_array<epoch_type,128>; // TODO: adapt 128 to the machine
 #endif
 
 #if defined(BOOST_UNORDERED_LATCH_FREE)
   struct group_shared_lock_guard
   {
-    group_shared_lock_guard(epoch_type& e_):e{e_}{++e;}
-    ~group_shared_lock_guard(){--e;}
+    group_shared_lock_guard(epoch_type& e_):e{e_}{}
+    ~group_shared_lock_guard(){e=0;}
 
     epoch_type& e;
   };
@@ -1018,8 +1029,8 @@ private:
   {
     group_exclusive_lock_guard(
       epoch_type& e_,group_access::exclusive_lock_guard&& lck_):
-      e{e_},lck{std::move(lck_)}{++e;}
-    ~group_exclusive_lock_guard(){--e;}
+      e{e_},lck{std::move(lck_)}{}
+    ~group_exclusive_lock_guard(){e=0;}
 
     epoch_type&                        e;
     group_access::exclusive_lock_guard lck;
@@ -1081,7 +1092,9 @@ private:
   inline group_shared_lock_guard access(group_shared,std::size_t pos)const
   {
 #if defined(BOOST_UNORDERED_LATCH_FREE)
-    return {epochs[thread_id()%epochs.size()]};
+    auto& e=garbage_vectors[thread_id()%garbage_vectors.size()].epoch;
+    e=current_epoch.load(std::memory_order_relaxed);
+    return {e};
 #else
     return this->arrays.group_accesses()[pos].shared_access();
 #endif
   }
@@ -1091,10 +1104,9 @@ private:
   inline group_exclusive_lock_guard access(
     group_exclusive,std::size_t pos)const
   {
 #if defined(BOOST_UNORDERED_LATCH_FREE)
-    return {
-      epochs[thread_id()%epochs.size()],
-      this->arrays.group_accesses()[pos].exclusive_access()
-    };
+    auto& e=garbage_vectors[thread_id()%garbage_vectors.size()].epoch;
+    e=current_epoch.load(std::memory_order_relaxed);
+    return {e,this->arrays.group_accesses()[pos].exclusive_access()};
 #else
     return this->arrays.group_accesses()[pos].exclusive_access();
 #endif
@@ -1605,6 +1617,9 @@ private:
   {
     auto lck=exclusive_access();
     if(this->size_ctrl.size==this->size_ctrl.ml){
+#if defined(BOOST_UNORDERED_LATCH_FREE)
+      garbage_collect();
+#endif
       this->unchecked_rehash_for_growth();
     }
   }
@@ -1849,12 +1864,111 @@ private:
   mutable multimutex_type    mutexes;
 
 #if defined(BOOST_UNORDERED_LATCH_FREE)
-  mutable epoch_array        epochs;
+  struct retired_element{
+    static constexpr std::size_t available_=std::size_t(-1),
+                                 reserved_=std::size_t(-2);
+    retired_element()=default;
+    retired_element(const retired_element&){}
 
-  void wait_for_epochs()
-  {
-    for(std::size_t i=0;i<epochs.size();++i)while(epochs[i].load()>1){}
-  }
+    std::atomic<std::size_t> epoch=available_;
+    std::atomic<std::size_t> pos;
+    std::atomic<bool>        mco;
+  };
+  struct garbage_vector
+  {
+    static constexpr std::size_t N=128;
+    static constexpr std::size_t min_for_epoch_bump=64;
+    static constexpr std::size_t min_for_garbage_collection=64;
+
+    epoch_type                   epoch=0;
+    std::atomic<std::size_t>     epoch_bump=0;
+    std::vector<retired_element> retired_elements;
+    std::atomic<std::size_t>     wpos=0;
+    std::atomic<std::size_t>     rpos=0;
+    std::atomic<bool>            reading=false;
+    unsigned char                pad[16];
+  };
+  mutable std::array<garbage_vector,128> garbage_vectors;
+  epoch_type current_epoch=1;
+
+  std::size_t max_safe_epoch()
+  {
+    std::size_t e=retired_element::reserved_;
+    for(std::size_t i=0;i<garbage_vectors.size();++i){
+      std::size_t ve=garbage_vectors[i].epoch;
+      if(ve!=0&&ve<e)e=ve;
+    }
+    return e-1;
+  }
+
+  void retire_element(std::size_t pos,bool mco)
+  {
+    auto& v=garbage_vectors[thread_id()%garbage_vectors.size()];
+    std::size_t wpos=v.wpos;
+    std::size_t expected=retired_element::available_;
+    for(;;){
+      auto& e=v.retired_elements[wpos%garbage_vector::N];
+      if(e.epoch.compare_exchange_strong(expected,retired_element::reserved_)){
+        ++v.wpos;
+        e.pos=pos;
+        e.mco=mco;
+        e.epoch=current_epoch.load();
+        --this->size_ctrl.ml;
+        --this->size_ctrl.size;
+/*      if(wpos-v.rpos>=garbage_vector::min_for_garbage_collection){
+          v.epoch=++current_epoch;
+          garbage_collect();
+        }
+        else */if(++v.epoch_bump==garbage_vector::min_for_epoch_bump)
+        {
+          v.epoch=++current_epoch;
+          v.epoch_bump=0;
+          garbage_collect();
+        }
+        return;
+      }
+      if(expected==retired_element::reserved_){ /* other thread wrote */
+      }
+      else{ /* vector full */
+        //std::cout<<"*";
+        v.epoch=++current_epoch;
+        garbage_collect();
+      }
+      wpos=v.wpos;
+      expected=retired_element::available_;
+    }
+  }
+
+  void garbage_collect(garbage_vector& v,std::size_t max_epoch)
+  {
+    if(v.rpos==v.wpos)return;
+
+    bool expected=false;
+    if(v.reading.compare_exchange_strong(expected,true)){
+      std::size_t rpos=v.rpos;
+      std::size_t num_mcos=0;
+      for(;;){
+        auto& e=v.retired_elements[rpos%v.garbage_vector::N];
+        if(e.epoch>max_epoch)break;
+        //++garbage_collected;
+        this->destroy_element(this->arrays.elements()+e.pos);
+        this->arrays.groups()[e.pos/N].reset(e.pos%N);
+        num_mcos+=e.mco;
+        e.epoch=retired_element::available_;
+        ++rpos;
+      }
+      this->size_ctrl.ml+=(rpos-v.rpos)-num_mcos;
+      v.rpos=rpos;
+      v.reading=false;
+    }
+  }
+
+  BOOST_NOINLINE void garbage_collect()
+  {
+    auto max_epoch=max_safe_epoch();
+    for(std::size_t i=0;i<garbage_vectors.size();++i){
+      garbage_collect(garbage_vectors[i],max_epoch);
+    }
+  }
 #endif
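
Note on the scheme (illustrative, not part of the patch): the diff implements a
form of epoch-based reclamation. Readers publish the value of current_epoch
before touching a group and reset their slot to zero on exit
(group_shared_lock_guard); erasers unlink a slot and tag it with the current
epoch (retire_element); a retired slot is destroyed only once its epoch is
older than every epoch still published by a reader
(max_safe_epoch/garbage_collect). The standalone sketch below mirrors that
protocol; the names reclaimer, enter, leave, retire and collect are invented
for the example and do not appear in Boost.Unordered.

  #include <array>
  #include <atomic>
  #include <cstddef>
  #include <new>
  #include <vector>

  struct retired_node{
    std::size_t epoch; // global epoch at retirement time
    void*       p;     // memory awaiting reclamation
  };

  struct reclaimer{
    static constexpr std::size_t threads=4;

    std::atomic<std::size_t>                      current_epoch{1};
    std::array<std::atomic<std::size_t>,threads>  active{};  // 0 = not reading
    std::array<std::vector<retired_node>,threads> retired;

    // Reader side: publish the epoch before accessing shared state and
    // clear it afterwards (the role of group_shared_lock_guard above).
    std::size_t enter(std::size_t t){
      auto e=current_epoch.load(std::memory_order_relaxed);
      active[t].store(e,std::memory_order_seq_cst);
      return e;
    }
    void leave(std::size_t t){active[t].store(0,std::memory_order_release);}

    // Writer side: after logically unlinking a node, defer its destruction,
    // tagging it with the current epoch (the role of retire_element above).
    void retire(std::size_t t,void* p){
      retired[t].push_back({current_epoch.load(),p});
      if(retired[t].size()>=64){       // bump cadence, cf. min_for_epoch_bump
        current_epoch.fetch_add(1);
        collect(t);
      }
    }

    // A node is reclaimable once it is older than the oldest epoch still
    // published by any reader (the role of max_safe_epoch above).
    std::size_t max_safe_epoch()const{
      std::size_t m=std::size_t(-1);
      for(const auto& a:active){
        auto e=a.load(std::memory_order_acquire);
        if(e!=0&&e<m)m=e;
      }
      return m-1;
    }

    void collect(std::size_t t){
      auto safe=max_safe_epoch();
      auto& v=retired[t];
      std::size_t kept=0;
      for(auto& n:v){
        if(n.epoch>safe)v[kept++]=n;   // may still be visible to a reader
        else ::operator delete(n.p);   // destroy_element() in the patch
      }
      v.resize(kept);
    }
  };

A writer that has unlinked a node allocated with ::operator new calls
retire(t,p) instead of freeing it, and any thread may call collect(t)
opportunistically; with every reader bracketed by enter/leave, no node is freed
while a reader that could still reach it is active.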
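Two design points are visible in the diff itself: current_epoch is bumped only
every min_for_epoch_bump (64) retirements rather than on each erasure,
presumably to keep the shared counter from becoming a contention point, and a
full per-thread ring (N=128 retired_element slots) falls back to an immediate
epoch bump plus collection. Since the guard destructors publish e=0, only
readers actually inside a critical section can hold back max_safe_epoch(); the
global garbage_collected counter and the commented-out std::cout appear to be
debug instrumentation only.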