From a444cfb1ae140f542d225c22331d424136d76a8f Mon Sep 17 00:00:00 2001 From: joaquintides Date: Sat, 10 Feb 2024 20:51:09 +0100 Subject: [PATCH] implemented latch-free element save --- .../unordered/detail/foa/concurrent_table.hpp | 163 ++++++++++-------- 1 file changed, 93 insertions(+), 70 deletions(-) diff --git a/include/boost/unordered/detail/foa/concurrent_table.hpp b/include/boost/unordered/detail/foa/concurrent_table.hpp index 2e96a66e..f7e112bc 100644 --- a/include/boost/unordered/detail/foa/concurrent_table.hpp +++ b/include/boost/unordered/detail/foa/concurrent_table.hpp @@ -235,17 +235,13 @@ struct atomic_integral #if defined(BOOST_UNORDERED_LATCH_FREE) struct group_access { - using mutex_type=rw_spinlock; - using exclusive_lock_guard=lock_guard; using access_counter_type=std::atomic; using insert_counter_type=std::atomic; - exclusive_lock_guard exclusive_access(){return exclusive_lock_guard{m};} access_counter_type& access_counter(){return acnt;} insert_counter_type& insert_counter(){return icnt;} private: - mutex_type m; access_counter_type acnt{0}; insert_counter_type icnt{0}; }; @@ -557,7 +553,7 @@ public: #if defined(BOOST_UNORDERED_LATCH_FREE) ~concurrent_table(){ std::cout - <<"version: 2024/02/09 19:40; " + <<"version: 2024/02/10 20:50; " <<"lf: "<<(double)size()/capacity()<<"; " <<"size: "<reset(n); - auto& sc=local_size_ctrl(); - sc.size.fetch_sub(1,std::memory_order_relaxed); - sc.mcos.fetch_add( - !pg->is_not_overflowed(hash),std::memory_order_relaxed); - res=1; + //pg->reset(n); + auto pc=reinterpret_cast*>(pg)+n; + if(pc->exchange(0,std::memory_order_release)!=0){ + auto& sc=local_size_ctrl(); + sc.size.fetch_sub(1,std::memory_order_relaxed); + sc.mcos.fetch_add( + !pg->is_not_overflowed(hash),std::memory_order_relaxed); + res=1; + } } }); return res; @@ -1060,7 +1059,7 @@ private: #if defined(BOOST_UNORDERED_LATCH_FREE) struct group_shared_lock_guard{}; - using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard; + struct group_exclusive_lock_guard{}; using group_access_counter_type=typename group_access::access_counter_type; using group_insert_counter_type=typename group_access::insert_counter_type; #else @@ -1129,7 +1128,11 @@ private: inline group_exclusive_lock_guard access( group_exclusive,std::size_t pos)const { +#if defined(BOOST_UNORDERED_LATCH_FREE) + return {}; +#else return this->arrays.group_accesses()[pos].exclusive_access(); +#endif } #if defined(BOOST_UNORDERED_LATCH_FREE) @@ -1288,40 +1291,47 @@ private: #endif #if defined(BOOST_UNORDERED_LATCH_FREE) - BOOST_FORCEINLINE element_type load_element( - element_type* p,std::size_t pos)const + template + BOOST_FORCEINLINE auto load_access(std::size_t pos,F f)const + ->std::pair { auto& acnt=access_counter(pos); for(;;){ auto n=acnt.load(std::memory_order_acquire); if(n%2==1)continue; - auto res=*p; + auto res=f(); std::atomic_thread_fence(std::memory_order_acquire); if(acnt.load(std::memory_order_acquire)!=n)continue; - else return res; + else return {res,n}; } } - template - BOOST_FORCEINLINE void save_element( - element_type* p,std::size_t pos,Args&&... args) + template + BOOST_FORCEINLINE bool save_access(std::size_t pos,boost::uint32_t n,F f)const { auto& acnt=access_counter(pos); + if(!acnt.compare_exchange_strong(n,n+1,std::memory_order_release)){ + return false; + } + std::atomic_thread_fence(std::memory_order_release); + + f(); + + acnt.store(n+2,std::memory_order_release); + return true; + } + + template + BOOST_FORCEINLINE void save_access(std::size_t pos,F f)const /* no previous load cnt */ + { + auto &acnt=access_counter(pos); for(;;){ - //auto n=acnt.load(std::memory_order_acquire); - //if(n%2==1)continue; - //if(!acnt.compare_exchange_strong( - // n,n+1,std::memory_order_release))continue; - acnt.fetch_add(1,std::memory_order_release); + auto n=acnt.load(std::memory_order_acquire); + if(n%2==1)continue; - this->construct_element(p,std::forward(args)...); - - std::atomic_thread_fence(std::memory_order_release); - //acnt.store(n+2,std::memory_order_release); - acnt.fetch_add(1,std::memory_order_release); - return; + if(save_access(pos,n,f))return; } } @@ -1330,6 +1340,7 @@ private: GroupAccessMode access_mode, const Key& x,std::size_t pos0,std::size_t hash,F&& f)const { + startover: prober pb(pos0); do{ auto pos=pb.get(); @@ -1338,15 +1349,20 @@ private: if(mask){ auto p=this->arrays.elements()+pos*N; BOOST_UNORDERED_PREFETCH_ELEMENTS(p,N); - auto lck=access(access_mode,pos); do{ auto n=unchecked_countr_zero(mask); if(BOOST_LIKELY(pg->is_occupied(n))){ - auto e=load_element(p+n,pos); + auto [e,acnt]=load_access(pos,[&]{return *(p+n);}); if(BOOST_LIKELY(this->pred()(x,this->key_from(e)))){ - f(pg,n,&e); - // TODO: saving part - return 1; + if constexpr(std::is_same::value){ + // ALTERNATIVE: offline f(pg,n,&e) + + if(!save_access(pos,acnt,[&]{f(pg,n,p+n);}))goto startover; + return 1; + }else{ + f(pg,n,&e); + return 1; + } } } mask&=mask-1; @@ -1666,7 +1682,7 @@ private: { ~assign_insert_counter_on_exit() { - counter.store(x,std::memory_order_relaxed); + counter.store(x,std::memory_order_release); } group_insert_counter_type &counter; @@ -1682,43 +1698,50 @@ private: auto hash=this->hash_for(k); auto pos0=this->position_for(hash); - for(;;){ - startover: - boost::uint32_t counter=insert_counter(pos0).load(std::memory_order_relaxed); - if(unprotected_visit( - access_mode,k,pos0,hash,std::forward(f)))return 0; + startover: + boost::uint32_t counter=0; + do{ + counter=insert_counter(pos0).load(std::memory_order_acquire); + } + while(BOOST_UNLIKELY(counter%2==1)); - std::size_t pbn=max_probe; - for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ - auto pos=pb.get(); - auto pg=this->arrays.groups()+pos; - auto lck=access(group_exclusive{},pos); - auto mask=pg->match_available(); - if(BOOST_LIKELY(mask!=0)){ - auto n=unchecked_countr_zero(mask); - latch_free_reserve_slot rslot{pg,n}; - if(BOOST_UNLIKELY(!rslot.succeeded())){ - /* slot wasn't empty */ - goto startover; - } - if(BOOST_UNLIKELY( - !insert_counter(pos0).compare_exchange_strong( - counter,counter+1,std::memory_order_relaxed))){ - /* other thread inserted from pos0, need to start over */ - goto startover; - } - save_element( - this->arrays.elements()+pos*N+n,pos,std::forward(args)...); - pg->set(n,hash); - rslot.commit(); - auto& sc=local_size_ctrl(); - sc.size.fetch_add(1,std::memory_order_relaxed); - sc.mcos.fetch_sub(!pg->is_not_overflowed(hash),std::memory_order_relaxed); - return 1; + if(unprotected_visit( + access_mode,k,pos0,hash,std::forward(f)))return 0; + + std::size_t pbn=max_probe; + for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ + auto pos=pb.get(); + auto pg=this->arrays.groups()+pos; + auto mask=pg->match_available(); + if(BOOST_LIKELY(mask!=0)){ + auto n=unchecked_countr_zero(mask); + latch_free_reserve_slot rslot{pg,n}; + if(BOOST_UNLIKELY(!rslot.succeeded())){ + /* slot wasn't empty */ + goto startover; } - if(!pbn--)return -1; - pg->mark_overflow(hash); + if(BOOST_UNLIKELY( + !insert_counter(pos0).compare_exchange_strong( + counter,counter+1,std::memory_order_relaxed))){ + /* other thread inserted from pos0, need to start over */ + goto startover; + } + { + assign_insert_counter_on_exit a{insert_counter(pos0),counter+2}; + save_access(pos,[&]{ + this->construct_element( + this->arrays.elements()+pos*N+n,std::forward(args)...); + }); + pg->set(n,hash); // SHOULD GO INSIDE save_access? + } + rslot.commit(); + auto& sc=local_size_ctrl(); + sc.size.fetch_add(1,std::memory_order_relaxed); + sc.mcos.fetch_sub(!pg->is_not_overflowed(hash),std::memory_order_relaxed); + return 1; } + if(!pbn--)return -1; + pg->mark_overflow(hash); } } #else