implemented garbage collection

This commit is contained in:
joaquintides
2023-12-11 17:37:35 +01:00
parent 0f2a2a0b0f
commit 901d652fd7

View File

@ -50,6 +50,13 @@
#include <execution>
#endif
#if defined(BOOST_UNORDERED_LATCH_FREE)
#include <algorithm>
#include <vector>
#endif
std::atomic<std::size_t> garbage_collected=0;
namespace boost{
namespace unordered{
namespace detail{
@ -481,7 +488,14 @@ public:
std::size_t n=default_bucket_count,const Hash& h_=Hash(),
const Pred& pred_=Pred(),const Allocator& al_=Allocator()):
super{n,h_,pred_,al_}
{}
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
for(std::size_t i=0;i<garbage_vectors.size();++i){
garbage_vectors[i].epoch_bump=(i*251)%garbage_vector::min_for_epoch_bump;
garbage_vectors[i].retired_elements.resize(garbage_vector::N);
}
#endif
}
concurrent_table(const concurrent_table& x):
concurrent_table(x,x.exclusive_access()){}
@ -808,20 +822,18 @@ public:
auto hash=this->hash_for(x);
std::size_t res=0;
unprotected_internal_visit(
group_shared{},x,this->position_for(hash),hash,
group_shared{},x,this->position_for(hash),hash, // NB: shared access
[&,this](group_type* pg,unsigned int n,element_type* p)
{
auto expected=group_type::reduced_hash(hash);
if(reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
compare_exchange_strong(expected,1)){
if(f(cast_for(group_shared{},type_policy::value_from(*p)))){
wait_for_epochs();
super::erase(pg,n,p);
if(f(cast_for(group_shared{},type_policy::value_from(*p)))){
auto expected=group_type::reduced_hash(hash); // TODO: prove no ABA
auto pc=reinterpret_cast<unsigned char*>(pg)+n;
auto mco=group_type::maybe_caused_overflow(pc);
if(reinterpret_cast<std::atomic<unsigned char>*>(pg)[n].
compare_exchange_strong(expected,1)){
retire_element(static_cast<std::size_t>(p-this->arrays.elements()),mco);
res=1;
}
else{
pg->set(n,expected);
}
}
});
return res;
@ -1002,14 +1014,13 @@ private:
#if defined(BOOST_UNORDERED_LATCH_FREE)
using epoch_type=std::atomic<std::size_t>;
using epoch_array=cache_aligned_array<epoch_type,128>; // TODO: adapt 128 to the machine
#endif
#if defined(BOOST_UNORDERED_LATCH_FREE)
struct group_shared_lock_guard
{
group_shared_lock_guard(epoch_type& e_):e{e_}{++e;}
~group_shared_lock_guard(){--e;}
group_shared_lock_guard(epoch_type& e_):e{e_}{}
~group_shared_lock_guard(){e=0;}
epoch_type& e;
};
@ -1017,8 +1028,8 @@ private:
{
group_exclusive_lock_guard(
epoch_type& e_,group_access::exclusive_lock_guard&& lck_):
e{e_},lck{std::move(lck_)}{++e;}
~group_exclusive_lock_guard(){--e;}
e{e_},lck{std::move(lck_)}{}
~group_exclusive_lock_guard(){e=0;}
epoch_type& e;
group_access::exclusive_lock_guard lck;
@ -1081,7 +1092,9 @@ private:
inline group_shared_lock_guard access(group_shared,std::size_t pos)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
return {epochs[thread_id()%epochs.size()]};
auto& e=garbage_vectors[thread_id()%garbage_vectors.size()].epoch;
e=current_epoch.load(std::memory_order_relaxed);
return {e};
#else
return this->arrays.group_accesses()[pos].shared_access();
#endif
@ -1091,10 +1104,9 @@ private:
group_exclusive,std::size_t pos)const
{
#if defined(BOOST_UNORDERED_LATCH_FREE)
return {
epochs[thread_id()%epochs.size()],
this->arrays.group_accesses()[pos].exclusive_access()
};
auto& e=garbage_vectors[thread_id()%garbage_vectors.size()].epoch;
e=current_epoch.load(std::memory_order_relaxed);
return {e,this->arrays.group_accesses()[pos].exclusive_access()};
#else
return this->arrays.group_accesses()[pos].exclusive_access();
#endif
@ -1605,6 +1617,9 @@ private:
{
auto lck=exclusive_access();
if(this->size_ctrl.size==this->size_ctrl.ml){
#if defined(BOOST_UNORDERED_LATCH_FREE)
garbage_collect();
#endif
this->unchecked_rehash_for_growth();
}
}
@ -1849,12 +1864,111 @@ private:
mutable multimutex_type mutexes;
#if defined(BOOST_UNORDERED_LATCH_FREE)
mutable epoch_array epochs;
struct retired_element{
static constexpr std::size_t available_=std::size_t(-1),
reserved_=std::size_t(-2);
retired_element()=default;
retired_element(const retired_element&){}
void wait_for_epochs()
std::atomic<std::size_t> epoch=available_;
std::atomic<std::size_t> pos;
std::atomic<bool> mco;
};
struct garbage_vector
{
for(std::size_t i=0;i<epochs.size();++i){
while(epochs[i].load(std::memory_order_acquire)>1){}
static constexpr std::size_t N=128;
static constexpr std::size_t min_for_epoch_bump=64;
static constexpr std::size_t min_for_garbage_collection=64;
epoch_type epoch=0;
std::atomic<std::size_t> epoch_bump=0;
std::vector<retired_element> retired_elements;
std::atomic<std::size_t> wpos=0;
std::atomic<std::size_t> rpos=0;
std::atomic<bool> reading=false;
unsigned char pad[16];
};
mutable std::array<garbage_vector,128> garbage_vectors;
epoch_type current_epoch=1;
std::size_t max_safe_epoch()
{
std::size_t e=retired_element::reserved_;
for(std::size_t i=0;i<garbage_vectors.size();++i){
std::size_t le=garbage_vectors[i].epoch.load(std::memory_order_relaxed);
if(le&&le<e)e=le;
}
return e-1;
}
BOOST_FORCEINLINE void
retire_element(std::size_t pos,bool mco)
{
auto& v=garbage_vectors[thread_id()%garbage_vectors.size()];
std::size_t wpos=v.wpos;
std::size_t expected=retired_element::available_;
for(;;){
auto& e=v.retired_elements[wpos%v.garbage_vector::N];
if(e.epoch.compare_exchange_strong(expected,retired_element::reserved_)){
e.pos=pos;
e.mco=mco;
e.epoch=v.epoch.load();
++v.wpos;
--this->size_ctrl.ml;
--this->size_ctrl.size;
/* if(wpos-v.rpos>=garbage_vector::min_for_garbage_collection){
v.epoch=++current_epoch;
garbage_collect();
}
else */if(++v.epoch_bump==garbage_vector::min_for_epoch_bump)
{
v.epoch=++current_epoch;
v.epoch_bump=0;
garbage_collect();
}
return;
}
if(expected==retired_element::reserved_){ /* other thread wrote */
}
else{ /* vector full */
//std::cout<<"*";
v.epoch=++current_epoch;
garbage_collect();
}
wpos=v.wpos;
expected=retired_element::available_;
}
}
void garbage_collect(garbage_vector& v,std::size_t max_epoch)
{
if(v.rpos==v.wpos)return;
bool expected=false;
if(v.reading.compare_exchange_strong(expected,true)){
std::size_t rpos=v.rpos;
std::size_t num_mcos=0;
for(;;){
auto& e=v.retired_elements[rpos%v.garbage_vector::N];
if(e.epoch>max_epoch)break;
//++garbage_collected;
this->destroy_element(this->arrays.elements()+e.pos);
this->arrays.groups()[e.pos/N].reset(e.pos%N);
num_mcos+=e.mco;
e.epoch=retired_element::available_;
++rpos;
}
this->size_ctrl.ml+=(rpos-v.rpos)-num_mcos;
v.rpos=rpos;
v.reading=false;
}
}
BOOST_NOINLINE void garbage_collect()
{
auto max_epoch=max_safe_epoch();
for(std::size_t i=0;i<garbage_vectors.size();++i){
garbage_collect(garbage_vectors[i],max_epoch);
}
}
#endif