completed concurrent_table's API

This commit is contained in:
joaquintides
2023-03-17 16:58:57 +01:00
committed by Christian Mazakas
parent ca59ed8c50
commit 3aaefdcc78
2 changed files with 181 additions and 50 deletions

View File

@ -26,15 +26,25 @@
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#if !defined(BOOST_NO_CXX17_HDR_EXECUTION)
#include <algorithm>
#endif
namespace boost{ namespace boost{
namespace unordered{ namespace unordered{
namespace detail{ namespace detail{
namespace foa{ namespace foa{
// TODO: use std::hardware_destructive_interference_size when available
template<typename T> template<typename T>
struct alignas(64) cacheline_protected:T struct
#if defined(__cpp_lib_hardware_interference_size)
alignas(std::hardware_destructive_interference_size)
#else
alignas(64)
#endif
cacheline_protected:T
{ {
using T::T; using T::T;
}; };
@ -144,8 +154,8 @@ struct group_access
using shared_lock_guard=shared_lock<mutex_type>; using shared_lock_guard=shared_lock<mutex_type>;
using exclusive_lock_guard=lock_guard<mutex_type>; using exclusive_lock_guard=lock_guard<mutex_type>;
shared_lock_guard shared_access(){return m;} shared_lock_guard shared_access(){return shared_lock_guard{m};}
exclusive_lock_guard exclusive_access(){return m;} exclusive_lock_guard exclusive_access(){return exclusive_lock_guard{m};}
std::atomic_uint32_t& insert_counter(){return cnt;} std::atomic_uint32_t& insert_counter(){return cnt;}
private: private:
@ -187,6 +197,18 @@ class concurrent_table:concurrent_table_core_impl<TypePolicy,Hash,Pred,Allocator
using super::N; using super::N;
using prober=typename super::prober; using prober=typename super::prober;
#if !defined(BOOST_NO_CXX17_HDR_EXECUTION)
template<typename ExecutionPolicy>
using is_execution_policy=std::is_execution_policy<
typename std::remove_cv<
typename std::remove_reference<ExecutionPolicy>::type
>::type
>;
#else
template<typename ExecutionPolicy>
using is_execution_policy=std::false_type;
#endif
public: public:
using key_type=typename super::key_type; using key_type=typename super::key_type;
using init_type=typename super::init_type; using init_type=typename super::init_type;
@ -248,7 +270,41 @@ public:
visit(x,[&](const value_type& v){f(v);}); visit(x,[&](const value_type& v){f(v);});
} }
// TODO: visit_all template<typename F> std::size_t visit_all(F f)
{
auto lck=shared_access();
std::size_t res=0;
this->for_all_elements([&](element_type* p){
f(type_policy::value_from(*p));
++res;
});
return res;
}
template<typename F> std::size_t visit_all(F f)const
{
return const_cast<concurrent_table*>(this)->
visit_all([&](const value_type& v){f(v);});
}
#if !defined(BOOST_NO_CXX17_HDR_EXECUTION)
template<typename ExecutionPolicy,typename F>
void visit_all(ExecutionPolicy&& policy,F f)
{
auto lck=shared_access();
for_all_elements_exec(
std::forward<ExecutionPolicy>(policy),
[&](element_type* p){f(type_policy::value_from(*p));});
}
template<typename ExecutionPolicy,typename F>
void visit_all(ExecutionPolicy&& policy,F f)const
{
return const_cast<concurrent_table*>(this)->
visit_all(
std::forward<ExecutionPolicy>(policy),[&](const value_type& v){f(v);});
}
#endif
bool empty()const noexcept{return size()==0;} bool empty()const noexcept{return size()==0;}
@ -298,12 +354,6 @@ public:
f,try_emplace_args_t{},std::forward<Key>(x),std::forward<Args>(args)...); f,try_emplace_args_t{},std::forward<Key>(x),std::forward<Args>(args)...);
} }
template<typename Key>
BOOST_FORCEINLINE std::size_t erase(Key&& x)
{
return erase_if(std::forward<Key>(x),[](const value_type&){return true;});
}
template<typename F,typename... Args> template<typename F,typename... Args>
BOOST_FORCEINLINE bool emplace_or_visit(F f,Args&&... args) BOOST_FORCEINLINE bool emplace_or_visit(F f,Args&&... args)
{ {
@ -328,8 +378,15 @@ public:
BOOST_FORCEINLINE bool insert_or_visit(value_type&& x,F f) BOOST_FORCEINLINE bool insert_or_visit(value_type&& x,F f)
{return emplace_or_visit_impl(f,std::move(x));} {return emplace_or_visit_impl(f,std::move(x));}
template<typename Key>
BOOST_FORCEINLINE std::size_t erase(Key&& x)
{
return erase_if(std::forward<Key>(x),[](const value_type&){return true;});
}
template<typename Key,typename F> template<typename Key,typename F>
BOOST_FORCEINLINE std::size_t erase_if(Key&& x,F f) BOOST_FORCEINLINE auto erase_if(Key&& x,F f)->typename std::enable_if<
!is_execution_policy<Key>::value,std::size_t>::type
{ {
auto lck=shared_access(); auto lck=shared_access();
auto hash=this->hash_for(x); auto hash=this->hash_for(x);
@ -343,6 +400,30 @@ public:
}); });
} }
template<typename F>
std::size_t erase_if(F&& f)
{
auto lck=shared_access();
return super::erase_if_impl(std::forward<F>(f));
}
#if !defined(BOOST_NO_CXX17_HDR_EXECUTION)
template<typename ExecutionPolicy,typename F>
auto erase_if(ExecutionPolicy&& policy,F f)->typename std::enable_if<
is_execution_policy<ExecutionPolicy>::value,void>::type
{
auto lck=shared_access();
for_all_elements_exec(
std::forward<ExecutionPolicy>(policy),
[&,this](group_type* pg,unsigned int n,element_type* p)
{
if(f(const_cast<const value_type&>(type_policy::value_from(*p)))){
super::erase(pg,n,p);
}
});
}
#endif
void swap(concurrent_table& x) void swap(concurrent_table& x)
noexcept(noexcept(std::declval<super&>().swap(std::declval<super&>()))) noexcept(noexcept(std::declval<super&>().swap(std::declval<super&>())))
{ {
@ -356,20 +437,20 @@ public:
super::clear(); super::clear();
} }
#if 0
// TODO: should we accept different allocator too? // TODO: should we accept different allocator too?
template<typename Hash2,typename Pred2> template<typename Hash2,typename Pred2>
void merge(table<TypePolicy,Hash2,Pred2,Allocator>& x) void merge(concurrent_table<TypePolicy,Hash2,Pred2,Allocator>& x)
{ {
// TODO: consider grabbing shared access on *this at this level
auto lck=x.shared_access(); // TODO: can deadlock if x1.merge(x2) while x2.merge(x1)
x.for_all_elements([&,this](group_type* pg,unsigned int n,element_type* p){ x.for_all_elements([&,this](group_type* pg,unsigned int n,element_type* p){
erase_on_exit e{x,{pg,n,p}}; erase_on_exit e{x,pg,n,p};
if(!emplace_impl(type_policy::move(*p)).second)e.rollback(); if(!emplace_impl(type_policy::move(*p)))e.rollback();
}); });
} }
template<typename Hash2,typename Pred2> template<typename Hash2,typename Pred2>
void merge(table<TypePolicy,Hash2,Pred2,Allocator>&& x){merge(x);} void merge(concurrent_table<TypePolicy,Hash2,Pred2,Allocator>&& x){merge(x);}
#endif
hasher hash_function()const hasher hash_function()const
{ {
@ -427,11 +508,10 @@ public:
super::reserve(n); super::reserve(n);
} }
// TODO: rewrite
template<typename Predicate> template<typename Predicate>
friend std::size_t erase_if(concurrent_table& x,Predicate pr) friend std::size_t erase_if(concurrent_table& x,Predicate pr)
{ {
return x.erase_if_impl(pr); return x.erase_if(pr);
} }
private: private:
@ -454,6 +534,57 @@ private:
concurrent_table&& x,const Allocator& al_,exclusive_lock_guard): concurrent_table&& x,const Allocator& al_,exclusive_lock_guard):
super{std::move(x),al_}{} super{std::move(x),al_}{}
shared_lock_guard shared_access()const
{
// TODO: make this more sophisticated (even distribution)
thread_local auto id=(++thread_counter)%mutexes.size();
return shared_lock_guard{mutexes[id]};
}
exclusive_lock_guard exclusive_access()const
{
return exclusive_lock_guard{mutexes};
}
exclusive_bilock_guard exclusive_access(
const concurrent_table& x,const concurrent_table& y)
{
return {x.mutexes,y.mutexes};
}
group_shared_lock_guard shared_access(std::size_t pos)const
{
return this->arrays.groups[pos].shared_access();
}
group_exclusive_lock_guard exclusive_access(std::size_t pos)const
{
return this->arrays.groups[pos].exclusive_access();
}
std::atomic_uint32_t& insert_counter(std::size_t pos)const
{
return this->arrays.groups[pos].insert_counter();
}
struct erase_on_exit
{
erase_on_exit(
concurrent_table& x_,
group_type* pg_,unsigned int pos_,element_type* p_):
x{x_},pg{pg_},pos{pos_},p{p_}{}
~erase_on_exit(){if(!rollback_)x.super::erase(pg,pos,p);}
void rollback(){rollback_=true;}
concurrent_table &x;
group_type *pg;
unsigned int pos;
element_type *p;
bool rollback_=false;
};
template<typename Key,typename F> template<typename Key,typename F>
BOOST_FORCEINLINE std::size_t unprotected_visit( BOOST_FORCEINLINE std::size_t unprotected_visit(
const Key& x,std::size_t pos0,std::size_t hash,F&& f)const const Key& x,std::size_t pos0,std::size_t hash,F&& f)const
@ -651,40 +782,40 @@ private:
if(this->size_==this->ml)super::rehash(super::capacity()+1); if(this->size_==this->ml)super::rehash(super::capacity()+1);
} }
shared_lock_guard shared_access()const #if !defined(BOOST_NO_CXX17_HDR_EXECUTION)
template<typename ExecutionPolicy,typename F>
auto for_all_elements_exec(ExecutionPolicy&& policy,F f)
->decltype(f(nullptr),void())
{ {
// TODO: make this more sophisticated (even distribution) for_all_elements_exec(
thread_local auto id=(++thread_counter)%mutexes.size(); std::forward<ExecutionPolicy>(policy),
[&](group_type* pg,unsigned int n,element_type* p){f(p);});
return mutexes[id];
} }
exclusive_lock_guard exclusive_access()const template<typename ExecutionPolicy,typename F>
auto for_all_elements_exec(ExecutionPolicy&& policy,F f)
->decltype(f(nullptr,0,nullptr),void())
{ {
return mutexes; auto lck=shared_access();
} auto first=this->arrays.groups,
last=first+this->arrays.groups_size_mask+1;
exclusive_bilock_guard exclusive_access( std::for_each(std::forward<ExecutionPolicy>(policy),first,last,
const concurrent_table& x,const concurrent_table& y) [&,this](group_type& g){
{ std::size_t pos=&g-first;
return {x.mutexes,y.mutexes}; auto p=this->arrays.elements+pos*N;
} auto lck=exclusive_access(pos);
auto mask=g.match_really_occupied();
group_shared_lock_guard shared_access(std::size_t pos)const while(mask){
{ auto n=unchecked_countr_zero(mask);
return this->arrays.groups[pos].shared_access(); f(&g,n,p+n);
} mask&=mask-1;
}
group_exclusive_lock_guard exclusive_access(std::size_t pos)const }
{ );
return this->arrays.groups[pos].exclusive_access();
}
std::atomic_uint32_t& insert_counter(std::size_t pos)const
{
return this->arrays.groups[pos].insert_counter();
} }
#endif
/* TODO: thread_counter should be static */
mutable std::atomic_uint thread_counter=0; mutable std::atomic_uint thread_counter=0;
mutable multimutex_type mutexes; mutable multimutex_type mutexes;
}; };

View File

@ -1585,7 +1585,7 @@ public:
} }
template<typename Predicate> template<typename Predicate>
std::size_t erase_if_impl(Predicate pr) std::size_t erase_if_impl(Predicate&& pr)
{ {
std::size_t s=size(); std::size_t s=size();
for_all_elements([&,this](group_type* pg,unsigned int n,element_type* p){ for_all_elements([&,this](group_type* pg,unsigned int n,element_type* p){