made insertion and constant visitation (but *not* erasure) optionally latch-free

joaquintides
2023-12-02 19:50:36 +01:00
parent f493603f5c
commit 49ff2a63ea
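In outline: with BOOST_UNORDERED_LATCH_FREE defined, readers stop taking a per-group shared lock. Each thread instead flips a per-thread "critical section" flag around its visit, and an inserting thread that has claimed a slot waits until every flag is clear before validating and publishing the element; erasure still needs the latches, hence the "but *not* erasure" in the title. A minimal sketch of that reader/writer handshake, where reader_flags, read_section and wait_for_readers are illustrative names (not in the commit) standing in for csections and wait_for_csections:

#include <atomic>
#include <cstddef>

std::atomic_int reader_flags[128]; /* one per slot; cache-aligned in the commit */

template<typename F>
void read_section(std::size_t thread_id,F f)
{
  auto& flag=reader_flags[thread_id%128];
  flag.store(1); /* announce the critical section (seq_cst, as in the commit) */
  f();           /* latch-free visitation */
  flag.store(0); /* retire */
}

void wait_for_readers() /* writer side: quiesce all in-flight readers */
{
  for(std::size_t i=0;i<128;++i){
    while(reader_flags[i].load(std::memory_order_acquire)){}
  }
}

Readers pay two uncontended stores per visit; the writer pays a scan of all 128 flags, roughly the shape of an RCU grace period.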


@@ -83,6 +83,8 @@ public:
cache_aligned_array(const cache_aligned_array&)=delete;
cache_aligned_array& operator=(const cache_aligned_array&)=delete;
constexpr std::size_t size()const noexcept{return N;}
T& operator[](std::size_t pos)noexcept{return *data(pos);}
private:
@@ -149,14 +151,12 @@ template<typename Mutex>
class lock_guard
{
public:
-lock_guard(Mutex& m_)noexcept:m(m_){m.lock();}
-~lock_guard()noexcept{m.unlock();}
-/* not used but VS in pre-C++17 mode needs to see it for RVO */
-lock_guard(const lock_guard&);
+lock_guard(Mutex& m)noexcept:pm(&m){pm->lock();}
+lock_guard(lock_guard&& x)noexcept:pm(x.pm){x.pm=nullptr;}
+~lock_guard()noexcept{if(pm)pm->unlock();}
private:
-Mutex &m;
+Mutex* pm;
};
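The rewrite from a reference member to a pointer plus a move constructor is what lets the latch-free group_exclusive_lock_guard below hold a lock_guard by value and be returned from access(): the moved-from guard nulls pm, so only the surviving object unlocks, and the old declared-but-undefined copy constructor workaround for pre-C++17 VS becomes unnecessary. A hypothetical illustration (make_guard is not part of the commit):

template<typename Mutex>
lock_guard<Mutex> make_guard(Mutex& m)
{
  lock_guard<Mutex> g{m};
  return g; /* moved (or elided); g's pm becomes nullptr, so no double unlock */
}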
/* inspired by boost/multi_index/detail/scoped_bilock.hpp */
@@ -970,8 +970,30 @@ private:
using exclusive_lock_guard=reentrancy_checked<lock_guard<multimutex_type>>;
using exclusive_bilock_guard=
reentrancy_bichecked<scoped_bilock<multimutex_type>>;
#if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_shared_lock_guard
{
group_shared_lock_guard(std::atomic_int& cs_):cs{cs_}{cs.store(1);}
~group_shared_lock_guard(){cs.store(0);}
std::atomic_int& cs;
};
struct group_exclusive_lock_guard
{
group_exclusive_lock_guard(
std::atomic_int& cs_,group_access::exclusive_lock_guard&& lck_):
cs{cs_},lck{std::move(lck_)}{cs.store(1);}
~group_exclusive_lock_guard(){cs.store(0);}
std::atomic_int& cs;
group_access::exclusive_lock_guard lck;
};
#else
using group_shared_lock_guard=typename group_access::shared_lock_guard;
using group_exclusive_lock_guard=typename group_access::exclusive_lock_guard;
#endif
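Under BOOST_UNORDERED_LATCH_FREE, then, "shared" group access reduces to setting the calling thread's critical-section flag, with no per-group latch at all, while exclusive access sets the flag and still takes the group's exclusive lock so writers keep serializing among themselves. A standalone sketch of that composition, with std::mutex standing in for the group's access lock:

#include <atomic>
#include <mutex>

struct exclusive_access_sketch
{
  exclusive_access_sketch(std::atomic_int& cs_,std::mutex& m):
    cs{cs_},lck{m}{cs.store(1);}             /* lock first, then raise the flag */
  ~exclusive_access_sketch(){cs.store(0);}   /* clear the flag, then lck unlocks */

  std::atomic_int&            cs;
  std::lock_guard<std::mutex> lck;
};

Note the ordering: the lock is held before the flag is raised, and the flag is cleared before the lock is released, mirroring the member order of group_exclusive_lock_guard.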
using group_insert_counter_type=typename group_access::insert_counter_type;
concurrent_table(const concurrent_table& x,exclusive_lock_guard):
@@ -985,9 +1007,15 @@ private:
concurrent_table&& x,const Allocator& al_,exclusive_lock_guard):
super{std::move(x),al_}{}
static inline std::size_t thread_id()
{
thread_local auto id=(++thread_counter);
return id;
}
inline shared_lock_guard shared_access()const
{
-thread_local auto id=(++thread_counter)%mutexes.size();
+thread_local auto id=thread_id()%mutexes.size();
return shared_lock_guard{this,mutexes[id]};
}
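thread_id() now hands every thread a single process-wide id, assigned on the thread's first call (thread_local initialization runs exactly once per thread) and reused wherever a per-thread slot is needed: mutexes[] here, csections[] in the latch-free path. A standalone illustration of the pattern (counter stands in for thread_counter):

#include <atomic>
#include <cstddef>

std::atomic<std::size_t> counter{0};

std::size_t id_for_this_thread()
{
  thread_local std::size_t id=++counter; /* initialized once per thread */
  return id;                             /* stable for the thread's lifetime */
}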
@@ -1018,13 +1046,24 @@ private:
inline group_shared_lock_guard access(group_shared,std::size_t pos)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
return {csections[thread_id()%csections.size()]};
#else
return this->arrays.group_accesses()[pos].shared_access();
#endif
}
inline group_exclusive_lock_guard access(
group_exclusive,std::size_t pos)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
return {
csections[thread_id()%csections.size()],
this->arrays.group_accesses()[pos].exclusive_access()
};
#else
return this->arrays.group_accesses()[pos].exclusive_access();
#endif
}
inline group_insert_counter_type& insert_counter(std::size_t pos)const
@@ -1436,6 +1475,55 @@ private:
bool commit_=false;
};
#if defined(BOOST_UNORDERED_LATCH_FREE)
template<typename GroupAccessMode,typename F,typename... Args>
BOOST_FORCEINLINE int
unprotected_norehash_emplace_or_visit(
GroupAccessMode access_mode,F&& f,Args&&... args)
{
const auto &k=this->key_from(std::forward<Args>(args)...);
auto hash=this->hash_for(k);
auto pos0=this->position_for(hash);
for(;;){
startover:
boost::uint32_t counter=insert_counter(pos0);
if(unprotected_visit(
access_mode,k,pos0,hash,std::forward<F>(f)))return 0;
reserve_size rsize(*this);
if(BOOST_LIKELY(rsize.succeeded())){
for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){
auto pos=pb.get();
auto pg=this->arrays.groups()+pos;
auto mask=pg->match_available();
if(BOOST_LIKELY(mask!=0)){
auto n=unchecked_countr_zero(mask);
unsigned char expected=0;
if(!reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
compare_exchange_weak(expected,1)){
/* slot wasn't empty */
goto startover;
}
wait_for_csections(); // WHY BEFORE THE FOLLOWING?
if(BOOST_UNLIKELY(insert_counter(pos0)++!=counter)){
/* other thread inserted from pos0, need to start over */
pg->reset(n);
goto startover;
}
auto p=this->arrays.elements()+pos*N+n;
this->construct_element(p,std::forward<Args>(args)...);
pg->set(n,hash);
rsize.commit();
return 1;
}
pg->mark_overflow(hash);
}
}
else return -1;
}
}
#else
template<typename GroupAccessMode,typename F,typename... Args>
BOOST_FORCEINLINE int
unprotected_norehash_emplace_or_visit(
@@ -1477,6 +1565,7 @@ private:
else return -1;
}
}
#endif
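The latch-free branch is an optimistic protocol: snapshot the group's insert counter, look for an existing equal key, claim an empty slot with a byte-wide CAS on the group metadata, quiesce readers with wait_for_csections(), then bump and recheck the counter; a mismatch means a concurrent insertion from the same position may have raced, so the slot is reset and the attempt restarts. (The in-code "WHY BEFORE THE FOLLOWING?" note is the author's open question; plausibly the quiesce must complete before the counter recheck so that no visitation begun under the old counter is still in flight when the insertion is validated.) A toy single-group model of the same control flow, with illustrative names and a flat slot array rather than the library's SIMD group layout:

#include <atomic>
#include <cstddef>

constexpr std::size_t      N=16;
std::atomic<unsigned char> slot_state[N]; /* 0 empty, 1 claimed, 2 published */
int                        slot_value[N];
std::atomic<unsigned>      insert_counter{0};

bool insert(int v) /* true iff v was newly inserted */
{
startover:
  unsigned counter=insert_counter.load();           /* 1. snapshot         */
  for(std::size_t n=0;n<N;++n){                     /* 2. visit phase      */
    if(slot_state[n].load()==2&&slot_value[n]==v)return false;
  }
  for(std::size_t n=0;n<N;++n){
    unsigned char expected=0;
    if(!slot_state[n].compare_exchange_strong(expected,1))continue;
    /* 3. slot claimed; the commit calls wait_for_csections() here, so no
       reader can still be mid-visit when the counter is rechecked */
    if(insert_counter++!=counter){                  /* 4. racing insert?   */
      slot_state[n].store(0);                       /*    undo, start over */
      goto startover;
    }
    slot_value[n]=v;                                /* 5. construct        */
    slot_state[n].store(2);                         /* 6. publish          */
    return true;
  }
  return false;                                     /* group full          */
}

As in the commit, exactly one of two racing inserters passes the counter check; the loser releases its slot and re-runs the visit phase, where it will find the winner's element if the keys were equal.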
void rehash_if_full()
{
@@ -1724,6 +1813,18 @@ private:
static std::atomic<std::size_t> thread_counter;
mutable multimutex_type mutexes;
#if defined(BOOST_UNORDERED_LATCH_FREE)
mutable cache_aligned_array<
std::atomic_int,128> csections;
void wait_for_csections()const
{
for(std::size_t i=0;i<csections.size();++i){
while(csections[i].load(std::memory_order_acquire)){}
}
}
#endif
};
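One layout note: csections is 128 flags held in a cache_aligned_array, which by its name places each std::atomic_int on its own cache line so one reader's store does not invalidate another reader's line, and wait_for_csections() is a plain scan that spins until every flag reads zero. A minimal stand-in for that layout, assuming the usual 64-byte destructive interference size:

#include <atomic>

struct alignas(64) padded_flag /* 64 approximates the cache-line size here */
{
  std::atomic_int v{0};
};

padded_flag flags[128]; /* one line per flag: no false sharing between readers */

When the table is quiescent, the writer's wait is 128 uncontended loads; its cost scales with the flag count, not with the number of live threads.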
template<typename T,typename H,typename P,typename A>