implemented latch-free element load

joaquintides
2024-02-09 12:33:41 +01:00
parent 687a446784
commit 77a0f6bb74


@ -54,11 +54,8 @@
#include <algorithm>
#include <vector>
#include <iostream>
#include "utils/epoch.h"
#endif
std::atomic<std::size_t> nodes_wasted=0;
namespace boost{
namespace unordered{
namespace detail{
@ -235,6 +232,24 @@ struct atomic_integral
* unprotected_norehash_emplace_or_visit).
*/
#if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_access
{
using mutex_type=rw_spinlock;
using exclusive_lock_guard=lock_guard<mutex_type>;
using access_counter_type=std::atomic<boost::uint32_t>;
using insert_counter_type=std::atomic<boost::uint32_t>;
exclusive_lock_guard exclusive_access(){return exclusive_lock_guard{m};}
access_counter_type& access_counter(){return acnt;}
insert_counter_type& insert_counter(){return icnt;}
private:
mutex_type m;
access_counter_type acnt{0};
insert_counter_type icnt{0};
};
#else
struct group_access
{
using mutex_type=rw_spinlock;
@ -250,6 +265,7 @@ private:
mutex_type m;
insert_counter_type cnt{0};
};
#endif
template<std::size_t Size>
group_access* dummy_group_accesses()
@ -538,10 +554,10 @@ public:
concurrent_table(std::move(x),x.make_empty_arrays())
{}
#if defined(BOOST_UNORDERED_LATCH_FREE)
#if 0&&defined(BOOST_UNORDERED_LATCH_FREE)
~concurrent_table(){
std::cout
<<"version: 2024/02/03 17:30; "
<<"version: 2024/02/09 12:00; "
<<"lf: "<<(double)size()/capacity()<<"; "
<<"capacity: "<<capacity()<<"; "
<<"rehashes: "<<rehashes<<"; "
@ -851,18 +867,15 @@ public:
auto hash=this->hash_for(x);
std::size_t res=0;
unprotected_internal_visit(
group_shared{},x,this->position_for(hash),hash, // NB: shared access
[&,this](group_type* pg,unsigned int n,element_type* p,value_type *pv)
group_exclusive{},x,this->position_for(hash),hash,
[&,this](group_type* pg,unsigned int n,element_type* p)
{
if(f(cast_for(group_shared{},*pv))){
if(p->p.compare_exchange_strong(pv,nullptr)){
pg->reset(n);
auto& sc=local_size_ctrl();
sc.size-=1;
sc.mcos+=!pg->is_not_overflowed(hash);
epoch::get_default_pool<value_type>().Retire(pv);
res=1;
}
if(f(cast_for(group_shared{},type_policy::value_from(*p)))){
pg->reset(n);
auto& sc=local_size_ctrl();
sc.size-=1;
sc.mcos+=!pg->is_not_overflowed(hash);
res=1;
}
});
return res;
@ -1042,29 +1055,15 @@ private:
reentrancy_bichecked<scoped_bilock<multimutex_type>>;
#if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_shared_lock_guard
{
group_shared_lock_guard():id{epoch::internal::get_epoch().announce()}{}
~group_shared_lock_guard(){epoch::internal::get_epoch().unannounce(id);}
int id;
};
struct group_exclusive_lock_guard
{
group_exclusive_lock_guard(group_access::exclusive_lock_guard&& lck_):
id{epoch::internal::get_epoch().announce()},
lck{std::move(lck_)}{}
~group_exclusive_lock_guard(){epoch::internal::get_epoch().unannounce(id);}
int id;
group_access::exclusive_lock_guard lck;
};
struct group_shared_lock_guard{};
using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard;
using group_access_counter_type=typename group_access::access_counter_type;
using group_insert_counter_type=typename group_access::insert_counter_type;
#else
using group_shared_lock_guard=typename group_access::shared_lock_guard;
using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard;
#endif
using group_insert_counter_type=typename group_access::insert_counter_type;
#endif
concurrent_table(const concurrent_table& x,exclusive_lock_guard):
super{x}{}
@ -1129,6 +1128,13 @@ private:
return this->arrays.group_accesses()[pos].exclusive_access();
}
#if defined(BOOST_UNORDERED_LATCH_FREE)
inline group_access_counter_type& access_counter(std::size_t pos)const
{
return this->arrays.group_accesses()[pos].access_counter();
}
#endif
inline group_insert_counter_type& insert_counter(std::size_t pos)const
{
return this->arrays.group_accesses()[pos].insert_counter();
@ -1250,17 +1256,10 @@ private:
GroupAccessMode access_mode,
const Key& x,std::size_t pos0,std::size_t hash,F&& f)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
return unprotected_internal_visit(
access_mode,x,pos0,hash,
[&](group_type*,unsigned int,element_type* p,value_type* pv)
{f(cast_for(access_mode,*pv));});
#else
return unprotected_internal_visit(
access_mode,x,pos0,hash,
[&](group_type*,unsigned int,element_type* p)
{f(cast_for(access_mode,type_policy::value_from(*p)));});
#endif
}
/* check occupation with previous unsynced match */
@ -1269,7 +1268,6 @@ private:
static bool is_occupied(group_type* pg,std::size_t pos)
{
return reinterpret_cast<std::atomic<unsigned char>*>(pg)[pos]>1;
//return true;
}
#else
static bool is_occupied(group_type* pg,std::size_t pos)
@ -1284,6 +1282,75 @@ private:
#pragma warning(disable:4800)
#endif
#if defined(BOOST_UNORDERED_LATCH_FREE)
BOOST_FORCEINLINE element_type load_element(
element_type* p,std::size_t pos)const
{
auto& acnt=access_counter(pos);
for(;;){
auto n=acnt.load(std::memory_order_acquire);
if(n%2==1)continue;
auto res=*p;
std::atomic_thread_fence(std::memory_order_acquire);
if(acnt.load(std::memory_order_acquire)!=n)continue;
else return res;
}
}
BOOST_FORCEINLINE void save_element(
element_type* p,const element_type& x,std::size_t pos)const
{
auto& acnt=access_counter(pos);
for(;;){
auto n=acnt.load(std::memory_order_acquire);
if(n%2==1)continue;
if(!acnt.compare_exchange_strong(
n,n+1,std::memory_order_release))continue;
*p=x;
std::atomic_thread_fence(std::memory_order_release);
acnt.store(n+2,std::memory_order_release);
return;
}
}
template<typename GroupAccessMode,typename Key,typename F>
BOOST_FORCEINLINE std::size_t unprotected_internal_visit(
GroupAccessMode access_mode,
const Key& x,std::size_t pos0,std::size_t hash,F&& f)const
{
prober pb(pos0);
do{
auto pos=pb.get();
auto pg=this->arrays.groups()+pos;
auto mask=pg->match(hash);
if(mask){
auto p=this->arrays.elements()+pos*N;
BOOST_UNORDERED_PREFETCH_ELEMENTS(p,N);
do{
auto n=unchecked_countr_zero(mask);
if(BOOST_LIKELY(pg->is_occupied(n))){
auto e=load_element(p+n,pos);
if(BOOST_LIKELY(this->pred()(x,this->key_from(e)))){
f(pg,n,&e);
// TODO: saving part
return 1;
}
}
mask&=mask-1;
}while(mask);
}
if(BOOST_LIKELY(pg->is_not_overflowed(hash))){
return 0;
}
}
while(BOOST_LIKELY(pb.next(this->arrays.groups_size_mask)));
return 0;
}
#else
template<typename GroupAccessMode,typename Key,typename F>
BOOST_FORCEINLINE std::size_t unprotected_internal_visit(
GroupAccessMode access_mode,
@ -1300,10 +1367,10 @@ private:
auto lck=access(access_mode,pos);
do{
auto n=unchecked_countr_zero(mask);
auto pv=p[n].p.load(std::memory_order_relaxed);
if(BOOST_LIKELY(pv&&bool(this->pred()(x,this->key_from(*pv))))){
f(pg,n,p+n,pv);
return 1;
if(BOOST_LIKELY(
pg->is_occupied(n)&&bool(this->pred()(x,this->key_from(p[n]))))){
f(pg,n,p+n);
return 1;
}
mask&=mask-1;
}while(mask);
@ -1315,6 +1382,7 @@ private:
while(BOOST_LIKELY(pb.next(this->arrays.groups_size_mask)));
return 0;
}
#endif
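
The latch-free read path above takes no per-group lock: load_element and save_element turn each group's access_counter into, in effect, a seqlock (an odd value marks a write in progress), and unprotected_internal_visit hands the visitor a stack copy of the element obtained through load_element (writing a modified copy back is still pending, per the "TODO: saving part" comment). Below is a minimal, self-contained sketch of that even/odd protocol, assuming a trivially copyable payload; seq_slot and its members are illustrative names only, not part of this code.

#include <atomic>
#include <cstdint>
#include <cstdio>

template<typename T> /* T assumed trivially copyable */
struct seq_slot
{
  T load()const
  {
    for(;;){
      auto n=seq.load(std::memory_order_acquire);
      if(n%2==1)continue;                       /* writer in progress, retry */
      T res=value;                              /* optimistic copy of the payload */
      std::atomic_thread_fence(std::memory_order_acquire);
      if(seq.load(std::memory_order_relaxed)==n)return res; /* counter unchanged: copy is consistent */
    }
  }
  void store(const T& x)
  {
    for(;;){
      auto n=seq.load(std::memory_order_relaxed);
      if(n%2==1)continue;                       /* another writer is active */
      if(!seq.compare_exchange_weak(n,n+1))continue; /* counter goes odd: readers will retry */
      value=x;
      seq.store(n+2,std::memory_order_release); /* publish and return to an even value */
      return;
    }
  }
private:
  std::atomic<std::uint32_t> seq{0};
  T                          value{};
};

int main()
{
  seq_slot<int> s;
  s.store(42);
  std::printf("%d\n",s.load()); /* 42 */
}

As in load_element, the copy is taken before validation, so the payload must tolerate being read while concurrently written (trivially copyable, no invariants on intermediate states); the second counter read is what discards torn copies.
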
template<typename GroupAccessMode,typename FwdIterator,typename F>
BOOST_FORCEINLINE std::size_t unprotected_bulk_visit(
@ -1585,6 +1653,14 @@ private:
bool commit_=false;
};
struct assign_access_counter_on_exit
{
~assign_access_counter_on_exit(){counter=x;}
group_access_counter_type &counter;
boost::uint32_t x;
};
struct assign_insert_counter_on_exit
{
~assign_insert_counter_on_exit(){counter=x;}
@ -1604,8 +1680,7 @@ private:
for(;;){
startover:
boost::uint32_t counter=0;
while(BOOST_UNLIKELY((counter=insert_counter(pos0))%2==1)){}
boost::uint32_t counter=insert_counter(pos0);
if(unprotected_visit(
access_mode,k,pos0,hash,std::forward<F>(f)))return 0;
@ -1613,6 +1688,7 @@ private:
for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){
auto pos=pb.get();
auto pg=this->arrays.groups()+pos;
auto lck=access(group_exclusive{},pos);
auto mask=pg->match_available();
if(BOOST_LIKELY(mask!=0)){
auto n=unchecked_countr_zero(mask);
@ -1621,20 +1697,19 @@ private:
/* slot wasn't empty */
goto startover;
}
if(BOOST_UNLIKELY(
!insert_counter(pos0).compare_exchange_weak(counter,counter+1))){
if(BOOST_UNLIKELY(insert_counter(pos0)++!=counter)){
/* other thread inserted from pos0, need to start over */
goto startover;
}
{
assign_insert_counter_on_exit a{insert_counter(pos0),counter+2};
auto p=this->arrays.elements()+pos*N+n;
this->construct_element(p,std::forward<Args>(args)...);
pg->set(n,hash);
for(prober pb2(pos0);pb2.get()!=pos;
pb2.next(this->arrays.groups_size_mask)){
this->arrays.groups()[pb2.get()].mark_overflow(hash);
auto p=this->arrays.elements()+pos*N+n;
{
auto acounter=++access_counter(pos);
assign_access_counter_on_exit aa{access_counter(pos),acounter+1};
this->construct_element(p,std::forward<Args>(args)...);
std::atomic_thread_fence(std::memory_order_release);
}
pg->set(n,hash);
}
rslot.commit();
auto& sc=local_size_ctrl();
@ -1643,7 +1718,7 @@ private:
return 1;
}
if(!pbn--)return -1;
//pg->mark_overflow(hash);
pg->mark_overflow(hash);
}
}
}
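
The rewritten insertion path drops the spin on an odd insert_counter: it snapshots insert_counter(pos0), runs the duplicate lookup without a lock, and then, under the group's exclusive lock, claims the group by post-incrementing the counter and checking that it still equals the snapshot. A mismatch means another thread inserted from the same starting position and the whole attempt starts over; on success the element is constructed inside an access_counter write section and both counters are restored by the assign_*_counter_on_exit helpers. The sketch below is a reduced, self-contained model of that claim-and-recheck scheme, not the actual implementation; toy_group, try_insert, emplace and the fixed slot array are illustrative stand-ins.

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <mutex>

struct toy_group
{
  /* one optimistic attempt: 1 = inserted, 0 = key already present, -1 = start over */
  int try_insert(int x)
  {
    auto counter=version.load(std::memory_order_acquire); /* snapshot, cf. insert_counter(pos0) */
    if(lookup(x))return 0;                                /* duplicate check without the lock */
    std::lock_guard<std::mutex> lck{m};                   /* stands in for access(group_exclusive{},pos) */
    if(version.fetch_add(1,std::memory_order_acq_rel)!=counter){
      return -1;           /* someone inserted since the snapshot: lookup may be stale, retry */
    }
    auto n=count.load(std::memory_order_relaxed);
    slot[n]=x;                                            /* "construct_element" */
    count.store(n+1,std::memory_order_release);           /* publish only after construction */
    version.fetch_add(1,std::memory_order_release);       /* counter ends at snapshot+2, cf. assign_insert_counter_on_exit */
    return 1;
  }

  int emplace(int x)       /* retry loop, playing the role of goto startover */
  {
    for(;;){
      int res=try_insert(x);
      if(res>=0)return res;
    }
  }

private:
  bool lookup(int x)const
  {
    auto n=count.load(std::memory_order_acquire);         /* slots [0,n) are fully constructed */
    for(std::size_t i=0;i<n;++i)if(slot[i]==x)return true;
    return false;
  }

  std::mutex               m;
  std::atomic<unsigned>    version{0};
  std::atomic<std::size_t> count{0};
  int                      slot[64]={};                   /* toy capacity: no probing or rehash here */
};

int main()
{
  toy_group g;
  std::printf("%d\n",g.emplace(42)); /* 1: inserted */
  std::printf("%d\n",g.emplace(42)); /* 0: found by the pre-lock lookup */
}

The real code additionally wraps construct_element in an access_counter write section (the ++access_counter(pos) / assign_access_counter_on_exit pair around a release fence) so that concurrent load_element calls on the same group retry rather than observe a half-constructed element; the toy gets a similar effect more crudely by publishing count only after the slot is written.
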