diff --git a/include/boost/unordered/detail/foa/concurrent_table.hpp b/include/boost/unordered/detail/foa/concurrent_table.hpp index 7e4b283e..2724f403 100644 --- a/include/boost/unordered/detail/foa/concurrent_table.hpp +++ b/include/boost/unordered/detail/foa/concurrent_table.hpp @@ -54,11 +54,8 @@ #include #include #include -#include "utils/epoch.h" #endif -std::atomic nodes_wasted=0; - namespace boost{ namespace unordered{ namespace detail{ @@ -235,6 +232,24 @@ struct atomic_integral * unprotected_norehash_emplace_or_visit). */ +#if defined(BOOST_UNORDERED_LATCH_FREE) +struct group_access +{ + using mutex_type=rw_spinlock; + using exclusive_lock_guard=lock_guard; + using access_counter_type=std::atomic; + using insert_counter_type=std::atomic; + + exclusive_lock_guard exclusive_access(){return exclusive_lock_guard{m};} + access_counter_type& access_counter(){return acnt;} + insert_counter_type& insert_counter(){return icnt;} + +private: + mutex_type m; + access_counter_type acnt{0}; + insert_counter_type icnt{0}; +}; +#else struct group_access { using mutex_type=rw_spinlock; @@ -250,6 +265,7 @@ private: mutex_type m; insert_counter_type cnt{0}; }; +#endif template group_access* dummy_group_accesses() @@ -538,10 +554,10 @@ public: concurrent_table(std::move(x),x.make_empty_arrays()) {} -#if defined(BOOST_UNORDERED_LATCH_FREE) +#if 0&&defined(BOOST_UNORDERED_LATCH_FREE) ~concurrent_table(){ std::cout - <<"version: 2024/02/03 17:30; " + <<"version: 2024/02/09 12:00; " <<"lf: "<<(double)size()/capacity()<<"; " <<"capacity: "<hash_for(x); std::size_t res=0; unprotected_internal_visit( - group_shared{},x,this->position_for(hash),hash, // NB: shared access - [&,this](group_type* pg,unsigned int n,element_type* p,value_type *pv) + group_exclusive{},x,this->position_for(hash),hash, + [&,this](group_type* pg,unsigned int n,element_type* p) { - if(f(cast_for(group_shared{},*pv))){ - if(p->p.compare_exchange_strong(pv,nullptr)){ - pg->reset(n); - auto& sc=local_size_ctrl(); - sc.size-=1; - sc.mcos+=!pg->is_not_overflowed(hash); - epoch::get_default_pool().Retire(pv); - res=1; - } + if(f(cast_for(group_shared{},type_policy::value_from(*p)))){ + pg->reset(n); + auto& sc=local_size_ctrl(); + sc.size-=1; + sc.mcos+=!pg->is_not_overflowed(hash); + res=1; } }); return res; @@ -1042,29 +1055,15 @@ private: reentrancy_bichecked>; #if defined(BOOST_UNORDERED_LATCH_FREE) - struct group_shared_lock_guard - { - group_shared_lock_guard():id{epoch::internal::get_epoch().announce()}{} - ~group_shared_lock_guard(){epoch::internal::get_epoch().unannounce(id);} - - int id; - }; - struct group_exclusive_lock_guard - { - group_exclusive_lock_guard(group_access::exclusive_lock_guard&& lck_): - id{epoch::internal::get_epoch().announce()}, - lck{std::move(lck_)}{} - ~group_exclusive_lock_guard(){epoch::internal::get_epoch().unannounce(id);} - - int id; - group_access::exclusive_lock_guard lck; - }; + struct group_shared_lock_guard{}; + using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard; + using group_access_counter_type=typename group_access::access_counter_type; + using group_insert_counter_type=typename group_access::insert_counter_type; #else using group_shared_lock_guard=typename group_access::shared_lock_guard; using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard; -#endif - using group_insert_counter_type=typename group_access::insert_counter_type; +#endif concurrent_table(const concurrent_table& x,exclusive_lock_guard): super{x}{} @@ -1129,6 +1128,13 @@ private: return this->arrays.group_accesses()[pos].exclusive_access(); } +#if defined(BOOST_UNORDERED_LATCH_FREE) + inline group_access_counter_type& access_counter(std::size_t pos)const + { + return this->arrays.group_accesses()[pos].access_counter(); + } +#endif + inline group_insert_counter_type& insert_counter(std::size_t pos)const { return this->arrays.group_accesses()[pos].insert_counter(); @@ -1250,17 +1256,10 @@ private: GroupAccessMode access_mode, const Key& x,std::size_t pos0,std::size_t hash,F&& f)const { -#if defined(BOOST_UNORDERED_LATCH_FREE) - return unprotected_internal_visit( - access_mode,x,pos0,hash, - [&](group_type*,unsigned int,element_type* p,value_type* pv) - {f(cast_for(access_mode,*pv));}); -#else return unprotected_internal_visit( access_mode,x,pos0,hash, [&](group_type*,unsigned int,element_type* p) {f(cast_for(access_mode,type_policy::value_from(*p)));}); -#endif } /* check occupation with previous unsynced match */ @@ -1269,7 +1268,6 @@ private: static bool is_occupied(group_type* pg,std::size_t pos) { return reinterpret_cast*>(pg)[pos]>1; - //return true; } #else static bool is_occupied(group_type* pg,std::size_t pos) @@ -1284,6 +1282,75 @@ private: #pragma warning(disable:4800) #endif +#if defined(BOOST_UNORDERED_LATCH_FREE) + BOOST_FORCEINLINE element_type load_element( + element_type* p,std::size_t pos)const + { + auto& acnt=access_counter(pos); + for(;;){ + auto n=acnt.load(std::memory_order_acquire); + if(n%2==1)continue; + + auto res=*p; + + std::atomic_thread_fence(std::memory_order_acquire); + if(acnt.load(std::memory_order_acquire)!=n)continue; + else return res; + } + } + + BOOST_FORCEINLINE void save_element( + element_type* p,const element_type& x,std::size_t pos)const + { + auto& acnt=access_counter(pos); + for(;;){ + auto n=acnt.load(std::memory_order_acquire); + if(n%2==1)continue; + if(!acnt.compare_exchange_strong( + n,n+1,std::memory_order_release))continue; + + *p=x; + + std::atomic_thread_fence(std::memory_order_release); + acnt.store(n+2,std::memory_order_release); + return; + } + } + + template + BOOST_FORCEINLINE std::size_t unprotected_internal_visit( + GroupAccessMode access_mode, + const Key& x,std::size_t pos0,std::size_t hash,F&& f)const + { + prober pb(pos0); + do{ + auto pos=pb.get(); + auto pg=this->arrays.groups()+pos; + auto mask=pg->match(hash); + if(mask){ + auto p=this->arrays.elements()+pos*N; + BOOST_UNORDERED_PREFETCH_ELEMENTS(p,N); + do{ + auto n=unchecked_countr_zero(mask); + if(BOOST_LIKELY(pg->is_occupied(n))){ + auto e=load_element(p+n,pos); + if(BOOST_LIKELY(this->pred()(x,this->key_from(e)))){ + f(pg,n,&e); + // TODO: saving part + return 1; + } + } + mask&=mask-1; + }while(mask); + } + if(BOOST_LIKELY(pg->is_not_overflowed(hash))){ + return 0; + } + } + while(BOOST_LIKELY(pb.next(this->arrays.groups_size_mask))); + return 0; + } +#else template BOOST_FORCEINLINE std::size_t unprotected_internal_visit( GroupAccessMode access_mode, @@ -1300,10 +1367,10 @@ private: auto lck=access(access_mode,pos); do{ auto n=unchecked_countr_zero(mask); - auto pv=p[n].p.load(std::memory_order_relaxed); - if(BOOST_LIKELY(pv&&bool(this->pred()(x,this->key_from(*pv))))){ - f(pg,n,p+n,pv); - return 1; + if(BOOST_LIKELY( + pg->is_occupied(n)&&bool(this->pred()(x,this->key_from(p[n]))))){ + f(pg,n,p+n); + return 1; } mask&=mask-1; }while(mask); @@ -1315,6 +1382,7 @@ private: while(BOOST_LIKELY(pb.next(this->arrays.groups_size_mask))); return 0; } +#endif template BOOST_FORCEINLINE std::size_t unprotected_bulk_visit( @@ -1585,6 +1653,14 @@ private: bool commit_=false; }; + struct assign_access_counter_on_exit + { + ~assign_access_counter_on_exit(){counter=x;} + + group_access_counter_type &counter; + boost::uint32_t x; + }; + struct assign_insert_counter_on_exit { ~assign_insert_counter_on_exit(){counter=x;} @@ -1604,8 +1680,7 @@ private: for(;;){ startover: - boost::uint32_t counter=0; - while(BOOST_UNLIKELY((counter=insert_counter(pos0))%2==1)){} + boost::uint32_t counter=insert_counter(pos0); if(unprotected_visit( access_mode,k,pos0,hash,std::forward(f)))return 0; @@ -1613,6 +1688,7 @@ private: for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ auto pos=pb.get(); auto pg=this->arrays.groups()+pos; + auto lck=access(group_exclusive{},pos); auto mask=pg->match_available(); if(BOOST_LIKELY(mask!=0)){ auto n=unchecked_countr_zero(mask); @@ -1621,20 +1697,19 @@ private: /* slot wasn't empty */ goto startover; } - if(BOOST_UNLIKELY( - !insert_counter(pos0).compare_exchange_weak(counter,counter+1))){ + if(BOOST_UNLIKELY(insert_counter(pos0)++!=counter)){ /* other thread inserted from pos0, need to start over */ goto startover; } { - assign_insert_counter_on_exit a{insert_counter(pos0),counter+2}; - auto p=this->arrays.elements()+pos*N+n; - this->construct_element(p,std::forward(args)...); - pg->set(n,hash); - for(prober pb2(pos0);pb2.get()!=pos; - pb2.next(this->arrays.groups_size_mask)){ - this->arrays.groups()[pb2.get()].mark_overflow(hash); + auto p=this->arrays.elements()+pos*N+n; + { + auto acounter=++access_counter(pos); + assign_access_counter_on_exit aa{access_counter(pos),acounter+1}; + this->construct_element(p,std::forward(args)...); + std::atomic_thread_fence(std::memory_order_release); } + pg->set(n,hash); } rslot.commit(); auto& sc=local_size_ctrl(); @@ -1643,7 +1718,7 @@ private: return 1; } if(!pbn--)return -1; - //pg->mark_overflow(hash); + pg->mark_overflow(hash); } } }