From 35138450c8b3ebc563d873947eb5e17385f3ed8f Mon Sep 17 00:00:00 2001 From: joaquintides Date: Sat, 16 Dec 2023 11:55:23 +0100 Subject: [PATCH] desynced size control --- .../unordered/detail/foa/concurrent_table.hpp | 175 ++++++++++++------ 1 file changed, 119 insertions(+), 56 deletions(-) diff --git a/include/boost/unordered/detail/foa/concurrent_table.hpp b/include/boost/unordered/detail/foa/concurrent_table.hpp index 04ea5f4f..be89a965 100644 --- a/include/boost/unordered/detail/foa/concurrent_table.hpp +++ b/include/boost/unordered/detail/foa/concurrent_table.hpp @@ -497,12 +497,27 @@ public: concurrent_table( std::size_t n=default_bucket_count,const Hash& h_=Hash(), const Pred& pred_=Pred(),const Allocator& al_=Allocator()): +#if defined(BOOST_UNORDERED_LATCH_FREE) + super{(std::max)(n,std::size_t(1)),h_,pred_,al_} // TODO: won't work with n==0 +#else super{n,h_,pred_,al_} +#endif { #if defined(BOOST_UNORDERED_LATCH_FREE) + using retired_element_allocator_type= + typename boost::allocator_rebind::type; + using retired_element_traits= + boost::allocator_traits; + + retired_element_allocator_type ral=this->al(); for(std::size_t i=0;i::type; + using retired_element_traits= + boost::allocator_traits; + + retired_element_allocator_type ral=this->al(); + for(std::size_t i=0;isize_ctrl.size; + for(std::size_t i=0;iarrays.group_accesses()[pos].exclusive_access()}; #else @@ -1551,37 +1592,35 @@ private: if(unprotected_visit( access_mode,k,pos0,hash,std::forward(f)))return 0; - reserve_size rsize(*this); - if(BOOST_LIKELY(rsize.succeeded())){ - for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ - auto pos=pb.get(); - auto pg=this->arrays.groups()+pos; - auto mask=pg->match_available(); - if(BOOST_LIKELY(mask!=0)){ - auto n=unchecked_countr_zero(mask); - unsigned char expected=0; - if(!reinterpret_cast*>(pg)[n]. - compare_exchange_weak(expected,1)){ - /* slot wasn't empty */ - goto startover; - } - if(BOOST_UNLIKELY( - !insert_counter(pos0).compare_exchange_weak(counter,counter+1))){ - /* other thread inserted from pos0, need to start over */ - pg->reset(n); - goto startover; - } - auto p=this->arrays.elements()+pos*N+n; - this->construct_element(p,std::forward(args)...); - pg->set(n,hash); - ++insert_counter(pos0); - rsize.commit(); - return 1; + std::size_t pbn=max_probe; + for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ + auto pos=pb.get(); + auto pg=this->arrays.groups()+pos; + auto mask=pg->match_available(); + if(BOOST_LIKELY(mask!=0)){ + auto n=unchecked_countr_zero(mask); + unsigned char expected=0; + if(!reinterpret_cast*>(pg)[n]. + compare_exchange_weak(expected,1)){ + /* slot wasn't empty */ + goto startover; } - pg->mark_overflow(hash); + if(BOOST_UNLIKELY( + !insert_counter(pos0).compare_exchange_weak(counter,counter+1))){ + /* other thread inserted from pos0, need to start over */ + pg->reset(n); + goto startover; + } + auto p=this->arrays.elements()+pos*N+n; + this->construct_element(p,std::forward(args)...); + pg->set(n,hash); + insert_counter(pos0)=counter+2; + ++local_garbage_vector().size; + return 1; } + if(!pbn--)return -1; + pg->mark_overflow(hash); } - else return -1; } } #else @@ -1630,13 +1669,23 @@ private: void rehash_if_full() { +#if defined(BOOST_UNORDERED_LATCH_FREE) + auto lck=exclusive_access(); + update_size_ctrl(); + if(this->size_ctrl.size>=this->size_ctrl.ml){ // NB >= + garbage_collect(); + this->unchecked_rehash_for_growth(); + max_probe=default_max_probe; + } + else{ + ++max_probe; + } +#else auto lck=exclusive_access(); if(this->size_ctrl.size==this->size_ctrl.ml){ -#if defined(BOOST_UNORDERED_LATCH_FREE) - garbage_collect(); -#endif this->unchecked_rehash_for_growth(); } +#endif } template @@ -1887,7 +1936,6 @@ private: std::atomic epoch=available_; std::atomic pos; - std::atomic mco; }; struct garbage_vector { @@ -1895,16 +1943,27 @@ private: static constexpr std::size_t min_for_epoch_bump=64; static constexpr std::size_t min_for_garbage_collection=64; - epoch_type epoch=0; - std::atomic epoch_bump=0; - std::vector retired_elements; - std::atomic wpos=0; - std::atomic rpos=0; - std::atomic reading=false; - unsigned char pad[16]; + using ssize_t=std::make_signed::type; + + epoch_type epoch=0; + std::atomic epoch_bump=0; + retired_element* retired_elements; + std::atomic wpos=0; + std::atomic rpos=0; + std::atomic reading=false; + std::atomic size=0; + std::atomic mcos=0; }; + static constexpr std::size_t default_max_probe=3; + mutable std::array garbage_vectors; epoch_type current_epoch=1; + std::size_t max_probe=default_max_probe; + + garbage_vector& local_garbage_vector()const + { + return garbage_vectors[thread_id()%garbage_vectors.size()]; + } std::size_t max_safe_epoch() { @@ -1919,23 +1978,18 @@ private: BOOST_FORCEINLINE void retire_element(std::size_t pos,bool mco) { - auto& v=garbage_vectors[thread_id()%garbage_vectors.size()]; + auto& v=local_garbage_vector(); std::size_t wpos=v.wpos; std::size_t expected=retired_element::available_; for(;;){ auto& e=v.retired_elements[wpos%v.garbage_vector::N]; if(e.epoch.compare_exchange_strong(expected,retired_element::reserved_)){ e.pos=pos; - e.mco=mco; e.epoch=v.epoch.load(); ++v.wpos; - --this->size_ctrl.ml; - --this->size_ctrl.size; -/* if(wpos-v.rpos>=garbage_vector::min_for_garbage_collection){ - v.epoch=++current_epoch; - garbage_collect(); - } - else */if(++v.epoch_bump==garbage_vector::min_for_epoch_bump) + --v.size; + v.mcos+=mco; + if(++v.epoch_bump==garbage_vector::min_for_epoch_bump) { v.epoch=++current_epoch; v.epoch_bump=0; @@ -1946,7 +2000,6 @@ private: if(expected==retired_element::reserved_){ /* other thread wrote */ } else{ /* vector full */ - //std::cout<<"*"; v.epoch=++current_epoch; garbage_collect(); } @@ -1955,6 +2008,19 @@ private: } } + void update_size_ctrl() + { + for(std::size_t i=0;isize_ctrl.size+=v.size; + if(this->size_ctrl.ml>=v.mcos)this->size_ctrl.ml-=v.mcos; + else this->size_ctrl.ml=0; + v.size=0; + v.mcos=0; + } + //std::cout<<"("<size_ctrl.size<<","<size_ctrl.ml<<")"; + } + void garbage_collect(garbage_vector& v,std::size_t max_epoch) { if(v.rpos==v.wpos)return; @@ -1962,18 +2028,15 @@ private: bool expected=false; if(v.reading.compare_exchange_strong(expected,true)){ std::size_t rpos=v.rpos; - std::size_t num_mcos=0; for(;;){ auto& e=v.retired_elements[rpos%v.garbage_vector::N]; if(e.epoch>max_epoch)break; //++garbage_collected; this->destroy_element(this->arrays.elements()+e.pos); this->arrays.groups()[e.pos/N].reset(e.pos%N); - num_mcos+=e.mco; e.epoch=retired_element::available_; ++rpos; } - this->size_ctrl.ml+=(rpos-v.rpos)-num_mcos; v.rpos=rpos; v.reading=false; }