From 06535e518ecc6c29cd3f57eefec45e832cd7f050 Mon Sep 17 00:00:00 2001 From: joaquintides Date: Mon, 13 Mar 2023 19:22:41 +0100 Subject: [PATCH] uploaded first draft of foa::concurrent_table --- .../unordered/detail/foa/concurrent_table.hpp | 697 ++++++++++++++++++ .../unordered/detail/foa/rw_spinlock.hpp | 184 +++++ 2 files changed, 881 insertions(+) create mode 100644 include/boost/unordered/detail/foa/concurrent_table.hpp create mode 100644 include/boost/unordered/detail/foa/rw_spinlock.hpp diff --git a/include/boost/unordered/detail/foa/concurrent_table.hpp b/include/boost/unordered/detail/foa/concurrent_table.hpp new file mode 100644 index 00000000..94823642 --- /dev/null +++ b/include/boost/unordered/detail/foa/concurrent_table.hpp @@ -0,0 +1,697 @@ +/* Fast open-addressing concurrent hash table. + * + * Copyright 2023 Joaquin M Lopez Munoz. + * Distributed under the Boost Software License, Version 1.0. + * (See accompanying file LICENSE_1_0.txt or copy at + * http://www.boost.org/LICENSE_1_0.txt) + * + * See https://www.boost.org/libs/unordered for library home page. + */ + +#ifndef BOOST_UNORDERED_DETAIL_FOA_CONCURRENT_TABLE_HPP +#define BOOST_UNORDERED_DETAIL_FOA_CONCURRENT_TABLE_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost{ +namespace unordered{ +namespace detail{ +namespace foa{ + +// TODO: use std::hardware_destructive_interference_size when available + +template +struct alignas(64) cacheline_protected:T +{ + using T::T; +}; + +template +class multimutex +{ +public: + constexpr std::size_t size()const noexcept{return N;} + + Mutex& operator[](std::size_t pos)noexcept + { + BOOST_ASSERT(pos mutexes; +}; + +/* std::shared_lock is C++14 */ + +template +class shared_lock +{ +public: + shared_lock(Mutex& m_)noexcept:m{m_}{m.lock_shared();} + ~shared_lock()noexcept{if(owns)m.unlock_shared();} + + void lock(){BOOST_ASSERT(!owns);m.lock_shared();owns=true;} + void unlock(){BOOST_ASSERT(owns);m.unlock_shared();owns=false;} + +private: + Mutex &m; + bool owns=true; +}; + +/* VS in pre-C++17 mode has trouble returning std::lock_guard due to + * its copy constructor being deleted. 
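 *
 * For reference, the problematic pattern is a factory returning the guard
 * by value (the member mutex name here is illustrative):
 *
 *   std::lock_guard<Mutex> exclusive_access()const
 *   {
 *     return std::lock_guard<Mutex>{m};
 *   }
 *
 * Without C++17 guaranteed copy elision, the return statement still needs a
 * usable copy or move constructor, and std::lock_guard provides neither. The
 * minimal lock_guard below keeps its implicitly declared copy constructor,
 * so the same pattern compiles in pre-C++17 mode (the copy is elided in
 * practice).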
+ */ + +template +class lock_guard +{ +public: + lock_guard(Mutex& m_)noexcept:m{m_}{m.lock();} + ~lock_guard()noexcept{m.unlock();} + +private: + Mutex &m; +}; + +/* copied from boost/multi_index/detail/scoped_bilock.hpp */ + +template +class scoped_bilock +{ +public: + scoped_bilock(Mutex& m1,Mutex& m2)noexcept:mutex_eq{&m1==&m2} + { + bool mutex_lt=std::less{}(&m1,&m2); + + ::new (static_cast(&storage1)) lock_guard_type(mutex_lt?m1:m2); + if(!mutex_eq) + ::new (static_cast(&storage2)) lock_guard_type(mutex_lt?m2:m1); + } + + ~scoped_bilock()noexcept + { + reinterpret_cast(&storage1)->~lock_guard_type(); + if(!mutex_eq) + reinterpret_cast(&storage2)->~lock_guard_type(); + } + +private: + using lock_guard_type=lock_guard; + + bool mutex_eq; + alignas(lock_guard_type) unsigned char storage1[sizeof(lock_guard_type)], + storage2[sizeof(lock_guard_type)]; +}; + +/* TODO: describe atomic_integral and group_access */ + +template +struct atomic_integral +{ + operator Integral()const{return n.load(std::memory_order_acquire);} + void operator=(Integral m){n.store(m,std::memory_order_release);} + void operator|=(Integral m){n.fetch_or(m,std::memory_order_acq_rel);} + void operator&=(Integral m){n.fetch_and(m,std::memory_order_acq_rel);} + + std::atomic n; +}; + +struct group_access +{ + struct dummy_group_access_type + { + boost::uint32_t storage[2]={0,0}; + }; + + using mutex_type=rw_spinlock; + using shared_lock_guard=shared_lock; + using exclusive_lock_guard=lock_guard; + + shared_lock_guard shared_access(){return m;} + exclusive_lock_guard exclusive_access(){return m;} + std::atomic_uint32_t& insert_counter(){return cnt;} + +private: + mutex_type m; + std::atomic_uint32_t cnt; +}; + +template +struct concurrent_group:Group,group_access +{ + struct dummy_group_type + { + typename Group::dummy_group_type group_storage; + group_access::dummy_group_access_type access_storage; + }; +}; + +/* TODO: describe foa::concurrent_table. 
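 *
 * Provisional description, to be refined: concurrent_table layers a
 * concurrency-control scheme on top of the open-addressing table_core used
 * by the non-concurrent containers.
 *
 *   - Container-wide operations (construction from another table,
 *     assignment, swap, clear, rehash, reserve) take exclusive access on a
 *     multimutex of 128 cacheline-protected mutexes; regular operations take
 *     shared access on just one of them, selected per thread, so concurrent
 *     readers and inserters do not all contend on a single atomic.
 *   - Element-level protection is per group: lookups hold the group's shared
 *     lock while scanning matching slots, insertions hold its exclusive lock
 *     while claiming a slot.
 *   - Insertion is optimistic: the starting group's insert counter is read
 *     before the initial lookup and re-checked once a free slot has been
 *     claimed; if another thread inserted through the same starting group in
 *     between, the slot is released and the operation starts over.
 *   - The draft interface exposes no iterators; elements are reached through
 *     visitation (visit, insert_or_visit, emplace_or_visit, erase_if), whose
 *     callback runs under the corresponding group lock.
 *
 * Usage sketch (the TypePolicy argument is whatever the containers layer
 * supplies; SomeSetPolicy and the remaining types are only illustrative):
 *
 *   concurrent_table<SomeSetPolicy,boost::hash<int>,std::equal_to<int>,
 *                    std::allocator<int>> t;
 *   t.insert(1);
 *   t.visit(1,[](const int& x){ use(x); });  // use() stands for reader code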
+ */ + +template +using concurrent_table_core_impl=table_core< + TypePolicy,concurrent_group>,std::atomic_size_t, + Hash,Pred,Allocator>; + +#include + +#if defined(BOOST_MSVC) +#pragma warning(push) +#pragma warning(disable:4714) /* marked as __forceinline not inlined */ +#endif + +template +class concurrent_table:concurrent_table_core_impl +{ + using super=concurrent_table_core_impl; + using type_policy=typename super::type_policy; + using group_type=typename super::group_type; + using super::N; + using prober=typename super::prober; + +public: + using key_type=typename super::key_type; + using init_type=typename super::init_type; + using value_type=typename super::value_type; + using element_type=typename super::element_type; + using hasher=typename super::hasher; + using key_equal=typename super::key_equal; + using allocator_type=typename super::allocator_type; + using size_type=typename super::size_type; + + concurrent_table( + std::size_t n=default_bucket_count,const Hash& h_=Hash(), + const Pred& pred_=Pred(),const Allocator& al_=Allocator()): + super{n,h_,pred_,al_} + {} + + concurrent_table(const concurrent_table& x): + concurrent_table(x,x.exclusive_access()){} + concurrent_table(concurrent_table&& x): + concurrent_table(std::move(x),x.exclusive_access()){} + concurrent_table(const concurrent_table& x,const Allocator& al_): + concurrent_table(x,al_,x.exclusive_access()){} + concurrent_table(concurrent_table&& x,const Allocator& al_): + concurrent_table(std::move(x),al_,x.exclusive_access()){} + ~concurrent_table()=default; + + concurrent_table& operator=(const concurrent_table& x) + { + auto lck=exclusive_access(*this,x); + super::operator=(x); + return *this; + } + + concurrent_table& operator=(concurrent_table&& x) + { + auto lck=exclusive_access(*this,x); + super::operator=(std::move(x)); + return *this; + } + + allocator_type get_allocator()const noexcept + { + auto lck=shared_access(); + return super::get_allocator(); + } + + template + BOOST_FORCEINLINE std::size_t visit(const Key& x,F f) + { + auto lck=shared_access(); + auto hash=this->hash_for(x); + return unprotected_visit(x,this->position_for(hash),hash,f); + } + + template + BOOST_FORCEINLINE std::size_t visit(const Key& x,F f)const + { + return const_cast(this)-> + visit(x,[&](const value_type& v){f(v);}); + } + + // TODO: visit_all + + bool empty()const noexcept{return size()==0;} + + std::size_t size()const noexcept + { + auto lck=shared_access(); + std::size_t ml_=this->ml; /* load order matters */ + std::size_t available_=this->available; + return ml_-available_; + } + + using super::max_size; + + template + BOOST_FORCEINLINE bool emplace(Args&&... args) + { + return construct_and_emplace(std::forward(args)...); + } + + BOOST_FORCEINLINE bool + insert(const init_type& x){return emplace_impl(x);} + + BOOST_FORCEINLINE bool + insert(init_type&& x){return emplace_impl(std::move(x));} + + /* template tilts call ambiguities in favor of init_type */ + + template + BOOST_FORCEINLINE bool + insert(const value_type& x){return emplace_impl(x);} + + template + BOOST_FORCEINLINE bool + insert(value_type&& x){return emplace_impl(std::move(x));} + + template + BOOST_FORCEINLINE bool try_emplace(Key&& x,Args&&... args) + { + return emplace_impl( + try_emplace_args_t{},std::forward(x),std::forward(args)...); + } + + template + BOOST_FORCEINLINE bool try_emplace_or_visit(Key&& x,F f,Args&&... 
args) + { + return emplace_or_visit_impl( + f,try_emplace_args_t{},std::forward(x),std::forward(args)...); + } + + template + BOOST_FORCEINLINE std::size_t erase(Key&& x) + { + return erase_if(std::forward(x),[](const value_type&){return true;}); + } + + template + BOOST_FORCEINLINE bool emplace_or_visit(F f,Args&&... args) + { + return construct_and_emplace_or_visit( + std::forward(f),std::forward(args)...); + } + + template + BOOST_FORCEINLINE bool insert_or_visit(const init_type& x,F f) + {return emplace_or_visit_impl(x,std::forward(f));} + + template + BOOST_FORCEINLINE bool insert_or_visit(init_type&& x,F f) + {return emplace_or_visit_impl(std::move(x),std::forward(f));} + + /* typename=void tilts call ambiguities in favor of init_type */ + + template + BOOST_FORCEINLINE bool insert_or_visit(const value_type& x,F f) + {return emplace_or_visit_impl(x,std::forward(f));} + + template + BOOST_FORCEINLINE bool insert_or_visit(value_type&& x,F f) + {return emplace_or_visit_impl(std::move(x),std::forward(f));} + + template + BOOST_FORCEINLINE std::size_t erase_if(Key&& x,F f) + { + auto lck=shared_access(); + auto hash=this->hash_for(x); + return (std::size_t)unprotected_internal_visit( + x,this->position_for(hash),hash, + [&,this](group_type* pg,unsigned int n,element_type* p) + { + if(f(const_cast(type_policy::value_from(*p)))){ + super::erase(pg,n,p); + } + }); + } + + void swap(concurrent_table& x) + noexcept(noexcept(std::declval().swap(std::declval()))) + { + auto lck=exclusive_access(*this,x); + super::swap(x); + } + + void clear()noexcept + { + auto lck=exclusive_access(); + super::clear(); + } + +#if 0 + // TODO: should we accept different allocator too? + template + void merge(table& x) + { + x.for_all_elements([&,this](group_type* pg,unsigned int n,element_type* p){ + erase_on_exit e{x,{pg,n,p}}; + if(!emplace_impl(type_policy::move(*p)).second)e.rollback(); + }); + } + + template + void merge(table&& x){merge(x);} +#endif + + hasher hash_function()const + { + auto lck=shared_access(); + return super::hash_function(); + } + + key_equal key_eq()const + { + auto lck=shared_access(); + return super::key_eq(); + } + + template + BOOST_FORCEINLINE std::size_t count(Key&& x)const + { + return (std::size_t)contains(std::forward(x)); + } + + template + BOOST_FORCEINLINE bool contains(Key&& x)const + { + return visit(std::forward(x),[](const value_type&){}); + } + + std::size_t capacity()const noexcept + { + auto lck=shared_access(); + return super::capacity(); + } + + float load_factor()const noexcept + { + auto lck=shared_access(); + return super::load_factor(); + } + + using super::max_load_factor; + + std::size_t max_load()const noexcept + { + auto lck=shared_access(); + return super::max_load(); + } + + void rehash(std::size_t n) + { + auto lck=exclusive_access(); + super::rehash(n); + } + + void reserve(std::size_t n) + { + auto lck=exclusive_access(); + super::reserve(n); + } + + // TODO: rewrite + template + friend std::size_t erase_if(concurrent_table& x,Predicate pr) + { + return x.erase_if_impl(pr); + } + +private: + using mutex_type=cacheline_protected; + using multimutex_type=multimutex; // TODO: adapt 128 to the machine + using shared_lock_guard=shared_lock; + using exclusive_lock_guard=lock_guard; + using exclusive_bilock_guard=scoped_bilock; + using group_shared_lock_guard=typename group_type::shared_lock_guard; + using group_exclusive_lock_guard=typename group_type::exclusive_lock_guard; + + concurrent_table(const concurrent_table& x,exclusive_lock_guard): + super{x}{} + 
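
  /* The exclusive_lock_guard-tagged constructors (the copy overload just
   * above and the move/allocator overloads below) implement the
   * lock-before-delegate idiom used by the public constructors: the public
   * overload calls x.exclusive_access(), which locks the source's entire
   * multimutex, and passes the resulting guard as an extra argument. That
   * guard parameter stays alive for the whole delegated construction, so the
   * source table remains exclusively locked while it is being copied or
   * moved from, and is released automatically afterwards.
   */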
concurrent_table(concurrent_table&& x,exclusive_lock_guard): + super{std::move(x)}{} + concurrent_table( + const concurrent_table& x,const Allocator& al_,exclusive_lock_guard): + super{x,al_}{} + concurrent_table( + concurrent_table&& x,const Allocator& al_,exclusive_lock_guard): + super{std::move(x),al_}{} + + template + BOOST_FORCEINLINE std::size_t unprotected_visit( + const Key& x,std::size_t pos0,std::size_t hash,F&& f)const + { + return unprotected_internal_visit( + x,pos0,hash, + [&,this](group_type*,unsigned int,element_type* p) + {f(type_policy::value_from(*p));}); + } + +#if defined(BOOST_MSVC) +/* warning: forcing value to bool 'true' or 'false' in bool(pred()...) */ +#pragma warning(push) +#pragma warning(disable:4800) +#endif + + template + BOOST_FORCEINLINE std::size_t unprotected_internal_visit( + const Key& x,std::size_t pos0,std::size_t hash,F&& f)const + { + prober pb(pos0); + do{ + auto pos=pb.get(); + auto pg=this->arrays.groups+pos; + auto mask=pg->match(hash); + if(mask){ + auto p=this->arrays.elements+pos*N; + this->prefetch_elements(p); + auto lck=shared_access(pos); + do{ + auto n=unchecked_countr_zero(mask); + if(BOOST_LIKELY( + pg->is_occupied(n)&&bool(this->pred()(x,this->key_from(p[n]))))){ + f(pg,n,p+n); + return 1; + } + mask&=mask-1; + }while(mask); + } + if(BOOST_LIKELY(pg->is_not_overflowed(hash))){ + return 0; + } + } + while(BOOST_LIKELY(pb.next(this->arrays.groups_size_mask))); + return 0; + } + +#if defined(BOOST_MSVC) +#pragma warning(pop) /* C4800 */ +#endif + + template + BOOST_FORCEINLINE bool construct_and_emplace(Args&&... args) + { + return construct_and_emplace_or_visit( + [](value_type&){},std::forward(args)...); + } + + template + BOOST_FORCEINLINE bool construct_and_emplace_or_visit(F&& f,Args&&... args) + { + auto lck=shared_access(); + + auto x=alloc_make_insert_type( + this->al(),std::forward(args)...); + int res=unprotected_norehash_emplace_or_visit( + std::forward(f),type_policy::move(x.value())); + if(res>=0)return res!=0; + + lck.unlock(); + + rehash_if_full(); + return noinline_emplace_or_visit( + std::forward(f),type_policy::move(x.value())); + } + + template + BOOST_FORCEINLINE bool emplace_impl(Args&&... args) + { + return emplace_or_visit_impl( + [](value_type&){},std::forward(args)...); + } + + template + BOOST_NOINLINE bool noinline_emplace_or_visit(F&& f,Args&&... args) + { + return emplace_or_visit_impl( + std::forward(f),std::forward(args)...); + } + + template + BOOST_FORCEINLINE bool emplace_or_visit_impl(F&& f,Args&&... args) + { + for(;;){ + { + auto lck=shared_access(); + int res=unprotected_norehash_emplace_or_visit( + std::forward(f),std::forward(args)...); + if(res>=0)return res!=0; + } + rehash_if_full(); + } + } + + struct reserve_available + { + reserve_available(concurrent_table& x_):x{x_} + { + do{ + available=x.available.load(std::memory_order_acquire); + }while( + available&&!x.available.compare_exchange_weak(available,available-1)); + } + + ~reserve_available() + { + if(!commit_&&available){ + x.available.fetch_add(1,std::memory_order_release); + } + } + + bool succeeded()const{return available!=0;} + void commit(){commit_=true;} + + concurrent_table &x; + std::size_t available; + bool commit_=false; + }; + + template + BOOST_FORCEINLINE int + unprotected_norehash_emplace_or_visit(F&& f,Args&&... 
args) + { + const auto &k=this->key_from(std::forward(args)...); + auto hash=this->hash_for(k); + auto pos0=this->position_for(hash); + + for(;;){ + startover: + boost::uint32_t counter=insert_counter(pos0); + if(unprotected_visit(k,pos0,hash,f))return 0; + +#if 1 + reserve_available ra(*this); + if(BOOST_LIKELY(ra.succeeded())){ +#else + if(BOOST_LIKELY(this->available!=0)){ +#endif + for(prober pb(pos0);;pb.next(this->arrays.groups_size_mask)){ + auto pos=pb.get(); + auto pg=this->arrays.groups+pos; + auto mask=pg->match_available(); + if(BOOST_LIKELY(mask!=0)){ + auto lck=exclusive_access(pos); + do{ + auto n=unchecked_countr_zero(mask); + if(BOOST_LIKELY(!pg->is_occupied(n))){ + pg->set(n,hash); + if(BOOST_UNLIKELY(insert_counter(pos0)++!=counter)){ + /* other thread inserted from pos0, need to start over */ + pg->reset(n); + goto startover; + } + auto p=this->arrays.elements+pos*N+n; + this->construct_element(p,std::forward(args)...); +#if 1 + ra.commit(); +#else + --(this->available); +#endif + f(type_policy::value_from(*p)); + return 1; + } + mask&=mask-1; + }while(mask); + } + pg->mark_overflow(hash); + } + } + else return -1; + } + } + + BOOST_NOINLINE void rehash_if_full() + { + auto lck=exclusive_access(); + // TODO: use same mechanism as unchecked_emplace_with_rehash + if(!this->available)this->super::rehash(super::capacity()+1); + } + + shared_lock_guard shared_access()const + { + // TODO: make this more sophisticated (even distribution) + thread_local auto id=(++thread_counter)%mutexes.size(); + + return mutexes[id]; + } + + exclusive_lock_guard exclusive_access()const + { + return mutexes; + } + + exclusive_bilock_guard exclusive_access( + const concurrent_table& x,const concurrent_table& y) + { + return {x.mutexes,y.mutexes}; + } + + group_shared_lock_guard shared_access(std::size_t pos)const + { + return this->arrays.groups[pos].shared_access(); + } + + group_exclusive_lock_guard exclusive_access(std::size_t pos)const + { + return this->arrays.groups[pos].exclusive_access(); + } + + std::atomic_uint32_t& insert_counter(std::size_t pos)const + { + return this->arrays.groups[pos].insert_counter(); + } + + mutable std::atomic_uint thread_counter=0; + mutable multimutex_type mutexes; +}; + +#if defined(BOOST_MSVC) +#pragma warning(pop) /* C4714 */ +#endif + +#include + +} /* namespace foa */ +} /* namespace detail */ +} /* namespace unordered */ +} /* namespace boost */ + +#endif diff --git a/include/boost/unordered/detail/foa/rw_spinlock.hpp b/include/boost/unordered/detail/foa/rw_spinlock.hpp new file mode 100644 index 00000000..132c68dd --- /dev/null +++ b/include/boost/unordered/detail/foa/rw_spinlock.hpp @@ -0,0 +1,184 @@ +#ifndef BOOST_UNORDERED_DETAIL_FOA_RW_SPINLOCK_HPP_INCLUDED +#define BOOST_UNORDERED_DETAIL_FOA_RW_SPINLOCK_HPP_INCLUDED + +// Copyright 2023 Peter Dimov +// Distributed under the Boost Software License, Version 1.0. 
+// https://www.boost.org/LICENSE_1_0.txt + +#include +#include +#include +#include + +namespace boost{ +namespace unordered{ +namespace detail{ +namespace foa{ + +class rw_spinlock +{ +private: + + // bit 31: locked exclusive + // bit 30: writer pending + // bit 29..: reader lock count + + std::atomic state_ = {}; + +private: + + // number of times to spin before sleeping + static constexpr int spin_count = 24576; + +public: + + bool try_lock_shared() noexcept + { + std::uint32_t st = state_.load( std::memory_order_relaxed ); + + if( st >= 0x3FFF'FFFF ) + { + // either bit 31 set, bit 30 set, or reader count is max + return false; + } + + std::uint32_t newst = st + 1; + return state_.compare_exchange_strong( st, newst, std::memory_order_acquire, std::memory_order_relaxed ); + } + + void lock_shared() noexcept + { + for( ;; ) + { + for( int k = 0; k < spin_count; ++k ) + { + std::uint32_t st = state_.load( std::memory_order_relaxed ); + + if( st < 0x3FFF'FFFF ) + { + std::uint32_t newst = st + 1; + if( state_.compare_exchange_weak( st, newst, std::memory_order_acquire, std::memory_order_relaxed ) ) return; + } + + boost::detail::sp_thread_pause(); + } + + boost::detail::sp_thread_sleep(); + } + } + + void unlock_shared() noexcept + { + // pre: locked shared, not locked exclusive + + state_.fetch_sub( 1, std::memory_order_release ); + + // if the writer pending bit is set, there's a writer waiting + // let it acquire the lock; it will clear the bit on unlock + } + + bool try_lock() noexcept + { + std::uint32_t st = state_.load( std::memory_order_relaxed ); + + if( st & 0x8000'0000 ) + { + // locked exclusive + return false; + } + + if( st & 0x3FFF'FFFF ) + { + // locked shared + return false; + } + + std::uint32_t newst = 0x8000'0000; + return state_.compare_exchange_strong( st, newst, std::memory_order_acquire, std::memory_order_relaxed ); + } + + void lock() noexcept + { + for( ;; ) + { + for( int k = 0; k < spin_count; ++k ) + { + std::uint32_t st = state_.load( std::memory_order_relaxed ); + + if( st & 0x8000'0000 ) + { + // locked exclusive, spin + } + else if( ( st & 0x3FFF'FFFF ) == 0 ) + { + // not locked exclusive, not locked shared, try to lock + + std::uint32_t newst = 0x8000'0000; + if( state_.compare_exchange_weak( st, newst, std::memory_order_acquire, std::memory_order_relaxed ) ) return; + } + else if( st & 0x4000'000 ) + { + // writer pending bit already set, nothing to do + } + else + { + // locked shared, set writer pending bit + + std::uint32_t newst = st | 0x4000'0000; + state_.compare_exchange_weak( st, newst, std::memory_order_relaxed, std::memory_order_relaxed ); + } + + boost::detail::sp_thread_pause(); + } + + // clear writer pending bit before going to sleep + + { + std::uint32_t st = state_.load( std::memory_order_relaxed ); + + for( ;; ) + { + if( st & 0x8000'0000 ) + { + // locked exclusive, nothing to do + break; + } + else if( ( st & 0x3FFF'FFFF ) == 0 ) + { + // lock free, try to take it + + std::uint32_t newst = 0x8000'0000; + if( state_.compare_exchange_weak( st, newst, std::memory_order_acquire, std::memory_order_relaxed ) ) return; + } + else if( ( st & 0x4000'0000 ) == 0 ) + { + // writer pending bit already clear, nothing to do + break; + } + else + { + // clear writer pending bit + + std::uint32_t newst = st & ~0x4000'0000u; + if( state_.compare_exchange_weak( st, newst, std::memory_order_relaxed, std::memory_order_relaxed ) ) break; + } + } + } + + boost::detail::sp_thread_sleep(); + } + } + + void unlock() noexcept + { + // pre: locked exclusive, not 
locked shared + state_.store( 0, std::memory_order_release ); + } +}; + +} /* namespace foa */ +} /* namespace detail */ +} /* namespace unordered */ +} /* namespace boost */ + +#endif // BOOST_UNORDERED_DETAIL_FOA_RW_SPINLOCK_HPP_INCLUDED
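
rw_spinlock packs its state into a single 32-bit atomic: bit 31 is the
exclusive lock, bit 30 is the writer-pending flag, and bits 0..29 hold the
reader count, so lock_shared() is one compare-and-swap on the fast path, a
spinning writer sets bit 30 to hold off new readers, and clears it again
before going to sleep. A minimal usage sketch (not part of the patch; the
thread setup and shared counter are illustrative):

    #include <boost/unordered/detail/foa/rw_spinlock.hpp>
    #include <thread>
    #include <vector>

    int main()
    {
        boost::unordered::detail::foa::rw_spinlock m;
        long counter = 0;

        std::thread writer([&]{
            m.lock();            // exclusive: waits until no readers remain
            ++counter;
            m.unlock();
        });

        std::vector<std::thread> readers;
        for(int i = 0; i < 4; ++i){
            readers.emplace_back([&]{
                m.lock_shared(); // shared: readers may overlap with each other
                long x = counter;
                (void)x;
                m.unlock_shared();
            });
        }

        writer.join();
        for(auto& t: readers) t.join();
    }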