Refactored and documented the sort and merge algorithm

Ion Gaztañaga
2016-03-23 22:11:06 +01:00
parent f86a3a40bb
commit cae8d2dda3


@@ -37,8 +37,8 @@
// elements twice.
//
// The adaptive_merge algorithm was developed by Ion Gaztanaga reusing some parts
-// from the sorting algorithm and implementing a block merge algorithm
-// without moving elements left or right, which is used when external memory
+// from the sorting algorithm and implementing an additional block merge algorithm
+// without moving elements to the left or right, which is used when external memory
// is available.
//////////////////////////////////////////////////////////////////////////////
#ifndef BOOST_MOVE_ADAPTIVE_SORT_MERGE_HPP
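For orientation, this header implements the internals behind the public entry points boost::movelib::adaptive_sort and boost::movelib::adaptive_merge. A minimal usage sketch of those entry points (illustrative only, assuming Boost.Move is installed; not part of this commit):

#include <boost/move/algo/adaptive_sort.hpp>
#include <boost/move/algo/adaptive_merge.hpp>
#include <functional>
#include <vector>

int main()
{
   //Sort a whole range in place; the algorithm adapts to the amount of
   //auxiliary memory it manages to acquire internally.
   std::vector<int> v;
   for(int i = 0; i != 32; ++i)  v.push_back(31 - i);
   boost::movelib::adaptive_sort(v.begin(), v.end(), std::less<int>());

   //Merge two consecutive sorted subranges [first, middle) and [middle, last)
   std::vector<int> w;
   for(int i = 0; i != 16; ++i)  w.push_back(i*2);      //evens
   for(int i = 0; i != 16; ++i)  w.push_back(i*2 + 1);  //odds
   boost::movelib::adaptive_merge(w.begin(), w.begin() + 16, w.end(), std::less<int>());
   return 0;
}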
@@ -1954,7 +1954,7 @@ void stable_merge
template<class RandIt, class Compare>
-void final_merge( bool buffer_right
+void adaptive_sort_final_merge( bool buffer_right
, RandIt const first
, typename iterator_traits<RandIt>::size_type const l_intbuf
, typename iterator_traits<RandIt>::size_type const n_keys
@@ -2009,7 +2009,7 @@ bool build_params
//segments of size l_build_buf*2, maximizing the classic merge phase.
l_intbuf = size_type(ceil_sqrt_multiple(len, &l_base));
-//This is the minimum number of case to implement the ideal algorithm
+//This is the minimum number of keys to implement the ideal algorithm
//
//l_intbuf is used as buffer plus the key count
size_type n_min_ideal_keys = l_intbuf-1u;
@@ -2030,7 +2030,7 @@ bool build_params
//
//If available memory is 2*sqrt(l), then only sqrt(l) unique keys are needed,
//(to be used for keys in combine_all_blocks) as the whole l_build_buf
-//we'll be backuped in the buffer during build_blocks.
+//will be backed up in the buffer during build_blocks.
bool const non_unique_buf = xbuf.capacity() >= 2*l_intbuf;
size_type const to_collect = non_unique_buf ? l_intbuf : l_intbuf*2;
size_type collected = collect_unique(first, first+len, to_collect, comp, xbuf);
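The collection rule above can be isolated into a self-contained sketch; the helper name and sample numbers below are hypothetical, only the two-branch rule mirrors the code:

#include <cassert>
#include <cstddef>

//Mirrors the rule above: if the external buffer can hold 2*l_intbuf
//elements, the internal buffer does not need to be built from unique
//elements, so only l_intbuf unique keys must be collected; otherwise
//both the keys and the internal buffer come from unique elements.
std::size_t keys_to_collect(std::size_t l_intbuf, std::size_t xbuf_capacity)
{
   const bool non_unique_buf = xbuf_capacity >= 2u*l_intbuf;
   return non_unique_buf ? l_intbuf : 2u*l_intbuf;
}

int main()
{
   assert(keys_to_collect(128u, 256u) == 128u); //enough external memory
   assert(keys_to_collect(128u, 0u)   == 256u); //keys plus internal buffer
   return 0;
}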
@@ -2053,6 +2053,7 @@ bool build_params
while(n_keys > collected){
n_keys/=2;
}
+//AdaptiveSortInsertionSortThreshold is always a power of two, so the minimum is a power of two
l_base = min_value<Unsigned>(n_keys, AdaptiveSortInsertionSortThreshold);
l_intbuf = 0;
l_build_buf = n_keys;
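The halving loop above is easy to trace in isolation. A minimal sketch with illustrative values and a hypothetical helper name:

#include <cassert>

//n_keys is halved until it fits in the number of unique keys actually
//collected; starting from a power of two, it stays a power of two.
unsigned reduce_keys(unsigned n_keys, unsigned collected)
{
   while(n_keys > collected){
      n_keys /= 2u;
   }
   return n_keys;
}

int main()
{
   assert(reduce_keys(256u, 100u) == 64u);   //256 -> 128 -> 64
   assert(reduce_keys(256u, 300u) == 256u);  //already fits
   return 0;
}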
@@ -2072,119 +2073,26 @@ bool build_params
return true;
}
#define BOOST_MOVE_ADAPTIVE_MERGE_WITH_BUF
template<class RandIt, class Compare>
void adaptive_sort_impl
( RandIt first
, typename iterator_traits<RandIt>::size_type const len
inline void adaptive_merge_combine_blocks( RandIt first
, typename iterator_traits<RandIt>::size_type len1
, typename iterator_traits<RandIt>::size_type len2
, typename iterator_traits<RandIt>::size_type collected
, typename iterator_traits<RandIt>::size_type n_keys
, typename iterator_traits<RandIt>::size_type l_block
, bool use_internal_buf
, bool xbuf_used
, Compare comp
, adaptive_xbuf<typename iterator_traits<RandIt>::value_type> & xbuf
)
{
typedef typename iterator_traits<RandIt>::size_type size_type;
//Small sorts go directly to insertion sort
if(len <= size_type(AdaptiveSortInsertionSortThreshold)){
insertion_sort(first, first + len, comp);
return;
}
if((len-len/2) <= xbuf.capacity()){
merge_sort(first, first+len, comp, xbuf.data());
return;
}
//Make sure it is at least four
BOOST_STATIC_ASSERT(AdaptiveSortInsertionSortThreshold >= 4);
size_type l_base = 0;
size_type l_intbuf = 0;
size_type n_keys = 0;
size_type l_build_buf = 0;
if(!build_params(first, len, comp, n_keys, l_intbuf, l_base, l_build_buf, xbuf)){
stable_sort(first, first+len, comp, xbuf);
return;
}
//Otherwise, continue in adaptive_sort
BOOST_MOVE_ADAPTIVE_SORT_PRINT("\n After collect_unique: ", len);
size_type const n_key_plus_buf = l_intbuf+n_keys;
//l_build_buf is always power of two if l_intbuf is zero
BOOST_ASSERT(l_intbuf || (0 == (l_build_buf & (l_build_buf-1))));
//Classic merge sort until internal buffer and xbuf are exhausted
size_type const l_merged = build_blocks
(first+n_key_plus_buf-l_build_buf, len-n_key_plus_buf+l_build_buf, l_base, l_build_buf, xbuf, comp);
BOOST_MOVE_ADAPTIVE_SORT_PRINT(" After build_blocks: ", len);
//Non-trivial merge
bool const buffer_right = combine_all_blocks
(first, n_keys, first+n_keys, len-n_keys, l_merged, l_intbuf, xbuf, comp);
//Sort keys and buffer and merge the whole sequence
final_merge(buffer_right, first, l_intbuf, n_keys, len, xbuf, comp);
}
template<class RandIt, class Compare>
void adaptive_merge_impl
( RandIt first
, typename iterator_traits<RandIt>::size_type const len1
, typename iterator_traits<RandIt>::size_type const len2
, Compare comp
, adaptive_xbuf<typename iterator_traits<RandIt>::value_type> & xbuf
)
{
typedef typename iterator_traits<RandIt>::size_type size_type;
if(xbuf.capacity() >= min_value<size_type>(len1, len2)){
buffered_merge(first, first+len1, first+(len1+len2), comp, xbuf);
}
else{
const size_type len = len1+len2;
//Calculate ideal parameters and try to collect needed unique keys
size_type l_block = size_type(ceil_sqrt(len));
if(len1 <= l_block*2 || len2 <= l_block*2){
merge_bufferless(first, first+len1, first+len1+len2, comp);
return;
}
size_type l_intbuf = xbuf.capacity() >= l_block ? 0u : l_block;
//This is the minimum number of keys to implement the ideal algorithm
//ceil(len/l_block) - 1 (as the first block is used as buffer)
size_type n_keys = l_block;
while(n_keys >= (len-l_intbuf-n_keys)/l_block){
--n_keys;
}
++n_keys;
BOOST_ASSERT(n_keys < l_block);
if(xbuf.template supports_aligned_trailing<size_type>(l_block, n_keys)){
n_keys = 0u;
}
size_type const to_collect = l_intbuf+n_keys;
size_type const collected = collect_unique(first, first+len1, to_collect, comp, xbuf);
BOOST_MOVE_ADAPTIVE_SORT_PRINT("\n A collect: ", len);
if(collected != to_collect && collected < 4){
merge_bufferless(first, first+len1, first+len1+len2, comp);
}
else{
bool use_internal_buf = true;
if (collected != to_collect){
l_intbuf = 0u;
n_keys = collected;
use_internal_buf = false;
l_block = lblock_for_combine(l_intbuf, n_keys, len, use_internal_buf);
l_intbuf = use_internal_buf ? l_block : 0u;
}
bool xbuf_used = collected == to_collect && xbuf.capacity() >= l_block;
size_type const len = len1+len2;
size_type const l_combine = len-collected;
size_type const l_combine1 = len1-collected;
size_type n_bef_irreg2, n_aft_irreg2, l_irreg1, l_irreg2, midkey_idx;
if(n_keys){
RandIt const first_data = first+collected;
@@ -2199,7 +2107,7 @@ void adaptive_merge_impl
BOOST_MOVE_ADAPTIVE_SORT_PRINT(" A mrg xbf: ", len);
}
else if(use_internal_buf){
#define BOOST_MOVE_ADAPTIVE_MERGE_WITH_BUF
#ifdef BOOST_MOVE_ADAPTIVE_MERGE_WITH_BUF
range_xbuf<RandIt, swap_op> rbuf(first_data-l_block, first_data);
merge_blocks_with_buf
@@ -2230,7 +2138,25 @@ void adaptive_merge_impl
BOOST_MOVE_ADAPTIVE_SORT_PRINT(" A mrg lft: ", len);
}
n_keys = collected-l_intbuf;
}
template<class RandIt, class Compare>
inline void adaptive_merge_final_merge( RandIt first
, typename iterator_traits<RandIt>::size_type len1
, typename iterator_traits<RandIt>::size_type len2
, typename iterator_traits<RandIt>::size_type collected
, typename iterator_traits<RandIt>::size_type l_intbuf
, typename iterator_traits<RandIt>::size_type l_block
, bool use_internal_buf
, bool xbuf_used
, Compare comp
, adaptive_xbuf<typename iterator_traits<RandIt>::value_type> & xbuf
)
{
typedef typename iterator_traits<RandIt>::size_type size_type;
(void)l_block;
size_type n_keys = collected-l_intbuf;
size_type len = len1+len2;
if(use_internal_buf){
if(xbuf_used){
xbuf.clear();
@@ -2272,6 +2198,246 @@ void adaptive_merge_impl
}
BOOST_MOVE_ADAPTIVE_SORT_PRINT(" A key mrg: ", len);
}
template<class SizeType, class Xbuf>
inline SizeType adaptive_merge_n_keys_intbuf(SizeType l_block, SizeType len, Xbuf & xbuf, SizeType &l_intbuf_inout)
{
typedef SizeType size_type;
size_type l_intbuf = xbuf.capacity() >= l_block ? 0u : l_block;
//This is the minimum number of keys to implement the ideal algorithm
//ceil(len/l_block) - 1 (as the first block is used as buffer)
size_type n_keys = l_block;
while(n_keys >= (len-l_intbuf-n_keys)/l_block){
--n_keys;
}
++n_keys;
BOOST_ASSERT(n_keys < l_block);
if(xbuf.template supports_aligned_trailing<size_type>(l_block, n_keys)){
n_keys = 0u;
}
l_intbuf_inout = l_intbuf;
return n_keys;
}
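A self-contained trace of the key-count loop above may help; the numbers are illustrative, and the real function additionally sets n_keys to zero when xbuf can hold aligned trailing keys:

#include <cassert>
#include <cstddef>

//Finds (approximately) ceil(data_len/l_block) - 1: one key per block,
//minus the first block, which acts as the merge buffer.
std::size_t n_keys_needed(std::size_t l_block, std::size_t len, std::size_t l_intbuf)
{
   std::size_t n_keys = l_block;
   while(n_keys >= (len - l_intbuf - n_keys)/l_block){
      --n_keys;
   }
   return ++n_keys;
}

int main()
{
   //len == 10000 with l_block == 100 and a 100-element internal buffer:
   //roughly 9800 data elements form 98 blocks, so 98 keys are needed.
   assert(n_keys_needed(100u, 10000u, 100u) == 98u);
   return 0;
}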
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
// Main explanation of the sort algorithm.
//
// csqrtlen = ceil(sqrt(len));
//
// * First, 2*csqrtlen unique elements are extracted from the elements to be
// sorted and placed at the beginning of the range.
//
// * Step "build_blocks": In this nearly-classic merge step, 2*csqrtlen unique elements
// will be used as auxiliary memory, so trailing len-2*csqrtlen elements are
// are grouped in blocks of sorted 4*csqrtlen elements. At the end of the step
// 2*csqrtlen unique elements are again the leading elements of the whole range.
//
// * Step "combine_blocks": pairs of previously formed blocks are merged with a different
// ("smart") algorithm to form blocks of 8*csqrtlen elements. This step is slower than the
// "build_blocks" step and repeated iteratively (forming blocks of 16*csqrtlen, 32*csqrtlen
// elements, etc) of until all trailing (len-2*csqrtlen) elements are merged.
//
// In "combine_blocks" len/csqrtlen elements used are as "keys" (markers) to
// know if elements belong to the first or second block to be merged and another
// leading csqrtlen elements are used as buffer. Explanation of the "combine_blocks" step:
//
// Iteratively until all trailing (len-2*csqrtlen) elements are merged:
// Iteratively for each pair of previously merged blocks:
// * Blocks are divided into groups of csqrtlen elements and
// 2*merged_block/csqrtlen keys are sorted to be used as markers
// * Groups are selection-sorted by their first or last element (depending on whether they
// are merged to the left or right) and keys are reordered accordingly as an imitation buffer.
// * Elements of each block pair are merged using the csqrtlen buffer, taking into account
// if they belong to the first or second half (marked by the key).
//
// * In the final merge step the leading elements (2*csqrtlen) are sorted and merged,
// using rotations, with the rest of the elements sorted in the "combine_blocks" step.
//
// Corner cases:
//
// * If no 2*csqrtlen unique elements can be extracted:
//
// * If csqrtlen+len/csqrtlen elements are extracted, then only csqrtlen elements are used
// as buffer in the "build_blocks" step, forming blocks of 2*csqrtlen elements. This
// means that an additional "combine_blocks" step will be needed to merge all elements.
//
// * If fewer than csqrtlen+len/csqrtlen elements can be extracted, but still more than a minimum,
// then the algorithm reduces the number of elements used as buffer and keys in the "build_blocks"
// and "combine_blocks" steps. If "combine_blocks" does not have enough keys due to this reduction,
// a rotation-based smart merge is used.
//
// * If the minimum number of keys can't be extracted, a rotation-based sorting is performed.
//
// * If auxiliary memory is greater than or equal to ceil(len/2), half-copying mergesort is used.
//
// * If auxiliary memory is more than csqrtlen+n_keys*sizeof(std::size_t),
// then only csqrtlen elements need to be extracted and "combine_blocks" will use integral
// keys to combine blocks.
//
// * If auxiliary memory is available, the "build_blocks" step will be extended to build bigger blocks
// using a classic merge.
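To make the phase sizes above concrete, here is a small sketch that follows the numbers in the comment (it tracks the high-level description, not the exact code paths):

#include <cstdio>

int main()
{
   const unsigned len      = 10000u;
   const unsigned csqrtlen = 100u;              //ceil(sqrt(10000))
   const unsigned keys_buf = 2u*csqrtlen;       //unique elements collected: 200
   //build_blocks leaves sorted blocks of 4*csqrtlen == 400 elements;
   //combine_blocks doubles the block size until everything is merged.
   unsigned block = 4u*csqrtlen;
   unsigned iterations = 0u;
   while(block < len - keys_buf){
      block *= 2u;                              //800, 1600, 3200, 6400, 12800
      ++iterations;
   }
   std::printf("collected %u, combine_blocks iterations: %u\n", keys_buf, iterations); //5
   return 0;
}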
template<class RandIt, class Compare>
void adaptive_sort_impl
( RandIt first
, typename iterator_traits<RandIt>::size_type const len
, Compare comp
, adaptive_xbuf<typename iterator_traits<RandIt>::value_type> & xbuf
)
{
typedef typename iterator_traits<RandIt>::size_type size_type;
//Small sorts go directly to insertion sort
if(len <= size_type(AdaptiveSortInsertionSortThreshold)){
insertion_sort(first, first + len, comp);
return;
}
if((len-len/2) <= xbuf.capacity()){
merge_sort(first, first+len, comp, xbuf.data());
return;
}
//Make sure it is at least four
BOOST_STATIC_ASSERT(AdaptiveSortInsertionSortThreshold >= 4);
size_type l_base = 0;
size_type l_intbuf = 0;
size_type n_keys = 0;
size_type l_build_buf = 0;
//Calculate and extract needed unique elements. If a minimum is not achieved,
//fall back to a rotation-based sort
if(!build_params(first, len, comp, n_keys, l_intbuf, l_base, l_build_buf, xbuf)){
stable_sort(first, first+len, comp, xbuf);
return;
}
//Otherwise, continue the adaptive_sort
BOOST_MOVE_ADAPTIVE_SORT_PRINT("\n After collect_unique: ", len);
size_type const n_key_plus_buf = l_intbuf+n_keys;
//l_build_buf is always power of two if l_intbuf is zero
BOOST_ASSERT(l_intbuf || (0 == (l_build_buf & (l_build_buf-1))));
//Classic merge sort until internal buffer and xbuf are exhausted
size_type const l_merged = build_blocks
(first+n_key_plus_buf-l_build_buf, len-n_key_plus_buf+l_build_buf, l_base, l_build_buf, xbuf, comp);
BOOST_MOVE_ADAPTIVE_SORT_PRINT(" After build_blocks: ", len);
//Non-trivial merge
bool const buffer_right = combine_all_blocks
(first, n_keys, first+n_keys, len-n_keys, l_merged, l_intbuf, xbuf, comp);
//Sort keys and buffer and merge the whole sequence
adaptive_sort_final_merge(buffer_right, first, l_intbuf, n_keys, len, xbuf, comp);
}
// Main explanation of the merge algorithm.
//
// csqrtlen = ceil(sqrt(len));
//
// * First, csqrtlen [to be used as buffer] + (len/csqrtlen - 1) [to be used as keys] => to_collect
// unique elements are extracted from elements to be sorted and placed in the beginning of the range.
//
// * Step "combine_blocks": the leading (len1-to_collect) elements plus trailing len2 elements
// are merged with a non-trivial ("smart") algorithm to form an ordered range trailing "len-to_collect" elements.
//
// Explanation of the "combine_blocks" step:
//
// * Trailing [first+to_collect, first+len1) elements are divided into groups of csqrtlen elements.
// Remaining elements that can't form a group are grouped in front of those elements.
// * Trailing [first+len1, first+len1+len2) elements are divided into groups of csqrtlen elements.
// Remaining elements that can't form a group are grouped at the back of those elements.
// * Groups are selection-sorted by their first or last element (depending on whether they
// are merged to the left or right) and keys are reordered accordingly as an imitation buffer.
// * Elements of each block pair are merged using the csqrtlen buffer, taking into account
// if they belong to the first or second half (marked by the key).
//
// * In the final merge step the leading "to_collect" elements are merged, using rotations,
// with the rest of the elements merged in the "combine_blocks" step.
//
// Corner cases:
//
// * If no "to_collect" elements can be extracted:
//
// * If more than a minimum number of elements is extracted,
// the algorithm reduces the number of elements used as buffer and keys in the
// "combine_blocks" step. If "combine_blocks" does not have enough keys due to this reduction,
// a rotation-based smart merge is used.
//
// * If the minimum number of keys can't be extracted, a rotation-based merge is performed.
//
// * If auxiliary memory is greater than or equal to min(len1, len2), a buffered merge is performed.
//
// * If len1 or len2 is less than 2*csqrtlen, a rotation-based merge is performed.
//
// * If auxiliary memory is more than csqrtlen+n_keys*sizeof(std::size_t),
// then no csqrtlen elements need to be extracted and "combine_blocks" will use integral
// keys to combine blocks.
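With the same illustrative sizes, the to_collect formula described above works out as follows (a sketch of the arithmetic only; the implementation refines these values):

#include <cstdio>

int main()
{
   const unsigned len1 = 6000u, len2 = 4000u;
   const unsigned len      = len1 + len2;          //10000
   const unsigned csqrtlen = 100u;                 //ceil(sqrt(10000))
   //csqrtlen elements become the merge buffer and (len/csqrtlen - 1)
   //elements become the keys, all taken from the first range.
   const unsigned to_collect = csqrtlen + (len/csqrtlen - 1u);
   std::printf("to_collect = %u\n", to_collect);   //199
   return 0;
}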
template<class RandIt, class Compare>
void adaptive_merge_impl
( RandIt first
, typename iterator_traits<RandIt>::size_type const len1
, typename iterator_traits<RandIt>::size_type const len2
, Compare comp
, adaptive_xbuf<typename iterator_traits<RandIt>::value_type> & xbuf
)
{
typedef typename iterator_traits<RandIt>::size_type size_type;
if(xbuf.capacity() >= min_value<size_type>(len1, len2)){
buffered_merge(first, first+len1, first+(len1+len2), comp, xbuf);
}
else{
const size_type len = len1+len2;
//Calculate ideal parameters and try to collect needed unique keys
size_type l_block = size_type(ceil_sqrt(len));
//One range is not big enough to extract keys and the internal buffer, so a
//rotation-based merge will do just fine
if(len1 <= l_block*2 || len2 <= l_block*2){
merge_bufferless(first, first+len1, first+len1+len2, comp);
return;
}
//Calculate the number of keys and the internal buffer length. If xbuf has enough memory, no
//internal buffer is needed, so l_intbuf will remain 0.
size_type l_intbuf = 0;
size_type n_keys = adaptive_merge_n_keys_intbuf(l_block, len, xbuf, l_intbuf);
size_type const to_collect = l_intbuf+n_keys;
//Try to extract needed unique values from the first range
size_type const collected = collect_unique(first, first+len1, to_collect, comp, xbuf);
BOOST_MOVE_ADAPTIVE_SORT_PRINT("\n A collect: ", len);
//If the minimum number of keys is not available in the first range, fall back to rotations
if(collected != to_collect && collected < 4){
merge_bufferless(first, first+len1, first+len1+len2, comp);
return;
}
//If not enough keys but more than minimum, adjust the internal buffer and key count
bool use_internal_buf = collected == to_collect;
if (!use_internal_buf){
l_intbuf = 0u;
n_keys = collected;
l_block = lblock_for_combine(l_intbuf, n_keys, len, use_internal_buf);
//If use_internal_buf is false, the internal buffer will be zero and rotation-based combination will be used
l_intbuf = use_internal_buf ? l_block : 0u;
}
bool const xbuf_used = collected == to_collect && xbuf.capacity() >= l_block;
//Merge trailing elements using smart merges
adaptive_merge_combine_blocks(first, len1, len2, collected, n_keys, l_block, use_internal_buf, xbuf_used, comp, xbuf);
//Merge buffer and keys with the rest of the values
adaptive_merge_final_merge (first, len1, len2, collected, l_intbuf, l_block, use_internal_buf, xbuf_used, comp, xbuf);
}
}
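The rotation-based fallback referenced throughout (merge_bufferless) is not shown in this diff. As a hedged sketch of the general textbook technique rather than the library's implementation: split the larger side at its midpoint, binary-search the matching cut in the other side, rotate the two middle pieces into place, and recurse on both halves:

#include <algorithm>
#include <cassert>
#include <vector>

//Textbook rotation-based in-place merge (illustrative; not Boost.Move's
//merge_bufferless). Merges sorted [first, middle) and [middle, last).
template<class RandIt>
void rotation_merge(RandIt first, RandIt middle, RandIt last)
{
   if(first == middle || middle == last)
      return;
   if(middle - first == 1 && last - middle == 1){
      if(*middle < *first)
         std::iter_swap(first, middle);
      return;
   }
   RandIt cut1, cut2;
   if(middle - first >= last - middle){
      //Split the first range, then find where that value falls in the second
      cut1 = first + (middle - first)/2;
      cut2 = std::lower_bound(middle, last, *cut1);
   }
   else{
      //Split the second range, then find where that value falls in the first
      cut2 = middle + (last - middle)/2;
      cut1 = std::upper_bound(first, middle, *cut2);
   }
   //Bring the two middle pieces into order; rotate returns the new middle
   RandIt new_middle = std::rotate(cut1, middle, cut2);
   rotation_merge(first, cut1, new_middle);
   rotation_merge(new_middle, cut2, last);
}

int main()
{
   std::vector<int> v;
   for(int i = 0; i != 8; ++i)  v.push_back(i*2);      //evens
   for(int i = 0; i != 8; ++i)  v.push_back(i*2 + 1);  //odds
   rotation_merge(v.begin(), v.begin() + 8, v.end());
   assert(std::is_sorted(v.begin(), v.end()));
   return 0;
}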