used parlay epoch framework

This commit is contained in:
joaquintides
2024-02-03 17:37:08 +01:00
parent 1c48ce8194
commit 687a446784

View File

@ -54,6 +54,7 @@
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include <iostream> #include <iostream>
#include "utils/epoch.h"
#endif #endif
std::atomic<std::size_t> nodes_wasted=0; std::atomic<std::size_t> nodes_wasted=0;
@ -503,27 +504,7 @@ public:
#else #else
super{n,h_,pred_,al_} super{n,h_,pred_,al_}
#endif #endif
{ {}
#if defined(BOOST_UNORDERED_LATCH_FREE)
using retired_element_allocator_type=
typename boost::allocator_rebind<Allocator,retired_element>::type;
using retired_element_traits=
boost::allocator_traits<retired_element_allocator_type>;
retired_element_allocator_type ral=this->al();
for(std::size_t i=0;i<garbage_vectors.size();++i){
auto& v=garbage_vectors[i];
v.epoch_bump=0;
v.retired_elements=retired_element_traits::allocate(
ral,garbage_vector::N);
for(std::size_t j=0;j<garbage_vector::N;++j){
retired_element_traits::construct(ral,&v.retired_elements[j]);
}
}
nodes_wasted=0;
#endif
}
concurrent_table(const concurrent_table& x): concurrent_table(const concurrent_table& x):
concurrent_table(x,x.exclusive_access()){} concurrent_table(x,x.exclusive_access()){}
@ -559,25 +540,12 @@ public:
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
~concurrent_table(){ ~concurrent_table(){
using retired_element_allocator_type=
typename boost::allocator_rebind<Allocator,retired_element>::type;
using retired_element_traits=
boost::allocator_traits<retired_element_allocator_type>;
retired_element_allocator_type ral=this->al();
for(std::size_t i=0;i<garbage_vectors.size();++i){
auto& v=garbage_vectors[i];
retired_element_traits::deallocate(
ral,v.retired_elements,garbage_vector::N);
}
std::cout std::cout
<<"version: 2024/02/03 19:10; " <<"version: 2024/02/03 17:30; "
<<"lf: "<<(double)size()/capacity()<<"; " <<"lf: "<<(double)size()/capacity()<<"; "
<<"capacity: "<<capacity()<<"; " <<"capacity: "<<capacity()<<"; "
<<"rehashes: "<<rehashes<<"; " <<"rehashes: "<<rehashes<<"; "
<<"max probe:"<<max_probe<<", " <<"max probe:"<<max_probe<<"\n";
<<"nodes wasted:"<<nodes_wasted<<"\n";
} }
#else #else
~concurrent_table()=default; ~concurrent_table()=default;
@ -739,9 +707,8 @@ public:
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
auto lck=exclusive_access(); auto lck=exclusive_access();
std::size_t res=this->size_ctrl.size; std::size_t res=this->size_ctrl.size;
for(std::size_t i=0;i<garbage_vectors.size();++i){ for(const auto& sc:local_size_ctrls){
auto &v=garbage_vectors[i]; res+=sc.size;
res+=v.size;
} }
return res; return res;
#else #else
@ -890,7 +857,10 @@ public:
if(f(cast_for(group_shared{},*pv))){ if(f(cast_for(group_shared{},*pv))){
if(p->p.compare_exchange_strong(pv,nullptr)){ if(p->p.compare_exchange_strong(pv,nullptr)){
pg->reset(n); pg->reset(n);
retire_element(pv,!pg->is_not_overflowed(hash)); auto& sc=local_size_ctrl();
sc.size-=1;
sc.mcos+=!pg->is_not_overflowed(hash);
epoch::get_default_pool<value_type>().Retire(pv);
res=1; res=1;
} }
} }
@ -1071,26 +1041,22 @@ private:
using exclusive_bilock_guard= using exclusive_bilock_guard=
reentrancy_bichecked<scoped_bilock<multimutex_type>>; reentrancy_bichecked<scoped_bilock<multimutex_type>>;
#if defined(BOOST_UNORDERED_LATCH_FREE)
using epoch_type=std::atomic<std::size_t>;
#endif
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_shared_lock_guard struct group_shared_lock_guard
{ {
group_shared_lock_guard(epoch_type& e_):e{e_}{} group_shared_lock_guard():id{epoch::internal::get_epoch().announce()}{}
~group_shared_lock_guard(){e=std::size_t(-1);} ~group_shared_lock_guard(){epoch::internal::get_epoch().unannounce(id);}
epoch_type& e; int id;
}; };
struct group_exclusive_lock_guard struct group_exclusive_lock_guard
{ {
group_exclusive_lock_guard( group_exclusive_lock_guard(group_access::exclusive_lock_guard&& lck_):
epoch_type& e_,group_access::exclusive_lock_guard&& lck_): id{epoch::internal::get_epoch().announce()},
e{e_},lck{std::move(lck_)}{} lck{std::move(lck_)}{}
~group_exclusive_lock_guard(){e=std::size_t(-1);} ~group_exclusive_lock_guard(){epoch::internal::get_epoch().unannounce(id);}
epoch_type& e; int id;
group_access::exclusive_lock_guard lck; group_access::exclusive_lock_guard lck;
}; };
#else #else
@ -1151,9 +1117,7 @@ private:
inline group_shared_lock_guard access(group_shared,std::size_t pos)const inline group_shared_lock_guard access(group_shared,std::size_t pos)const
{ {
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
auto& e=local_garbage_vector().epoch; return {};
e=current_epoch.load(std::memory_order_relaxed);
return {e};
#else #else
return this->arrays.group_accesses()[pos].shared_access(); return this->arrays.group_accesses()[pos].shared_access();
#endif #endif
@ -1162,13 +1126,7 @@ private:
inline group_exclusive_lock_guard access( inline group_exclusive_lock_guard access(
group_exclusive,std::size_t pos)const group_exclusive,std::size_t pos)const
{ {
#if defined(BOOST_UNORDERED_LATCH_FREE)
auto& e=local_garbage_vector().epoch;
e=current_epoch.load(std::memory_order_relaxed);
return {e,this->arrays.group_accesses()[pos].exclusive_access()};
#else
return this->arrays.group_accesses()[pos].exclusive_access(); return this->arrays.group_accesses()[pos].exclusive_access();
#endif
} }
inline group_insert_counter_type& insert_counter(std::size_t pos)const inline group_insert_counter_type& insert_counter(std::size_t pos)const
@ -1670,10 +1628,8 @@ private:
} }
{ {
assign_insert_counter_on_exit a{insert_counter(pos0),counter+2}; assign_insert_counter_on_exit a{insert_counter(pos0),counter+2};
auto p=this->arrays.elements()+pos*N+n; auto p=this->arrays.elements()+pos*N+n;
auto pv=new_element(); this->construct_element(p,std::forward<Args>(args)...);
new (pv) value_type(std::forward<Args>(args)...);
p->p=pv;
pg->set(n,hash); pg->set(n,hash);
for(prober pb2(pos0);pb2.get()!=pos; for(prober pb2(pos0);pb2.get()!=pos;
pb2.next(this->arrays.groups_size_mask)){ pb2.next(this->arrays.groups_size_mask)){
@ -1681,9 +1637,9 @@ private:
} }
} }
rslot.commit(); rslot.commit();
auto& v=local_garbage_vector(); auto& sc=local_size_ctrl();
++v.size; sc.size+=1;
v.mcos-=!pg->is_not_overflowed(hash); sc.mcos-=!pg->is_not_overflowed(hash);
return 1; return 1;
} }
if(!pbn--)return -1; if(!pbn--)return -1;
@ -1744,7 +1700,6 @@ private:
if(p.first>=p.second){ // NB >= if(p.first>=p.second){ // NB >=
auto lck=exclusive_access(); auto lck=exclusive_access();
update_size_ctrl(); update_size_ctrl();
garbage_collect();
++rehashes; ++rehashes;
this->unchecked_rehash_for_growth(); this->unchecked_rehash_for_growth();
max_probe=default_max_probe; max_probe=default_max_probe;
@ -2000,118 +1955,22 @@ private:
mutable multimutex_type mutexes; mutable multimutex_type mutexes;
#if defined(BOOST_UNORDERED_LATCH_FREE) #if defined(BOOST_UNORDERED_LATCH_FREE)
struct retired_element{ struct alignas(64) local_size_ctrl_type
static constexpr std::size_t available_=std::size_t(-1),
reserved_=std::size_t(-2);
retired_element()=default;
retired_element(const retired_element&){}
std::atomic<std::size_t> epoch=available_;
std::atomic<value_type*> p={};
};
struct alignas(64) garbage_vector
{ {
static constexpr std::size_t N=256*16;
static constexpr std::size_t min_for_epoch_bump=16;
static constexpr std::size_t min_for_garbage_collection=64;
using ssize_t=std::make_signed<std::size_t>::type; using ssize_t=std::make_signed<std::size_t>::type;
epoch_type epoch=std::size_t(-1);
std::atomic<std::size_t> epoch_bump=0;
retired_element* retired_elements;
std::atomic<std::size_t> wpos=0;
std::atomic<std::size_t> rpos=0;
//std::atomic<std::size_t> apos=0;
std::atomic<ssize_t> size=0; std::atomic<ssize_t> size=0;
std::atomic<ssize_t> mcos=0; std::atomic<ssize_t> mcos=0;
}; };
static constexpr std::size_t default_max_probe=3; static constexpr std::size_t default_max_probe=3;
mutable std::array<garbage_vector,128> garbage_vectors; mutable std::array<local_size_ctrl_type,128> local_size_ctrls;
epoch_type current_epoch=1; std::atomic<std::size_t> max_probe=default_max_probe;
unsigned char pad_[cacheline_size-sizeof(epoch_type)]; std::size_t rehashes=0;
std::atomic<std::size_t> max_probe=default_max_probe;
std::size_t rehashes=0;
garbage_vector& local_garbage_vector()const local_size_ctrl_type& local_size_ctrl()const
{ {
return garbage_vectors[thread_id()%garbage_vectors.size()]; return local_size_ctrls[thread_id()%local_size_ctrls.size()];
}
std::size_t max_safe_epoch()
{
std::size_t e=retired_element::reserved_;
for(std::size_t i=0;i<garbage_vectors.size();++i){
std::size_t le=garbage_vectors[i].epoch.load(std::memory_order_relaxed);
if(le<e)e=le;
}
return e-1;
}
bool everybody_reached_epoch(std::size_t e)const
{
for(std::size_t i=0;i<garbage_vectors.size();++i){
if(garbage_vectors[i].epoch.load(std::memory_order_relaxed)<e)return false;
}
return true;
}
BOOST_FORCEINLINE void retire_element(value_type* p,bool mco)
{
auto& v=local_garbage_vector();
if(++v.epoch_bump%garbage_vector::min_for_epoch_bump==0){
auto ce=current_epoch.load(std::memory_order_relaxed);
if(everybody_reached_epoch(ce)&&
current_epoch.compare_exchange_strong(ce,ce+1)){
v.epoch=ce+1;
}
}
--v.size;
v.mcos+=mco;
for(;;){
std::size_t wpos=v.wpos;
std::size_t expected=retired_element::available_;
auto& e=v.retired_elements[wpos%v.garbage_vector::N];
if(e.epoch.compare_exchange_strong(expected,retired_element::reserved_)){
e.p=p;
//p=e.p.exchange(p);
//if(p){
// ++nodes_wasted;
// element_type x{p};
// this->destroy_element(&x);
// ++v.apos;
//}
e.epoch=v.epoch.load();
if(++v.wpos%garbage_vector::min_for_garbage_collection==0){
garbage_collect(v,max_safe_epoch());
}
return;
}
if(expected==retired_element::reserved_){ /* other thread wrote */
}
else{ /* vector full */
v.epoch=current_epoch.load();
garbage_collect(v,max_safe_epoch());
}
}
}
BOOST_FORCEINLINE value_type* new_element()
{
//auto& v=local_garbage_vector();
//for(;;){
// std::size_t apos=v.apos;
// if(apos>=v.rpos)break; /*`no available elements*/
// auto pv=v.retired_elements[apos%v.garbage_vector::N].p.load();
// if(v.apos.compare_exchange_weak(apos,apos+1)){
// v.retired_elements[apos%v.garbage_vector::N].p=nullptr;
// return pv;
// }
//}
/* allocate */
return boost::allocator_allocate(this->al(),1);
} }
std::pair<std::size_t,std::size_t> calculate_size_ctrl() std::pair<std::size_t,std::size_t> calculate_size_ctrl()
@ -2119,10 +1978,9 @@ private:
using ssize_t=std::make_signed<std::size_t>::type; using ssize_t=std::make_signed<std::size_t>::type;
ssize_t ssize=0,smcos=0; ssize_t ssize=0,smcos=0;
for(std::size_t i=0;i<garbage_vectors.size();++i){ for(const auto& sc:local_size_ctrls){
auto &v=garbage_vectors[i]; ssize+=sc.size.load(std::memory_order_relaxed);
ssize+=v.size.load(std::memory_order_relaxed); smcos+=sc.mcos.load(std::memory_order_relaxed);
smcos+=v.mcos.load(std::memory_order_relaxed);
} }
std::size_t size_=this->size_ctrl.size.load(std::memory_order_relaxed), std::size_t size_=this->size_ctrl.size.load(std::memory_order_relaxed),
ml_=this->size_ctrl.ml.load(std::memory_order_relaxed); ml_=this->size_ctrl.ml.load(std::memory_order_relaxed);
@ -2139,10 +1997,9 @@ private:
using ssize_t=std::make_signed<std::size_t>::type; using ssize_t=std::make_signed<std::size_t>::type;
ssize_t ssize=0,smcos=0; ssize_t ssize=0,smcos=0;
for(std::size_t i=0;i<garbage_vectors.size();++i){ for(auto& sc:local_size_ctrls){
auto &v=garbage_vectors[i]; ssize+=sc.size.exchange(0);
ssize+=v.size.exchange(0); smcos+=sc.mcos.exchange(0);
smcos+=v.mcos.exchange(0);
} }
this->size_ctrl.size+=ssize; this->size_ctrl.size+=ssize;
if(ssize_t(this->size_ctrl.ml)>=smcos)this->size_ctrl.ml-=smcos; if(ssize_t(this->size_ctrl.ml)>=smcos)this->size_ctrl.ml-=smcos;
@ -2150,30 +2007,6 @@ private:
auto max_ml=super::initial_max_load(); auto max_ml=super::initial_max_load();
if(this->size_ctrl.ml>max_ml)this->size_ctrl.ml=max_ml; if(this->size_ctrl.ml>max_ml)this->size_ctrl.ml=max_ml;
} }
void garbage_collect(garbage_vector& v,std::size_t max_epoch)
{
if(v.rpos==v.wpos)return;
std::size_t rpos=v.rpos;
for(;;){
auto& e=v.retired_elements[rpos%v.garbage_vector::N];
if(e.epoch>max_epoch)break;
element_type x{e.p};
this->destroy_element(&x);
e.epoch=retired_element::available_;
++rpos;
}
v.rpos=rpos;
}
BOOST_NOINLINE void garbage_collect()
{
auto max_epoch=max_safe_epoch();
for(std::size_t i=0;i<garbage_vectors.size();++i){
garbage_collect(garbage_vectors[i],max_epoch);
}
}
#endif #endif
}; };