desynced size control

This commit is contained in:
joaquintides
2023-12-16 11:55:23 +01:00
parent ca7120d928
commit 35138450c8

View File

@ -497,12 +497,27 @@ public:
/* Forwards n, h_, pred_ and al_ to the base table. In latch-free builds the
 * bucket count passed down is clamped to at least 1, and every garbage vector
 * is given an array of garbage_vector::N retired_element slots obtained from
 * (and constructed through) a rebound copy of the table allocator.
 */
concurrent_table(
  std::size_t n=default_bucket_count,const Hash& h_=Hash(),
  const Pred& pred_=Pred(),const Allocator& al_=Allocator()):
#if defined(BOOST_UNORDERED_LATCH_FREE)
  super{(std::max)(n,std::size_t(1)),h_,pred_,al_} // TODO: won't work with n==0
#else
  super{n,h_,pred_,al_}
#endif
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
  using retired_element_allocator_type=
    typename boost::allocator_rebind<Allocator,retired_element>::type;
  using retired_element_traits=
    boost::allocator_traits<retired_element_allocator_type>;

  retired_element_allocator_type ral=this->al();
  for(std::size_t i=0;i<garbage_vectors.size();++i){
    auto& v=garbage_vectors[i];

    /* Each vector starts its epoch_bump counter at a different offset in
     * [0,min_for_epoch_bump) -- presumably so that the vectors do not all
     * reach the bump threshold at the same time (TODO confirm).
     */
    v.epoch_bump=(i*251)%garbage_vector::min_for_epoch_bump;

    /* retired_elements is a raw, allocator-owned array: allocate the storage
     * first, then construct each slot in place.
     */
    v.retired_elements=retired_element_traits::allocate(
      ral,garbage_vector::N);
    for(std::size_t j=0;j<garbage_vector::N;++j){
      retired_element_traits::construct(ral,&v.retired_elements[j]);
    }
  }
#endif
}
@ -539,7 +554,23 @@ public:
concurrent_table(std::move(x),x.make_empty_arrays())
{}
#if defined(BOOST_UNORDERED_LATCH_FREE)
/* Releases the retired_element array held by each garbage vector. Every slot
 * is destroyed through the rebound allocator before its array is deallocated,
 * so that allocator construct()/destroy() calls stay balanced.
 * NOTE(review): this assumes retired_elements was allocated with
 * garbage_vector::N slots for every garbage vector of this object -- confirm
 * that all constructors set the arrays up.
 */
~concurrent_table(){
  using retired_element_allocator_type=
    typename boost::allocator_rebind<Allocator,retired_element>::type;
  using retired_element_traits=
    boost::allocator_traits<retired_element_allocator_type>;

  retired_element_allocator_type ral=this->al();
  for(std::size_t i=0;i<garbage_vectors.size();++i){
    auto& v=garbage_vectors[i];
    for(std::size_t j=0;j<garbage_vector::N;++j){
      retired_element_traits::destroy(ral,&v.retired_elements[j]);
    }
    retired_element_traits::deallocate(
      ral,v.retired_elements,garbage_vector::N);
  }
}
#else
~concurrent_table()=default;
#endif
concurrent_table& operator=(const concurrent_table& x)
{
@ -694,8 +725,18 @@ public:
/* Returns the element count.
 * Regular builds: unprotected_size() read under shared access.
 * Latch-free builds: size_ctrl.size plus the size counter of every garbage
 * vector, summed under exclusive access.
 */
std::size_t size()const noexcept
{
#if !defined(BOOST_UNORDERED_LATCH_FREE)
  auto lck=shared_access();
  return unprotected_size();
#else
  auto lck=exclusive_access();
  std::size_t total=this->size_ctrl.size;
  for(auto& gv:garbage_vectors)total+=gv.size;
  return total;
#endif
}
using super::max_size;
@ -1104,7 +1145,7 @@ private:
inline group_shared_lock_guard access(group_shared,std::size_t pos)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
auto& e=garbage_vectors[thread_id()%garbage_vectors.size()].epoch;
auto& e=local_garbage_vector().epoch;
e=current_epoch.load(std::memory_order_relaxed);
return {e};
#else
@ -1116,7 +1157,7 @@ private:
group_exclusive,std::size_t pos)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
auto& e=garbage_vectors[thread_id()%garbage_vectors.size()].epoch;
auto& e=local_garbage_vector().epoch;
e=current_epoch.load(std::memory_order_relaxed);
return {e,this->arrays.group_accesses()[pos].exclusive_access()};
#else
@ -1551,37 +1592,35 @@ private:
if(unprotected_visit(
access_mode,k,pos0,hash,std::forward<F>(f)))return 0;
reserve_size rsize(*this);
if(BOOST_LIKELY(rsize.succeeded())){
for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){
auto pos=pb.get();
auto pg=this->arrays.groups()+pos;
auto mask=pg->match_available();
if(BOOST_LIKELY(mask!=0)){
auto n=unchecked_countr_zero(mask);
unsigned char expected=0;
if(!reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
compare_exchange_weak(expected,1)){
/* slot wasn't empty */
goto startover;
}
if(BOOST_UNLIKELY(
!insert_counter(pos0).compare_exchange_weak(counter,counter+1))){
/* other thread inserted from pos0, need to start over */
pg->reset(n);
goto startover;
}
auto p=this->arrays.elements()+pos*N+n;
this->construct_element(p,std::forward<Args>(args)...);
pg->set(n,hash);
++insert_counter(pos0);
rsize.commit();
return 1;
std::size_t pbn=max_probe;
for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){
auto pos=pb.get();
auto pg=this->arrays.groups()+pos;
auto mask=pg->match_available();
if(BOOST_LIKELY(mask!=0)){
auto n=unchecked_countr_zero(mask);
unsigned char expected=0;
if(!reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
compare_exchange_weak(expected,1)){
/* slot wasn't empty */
goto startover;
}
pg->mark_overflow(hash);
if(BOOST_UNLIKELY(
!insert_counter(pos0).compare_exchange_weak(counter,counter+1))){
/* other thread inserted from pos0, need to start over */
pg->reset(n);
goto startover;
}
auto p=this->arrays.elements()+pos*N+n;
this->construct_element(p,std::forward<Args>(args)...);
pg->set(n,hash);
insert_counter(pos0)=counter+2;
++local_garbage_vector().size;
return 1;
}
if(!pbn--)return -1;
pg->mark_overflow(hash);
}
else return -1;
}
}
#else
@ -1630,13 +1669,23 @@ private:
/* Takes exclusive access and grows the table when the size control reports
 * it as full.
 *
 * Latch-free builds: the per-garbage-vector counters are first folded into
 * size_ctrl via update_size_ctrl(). If size has reached ml, garbage is
 * collected, the table is rehashed for growth and max_probe goes back to
 * default_max_probe; otherwise max_probe is incremented.
 *
 * Regular builds: the table is rehashed for growth only when size equals ml.
 */
void rehash_if_full()
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
  auto lck=exclusive_access();
  update_size_ctrl();
  if(this->size_ctrl.size>=this->size_ctrl.ml){ // NB >=
    garbage_collect();
    this->unchecked_rehash_for_growth();
    max_probe=default_max_probe;
  }
  else{
    ++max_probe;
  }
#else
  auto lck=exclusive_access();
  if(this->size_ctrl.size==this->size_ctrl.ml){
    this->unchecked_rehash_for_growth();
  }
#endif
}
template<typename GroupAccessMode,typename F>
@ -1887,7 +1936,6 @@ private:
std::atomic<std::size_t> epoch=available_;
std::atomic<std::size_t> pos;
std::atomic<bool> mco;
};
struct garbage_vector
{
@ -1895,16 +1943,27 @@ private:
static constexpr std::size_t min_for_epoch_bump=64;
static constexpr std::size_t min_for_garbage_collection=64;
epoch_type epoch=0;
std::atomic<std::size_t> epoch_bump=0;
std::vector<retired_element> retired_elements;
std::atomic<std::size_t> wpos=0;
std::atomic<std::size_t> rpos=0;
std::atomic<bool> reading=false;
unsigned char pad[16];
using ssize_t=std::make_signed<std::size_t>::type;
epoch_type epoch=0;
std::atomic<std::size_t> epoch_bump=0;
retired_element* retired_elements;
std::atomic<std::size_t> wpos=0;
std::atomic<std::size_t> rpos=0;
std::atomic<bool> reading=false;
std::atomic<ssize_t> size=0;
std::atomic<ssize_t> mcos=0;
};
static constexpr std::size_t default_max_probe=3;
mutable std::array<garbage_vector,128> garbage_vectors;
epoch_type current_epoch=1;
std::size_t max_probe=default_max_probe;
/* Returns the garbage vector selected for the calling thread: thread_id()
 * reduced modulo the number of garbage vectors.
 */
garbage_vector& local_garbage_vector()const
{
  const auto slot=thread_id()%garbage_vectors.size();
  return garbage_vectors[slot];
}
std::size_t max_safe_epoch()
{
@ -1919,23 +1978,18 @@ private:
BOOST_FORCEINLINE void
retire_element(std::size_t pos,bool mco)
{
auto& v=garbage_vectors[thread_id()%garbage_vectors.size()];
auto& v=local_garbage_vector();
std::size_t wpos=v.wpos;
std::size_t expected=retired_element::available_;
for(;;){
auto& e=v.retired_elements[wpos%v.garbage_vector::N];
if(e.epoch.compare_exchange_strong(expected,retired_element::reserved_)){
e.pos=pos;
e.mco=mco;
e.epoch=v.epoch.load();
++v.wpos;
--this->size_ctrl.ml;
--this->size_ctrl.size;
/* if(wpos-v.rpos>=garbage_vector::min_for_garbage_collection){
v.epoch=++current_epoch;
garbage_collect();
}
else */if(++v.epoch_bump==garbage_vector::min_for_epoch_bump)
--v.size;
v.mcos+=mco;
if(++v.epoch_bump==garbage_vector::min_for_epoch_bump)
{
v.epoch=++current_epoch;
v.epoch_bump=0;
@ -1946,7 +2000,6 @@ private:
if(expected==retired_element::reserved_){ /* other thread wrote */
}
else{ /* vector full */
//std::cout<<"*";
v.epoch=++current_epoch;
garbage_collect();
}
@ -1955,6 +2008,19 @@ private:
}
}
/* Folds the counters kept in each garbage vector into size_ctrl and resets
 * them: size is added to size_ctrl.size, and mcos is subtracted from
 * size_ctrl.ml, which bottoms out at 0 instead of going below it.
 * NOTE(review): each counter is read and then zeroed in separate steps;
 * presumably callers hold exclusive access while this runs -- confirm.
 */
void update_size_ctrl()
{
  for(auto& gv:garbage_vectors){
    this->size_ctrl.size+=gv.size;
    if(this->size_ctrl.ml<gv.mcos)this->size_ctrl.ml=0;
    else this->size_ctrl.ml-=gv.mcos;
    gv.size=0;
    gv.mcos=0;
  }
}
void garbage_collect(garbage_vector& v,std::size_t max_epoch)
{
if(v.rpos==v.wpos)return;
@ -1962,18 +2028,15 @@ private:
bool expected=false;
if(v.reading.compare_exchange_strong(expected,true)){
std::size_t rpos=v.rpos;
std::size_t num_mcos=0;
for(;;){
auto& e=v.retired_elements[rpos%v.garbage_vector::N];
if(e.epoch>max_epoch)break;
//++garbage_collected;
this->destroy_element(this->arrays.elements()+e.pos);
this->arrays.groups()[e.pos/N].reset(e.pos%N);
num_mcos+=e.mco;
e.epoch=retired_element::available_;
++rpos;
}
this->size_ctrl.ml+=(rpos-v.rpos)-num_mcos;
v.rpos=rpos;
v.reading=false;
}