implemented latch-free element save

This commit is contained in:
joaquintides
2024-02-10 20:51:09 +01:00
parent db779b949e
commit a444cfb1ae

View File

@ -235,17 +235,13 @@ struct atomic_integral
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_access struct group_access
{ {
using mutex_type=rw_spinlock;
using exclusive_lock_guard=lock_guard<mutex_type>;
using access_counter_type=std::atomic<boost::uint32_t>; using access_counter_type=std::atomic<boost::uint32_t>;
using insert_counter_type=std::atomic<boost::uint32_t>; using insert_counter_type=std::atomic<boost::uint32_t>;
exclusive_lock_guard exclusive_access(){return exclusive_lock_guard{m};}
access_counter_type& access_counter(){return acnt;} access_counter_type& access_counter(){return acnt;}
insert_counter_type& insert_counter(){return icnt;} insert_counter_type& insert_counter(){return icnt;}
private: private:
mutex_type m;
access_counter_type acnt{0}; access_counter_type acnt{0};
insert_counter_type icnt{0}; insert_counter_type icnt{0};
}; };
@ -557,7 +553,7 @@ public:
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
~concurrent_table(){ ~concurrent_table(){
std::cout std::cout
<<"version: 2024/02/09 19:40; " <<"version: 2024/02/10 20:50; "
<<"lf: "<<(double)size()/capacity()<<"; " <<"lf: "<<(double)size()/capacity()<<"; "
<<"size: "<<size()<<", " <<"size: "<<size()<<", "
<<"capacity: "<<capacity()<<"; " <<"capacity: "<<capacity()<<"; "
@ -874,12 +870,15 @@ public:
[&,this](group_type* pg,unsigned int n,element_type* p) [&,this](group_type* pg,unsigned int n,element_type* p)
{ {
if(f(cast_for(group_shared{},type_policy::value_from(*p)))){ if(f(cast_for(group_shared{},type_policy::value_from(*p)))){
pg->reset(n); //pg->reset(n);
auto& sc=local_size_ctrl(); auto pc=reinterpret_cast<std::atomic<unsigned char>*>(pg)+n;
sc.size.fetch_sub(1,std::memory_order_relaxed); if(pc->exchange(0,std::memory_order_release)!=0){
sc.mcos.fetch_add( auto& sc=local_size_ctrl();
!pg->is_not_overflowed(hash),std::memory_order_relaxed); sc.size.fetch_sub(1,std::memory_order_relaxed);
res=1; sc.mcos.fetch_add(
!pg->is_not_overflowed(hash),std::memory_order_relaxed);
res=1;
}
} }
}); });
return res; return res;
@ -1060,7 +1059,7 @@ private:
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_shared_lock_guard{}; struct group_shared_lock_guard{};
using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard; struct group_exclusive_lock_guard{};
using group_access_counter_type=typename group_access::access_counter_type; using group_access_counter_type=typename group_access::access_counter_type;
using group_insert_counter_type=typename group_access::insert_counter_type; using group_insert_counter_type=typename group_access::insert_counter_type;
#else #else
@ -1129,7 +1128,11 @@ private:
inline group_exclusive_lock_guard access( inline group_exclusive_lock_guard access(
group_exclusive,std::size_t pos)const group_exclusive,std::size_t pos)const
{ {
#if defined(BOOST_UNORDERED_LATCH_FREE)
return {};
#else
return this->arrays.group_accesses()[pos].exclusive_access(); return this->arrays.group_accesses()[pos].exclusive_access();
#endif
} }
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
@ -1288,40 +1291,47 @@ private:
#endif #endif
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
BOOST_FORCEINLINE element_type load_element( template<typename F>
element_type* p,std::size_t pos)const BOOST_FORCEINLINE auto load_access(std::size_t pos,F f)const
->std::pair<decltype(f()),boost::uint32_t>
{ {
auto& acnt=access_counter(pos); auto& acnt=access_counter(pos);
for(;;){ for(;;){
auto n=acnt.load(std::memory_order_acquire); auto n=acnt.load(std::memory_order_acquire);
if(n%2==1)continue; if(n%2==1)continue;
auto res=*p; auto res=f();
std::atomic_thread_fence(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_acquire);
if(acnt.load(std::memory_order_acquire)!=n)continue; if(acnt.load(std::memory_order_acquire)!=n)continue;
else return res; else return {res,n};
} }
} }
template<typename... Args> template<typename F>
BOOST_FORCEINLINE void save_element( BOOST_FORCEINLINE bool save_access(std::size_t pos,boost::uint32_t n,F f)const
element_type* p,std::size_t pos,Args&&... args)
{ {
auto& acnt=access_counter(pos); auto& acnt=access_counter(pos);
if(!acnt.compare_exchange_strong(n,n+1,std::memory_order_release)){
return false;
}
std::atomic_thread_fence(std::memory_order_release);
f();
acnt.store(n+2,std::memory_order_release);
return true;
}
template<typename F>
BOOST_FORCEINLINE void save_access(std::size_t pos,F f)const /* no previous load cnt */
{
auto &acnt=access_counter(pos);
for(;;){ for(;;){
//auto n=acnt.load(std::memory_order_acquire); auto n=acnt.load(std::memory_order_acquire);
//if(n%2==1)continue; if(n%2==1)continue;
//if(!acnt.compare_exchange_strong(
// n,n+1,std::memory_order_release))continue;
acnt.fetch_add(1,std::memory_order_release);
this->construct_element(p,std::forward<Args>(args)...); if(save_access(pos,n,f))return;
std::atomic_thread_fence(std::memory_order_release);
//acnt.store(n+2,std::memory_order_release);
acnt.fetch_add(1,std::memory_order_release);
return;
} }
} }
@ -1330,6 +1340,7 @@ private:
GroupAccessMode access_mode, GroupAccessMode access_mode,
const Key& x,std::size_t pos0,std::size_t hash,F&& f)const const Key& x,std::size_t pos0,std::size_t hash,F&& f)const
{ {
startover:
prober pb(pos0); prober pb(pos0);
do{ do{
auto pos=pb.get(); auto pos=pb.get();
@ -1338,15 +1349,20 @@ private:
if(mask){ if(mask){
auto p=this->arrays.elements()+pos*N; auto p=this->arrays.elements()+pos*N;
BOOST_UNORDERED_PREFETCH_ELEMENTS(p,N); BOOST_UNORDERED_PREFETCH_ELEMENTS(p,N);
auto lck=access(access_mode,pos);
do{ do{
auto n=unchecked_countr_zero(mask); auto n=unchecked_countr_zero(mask);
if(BOOST_LIKELY(pg->is_occupied(n))){ if(BOOST_LIKELY(pg->is_occupied(n))){
auto e=load_element(p+n,pos); auto [e,acnt]=load_access(pos,[&]{return *(p+n);});
if(BOOST_LIKELY(this->pred()(x,this->key_from(e)))){ if(BOOST_LIKELY(this->pred()(x,this->key_from(e)))){
f(pg,n,&e); if constexpr(std::is_same<GroupAccessMode,group_exclusive>::value){
// TODO: saving part // ALTERNATIVE: offline f(pg,n,&e)
return 1;
if(!save_access(pos,acnt,[&]{f(pg,n,p+n);}))goto startover;
return 1;
}else{
f(pg,n,&e);
return 1;
}
} }
} }
mask&=mask-1; mask&=mask-1;
@ -1666,7 +1682,7 @@ private:
{ {
~assign_insert_counter_on_exit() ~assign_insert_counter_on_exit()
{ {
counter.store(x,std::memory_order_relaxed); counter.store(x,std::memory_order_release);
} }
group_insert_counter_type &counter; group_insert_counter_type &counter;
@ -1682,43 +1698,50 @@ private:
auto hash=this->hash_for(k); auto hash=this->hash_for(k);
auto pos0=this->position_for(hash); auto pos0=this->position_for(hash);
for(;;){ startover:
startover: boost::uint32_t counter=0;
boost::uint32_t counter=insert_counter(pos0).load(std::memory_order_relaxed); do{
if(unprotected_visit( counter=insert_counter(pos0).load(std::memory_order_acquire);
access_mode,k,pos0,hash,std::forward<F>(f)))return 0; }
while(BOOST_UNLIKELY(counter%2==1));
std::size_t pbn=max_probe; if(unprotected_visit(
for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ access_mode,k,pos0,hash,std::forward<F>(f)))return 0;
auto pos=pb.get();
auto pg=this->arrays.groups()+pos; std::size_t pbn=max_probe;
auto lck=access(group_exclusive{},pos); for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){
auto mask=pg->match_available(); auto pos=pb.get();
if(BOOST_LIKELY(mask!=0)){ auto pg=this->arrays.groups()+pos;
auto n=unchecked_countr_zero(mask); auto mask=pg->match_available();
latch_free_reserve_slot rslot{pg,n}; if(BOOST_LIKELY(mask!=0)){
if(BOOST_UNLIKELY(!rslot.succeeded())){ auto n=unchecked_countr_zero(mask);
/* slot wasn't empty */ latch_free_reserve_slot rslot{pg,n};
goto startover; if(BOOST_UNLIKELY(!rslot.succeeded())){
} /* slot wasn't empty */
if(BOOST_UNLIKELY( goto startover;
!insert_counter(pos0).compare_exchange_strong(
counter,counter+1,std::memory_order_relaxed))){
/* other thread inserted from pos0, need to start over */
goto startover;
}
save_element(
this->arrays.elements()+pos*N+n,pos,std::forward<Args>(args)...);
pg->set(n,hash);
rslot.commit();
auto& sc=local_size_ctrl();
sc.size.fetch_add(1,std::memory_order_relaxed);
sc.mcos.fetch_sub(!pg->is_not_overflowed(hash),std::memory_order_relaxed);
return 1;
} }
if(!pbn--)return -1; if(BOOST_UNLIKELY(
pg->mark_overflow(hash); !insert_counter(pos0).compare_exchange_strong(
counter,counter+1,std::memory_order_relaxed))){
/* other thread inserted from pos0, need to start over */
goto startover;
}
{
assign_insert_counter_on_exit a{insert_counter(pos0),counter+2};
save_access(pos,[&]{
this->construct_element(
this->arrays.elements()+pos*N+n,std::forward<Args>(args)...);
});
pg->set(n,hash); // SHOULD GO INSIDE save_access?
}
rslot.commit();
auto& sc=local_size_ctrl();
sc.size.fetch_add(1,std::memory_order_relaxed);
sc.mcos.fetch_sub(!pg->is_not_overflowed(hash),std::memory_order_relaxed);
return 1;
} }
if(!pbn--)return -1;
pg->mark_overflow(hash);
} }
} }
#else #else