Alien-XGBoost
view release on metacpan or search on metacpan
xgboost/cub/cub/agent/agent_reduce_by_key.cuh view on Meta::CPAN
#pragma once
#include <iterator>
#include "single_pass_scan_operators.cuh"
#include "../block/block_load.cuh"
#include "../block/block_store.cuh"
#include "../block/block_scan.cuh"
#include "../block/block_discontinuity.cuh"
#include "../iterator/cache_modified_input_iterator.cuh"
#include "../iterator/constant_input_iterator.cuh"
#include "../util_namespace.cuh"
/// Optional outer namespace(s)
CUB_NS_PREFIX
/// CUB namespace
namespace cub {
/******************************************************************************
* Tuning policy types
******************************************************************************/
/**
* Parameterizable tuning policy type for AgentReduceByKey
*/
template <
int _BLOCK_THREADS, ///< Threads per thread block
int _ITEMS_PER_THREAD, ///< Items per thread (per tile of input)
BlockLoadAlgorithm _LOAD_ALGORITHM, ///< The BlockLoad algorithm to use
CacheLoadModifier _LOAD_MODIFIER, ///< Cache load modifier for reading input elements
BlockScanAlgorithm _SCAN_ALGORITHM> ///< The BlockScan algorithm to use
struct AgentReduceByKeyPolicy
{
enum
{
BLOCK_THREADS = _BLOCK_THREADS, ///< Threads per thread block
ITEMS_PER_THREAD = _ITEMS_PER_THREAD, ///< Items per thread (per tile of input)
};
static const BlockLoadAlgorithm LOAD_ALGORITHM = _LOAD_ALGORITHM; ///< The BlockLoad algorithm to use
static const CacheLoadModifier LOAD_MODIFIER = _LOAD_MODIFIER; ///< Cache load modifier for reading input elements
static const BlockScanAlgorithm SCAN_ALGORITHM = _SCAN_ALGORITHM; ///< The BlockScan algorithm to use
};
/******************************************************************************
* Thread block abstractions
******************************************************************************/
/**
* \brief AgentReduceByKey implements a stateful abstraction of CUDA thread blocks for participating in device-wide reduce-value-by-key
*/
template <
typename AgentReduceByKeyPolicyT, ///< Parameterized AgentReduceByKeyPolicy tuning policy type
typename KeysInputIteratorT, ///< Random-access input iterator type for keys
typename UniqueOutputIteratorT, ///< Random-access output iterator type for keys
typename ValuesInputIteratorT, ///< Random-access input iterator type for values
typename AggregatesOutputIteratorT, ///< Random-access output iterator type for values
typename NumRunsOutputIteratorT, ///< Output iterator type for recording number of items selected
typename EqualityOpT, ///< KeyT equality operator type
typename ReductionOpT, ///< ValueT reduction operator type
typename OffsetT> ///< Signed integer type for global offsets
struct AgentReduceByKey
{
//---------------------------------------------------------------------
// Types and constants
//---------------------------------------------------------------------
// The input keys type
typedef typename std::iterator_traits<KeysInputIteratorT>::value_type KeyInputT;
// The output keys type
typedef typename If<(Equals<typename std::iterator_traits<UniqueOutputIteratorT>::value_type, void>::VALUE), // KeyOutputT = (if output iterator's value type is void) ?
typename std::iterator_traits<KeysInputIteratorT>::value_type, // ... then the input iterator's value type,
typename std::iterator_traits<UniqueOutputIteratorT>::value_type>::Type KeyOutputT; // ... else the output iterator's value type
// The input values type
typedef typename std::iterator_traits<ValuesInputIteratorT>::value_type ValueInputT;
// The output values type
typedef typename If<(Equals<typename std::iterator_traits<AggregatesOutputIteratorT>::value_type, void>::VALUE), // ValueOutputT = (if output iterator's value type is void) ?
typename std::iterator_traits<ValuesInputIteratorT>::value_type, // ... then the input iterator's value type,
typename std::iterator_traits<AggregatesOutputIteratorT>::value_type>::Type ValueOutputT; // ... else the output iterator's value type
// Tuple type for scanning (pairs accumulated segment-value with segment-index)
typedef KeyValuePair<OffsetT, ValueOutputT> OffsetValuePairT;
// Tuple type for pairing keys and values
typedef KeyValuePair<KeyOutputT, ValueOutputT> KeyValuePairT;
// Tile status descriptor interface type
typedef ReduceByKeyScanTileState<ValueOutputT, OffsetT> ScanTileStateT;
// Guarded inequality functor
template <typename _EqualityOpT>
struct GuardedInequalityWrapper
{
_EqualityOpT op; ///< Wrapped equality operator
int num_remaining; ///< Items remaining
/// Constructor
__host__ __device__ __forceinline__
GuardedInequalityWrapper(_EqualityOpT op, int num_remaining) : op(op), num_remaining(num_remaining) {}
/// Boolean inequality operator, returns <tt>(a != b)</tt>
template <typename T>
__host__ __device__ __forceinline__ bool operator()(const T &a, const T &b, int idx) const
{
if (idx < num_remaining)
return !op(a, b); // In bounds
// Return true if first out-of-bounds item, false otherwise
return (idx == num_remaining);
}
};
// Constants
enum
xgboost/cub/cub/agent/agent_reduce_by_key.cuh view on Meta::CPAN
{
// Scatter flagged keys and values
#pragma unroll
for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
{
if (segment_flags[ITEM])
{
d_unique_out[segment_indices[ITEM]] = scatter_items[ITEM].key;
d_aggregates_out[segment_indices[ITEM]] = scatter_items[ITEM].value;
}
}
}
/**
* 2-phase scatter flagged items to output offsets
*
* The exclusive scan causes each head flag to be paired with the previous
* value aggregate: the scatter offsets must be decremented for value aggregates
*/
__device__ __forceinline__ void ScatterTwoPhase(
KeyValuePairT (&scatter_items)[ITEMS_PER_THREAD],
OffsetT (&segment_flags)[ITEMS_PER_THREAD],
OffsetT (&segment_indices)[ITEMS_PER_THREAD],
OffsetT num_tile_segments,
OffsetT num_tile_segments_prefix)
{
CTA_SYNC();
// Compact and scatter pairs
#pragma unroll
for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
{
if (segment_flags[ITEM])
{
temp_storage.raw_exchange.Alias()[segment_indices[ITEM] - num_tile_segments_prefix] = scatter_items[ITEM];
}
}
CTA_SYNC();
for (int item = threadIdx.x; item < num_tile_segments; item += BLOCK_THREADS)
{
KeyValuePairT pair = temp_storage.raw_exchange.Alias()[item];
d_unique_out[num_tile_segments_prefix + item] = pair.key;
d_aggregates_out[num_tile_segments_prefix + item] = pair.value;
}
}
/**
* Scatter flagged items
*/
__device__ __forceinline__ void Scatter(
KeyValuePairT (&scatter_items)[ITEMS_PER_THREAD],
OffsetT (&segment_flags)[ITEMS_PER_THREAD],
OffsetT (&segment_indices)[ITEMS_PER_THREAD],
OffsetT num_tile_segments,
OffsetT num_tile_segments_prefix)
{
// Do a one-phase scatter if (a) two-phase is disabled or (b) the average number of selected items per thread is less than one
if (TWO_PHASE_SCATTER && (num_tile_segments > BLOCK_THREADS))
{
ScatterTwoPhase(
scatter_items,
segment_flags,
segment_indices,
num_tile_segments,
num_tile_segments_prefix);
}
else
{
ScatterDirect(
scatter_items,
segment_flags,
segment_indices);
}
}
//---------------------------------------------------------------------
// Cooperatively scan a device-wide sequence of tiles with other CTAs
//---------------------------------------------------------------------
/**
* Process a tile of input (dynamic chained scan)
*/
template <bool IS_LAST_TILE> ///< Whether the current tile is the last tile
__device__ __forceinline__ void ConsumeTile(
OffsetT num_remaining, ///< Number of global input items remaining (including this tile)
int tile_idx, ///< Tile index
OffsetT tile_offset, ///< Tile offset
ScanTileStateT& tile_state) ///< Global tile state descriptor
{
KeyOutputT keys[ITEMS_PER_THREAD]; // Tile keys
KeyOutputT prev_keys[ITEMS_PER_THREAD]; // Tile keys shuffled up
ValueOutputT values[ITEMS_PER_THREAD]; // Tile values
OffsetT head_flags[ITEMS_PER_THREAD]; // Segment head flags
OffsetT segment_indices[ITEMS_PER_THREAD]; // Segment indices
OffsetValuePairT scan_items[ITEMS_PER_THREAD]; // Zipped values and segment flags|indices
KeyValuePairT scatter_items[ITEMS_PER_THREAD]; // Zipped key value pairs for scattering
// Load keys
if (IS_LAST_TILE)
BlockLoadKeysT(temp_storage.load_keys).Load(d_keys_in + tile_offset, keys, num_remaining);
else
BlockLoadKeysT(temp_storage.load_keys).Load(d_keys_in + tile_offset, keys);
// Load tile predecessor key in first thread
KeyOutputT tile_predecessor;
if (threadIdx.x == 0)
{
tile_predecessor = (tile_idx == 0) ?
keys[0] : // First tile gets repeat of first item (thus first item will not be flagged as a head)
d_keys_in[tile_offset - 1]; // Subsequent tiles get last key from previous tile
}
CTA_SYNC();
// Load values
if (IS_LAST_TILE)
xgboost/cub/cub/agent/agent_reduce_by_key.cuh view on Meta::CPAN
// Perform exclusive tile scan
OffsetValuePairT block_aggregate; // Inclusive block-wide scan aggregate
OffsetT num_segments_prefix; // Number of segments prior to this tile
ValueOutputT total_aggregate; // The tile prefix folded with block_aggregate
if (tile_idx == 0)
{
// Scan first tile
BlockScanT(temp_storage.scan).ExclusiveScan(scan_items, scan_items, scan_op, block_aggregate);
num_segments_prefix = 0;
total_aggregate = block_aggregate.value;
// Update tile status if there are successor tiles
if ((!IS_LAST_TILE) && (threadIdx.x == 0))
tile_state.SetInclusive(0, block_aggregate);
}
else
{
// Scan non-first tile
TilePrefixCallbackOpT prefix_op(tile_state, temp_storage.prefix, scan_op, tile_idx);
BlockScanT(temp_storage.scan).ExclusiveScan(scan_items, scan_items, scan_op, prefix_op);
block_aggregate = prefix_op.GetBlockAggregate();
num_segments_prefix = prefix_op.GetExclusivePrefix().key;
total_aggregate = reduction_op(
prefix_op.GetExclusivePrefix().value,
block_aggregate.value);
}
// Rezip scatter items and segment indices
#pragma unroll
for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
{
scatter_items[ITEM].key = prev_keys[ITEM];
scatter_items[ITEM].value = scan_items[ITEM].value;
segment_indices[ITEM] = scan_items[ITEM].key;
}
// At this point, each flagged segment head has:
// - The key for the previous segment
// - The reduced value from the previous segment
// - The segment index for the reduced value
// Scatter flagged keys and values
OffsetT num_tile_segments = block_aggregate.key;
Scatter(scatter_items, head_flags, segment_indices, num_tile_segments, num_segments_prefix);
// Last thread in last tile will output final count (and last pair, if necessary)
if ((IS_LAST_TILE) && (threadIdx.x == BLOCK_THREADS - 1))
{
OffsetT num_segments = num_segments_prefix + num_tile_segments;
// If the last tile is a whole tile, output the final_value
if (num_remaining == TILE_ITEMS)
{
d_unique_out[num_segments] = keys[ITEMS_PER_THREAD - 1];
d_aggregates_out[num_segments] = total_aggregate;
num_segments++;
}
// Output the total number of items selected
*d_num_runs_out = num_segments;
}
}
/**
* Scan tiles of items as part of a dynamic chained scan
*/
__device__ __forceinline__ void ConsumeRange(
int num_items, ///< Total number of input items
ScanTileStateT& tile_state, ///< Global tile state descriptor
int start_tile) ///< The starting tile for the current grid
{
// Blocks are launched in increasing order, so just assign one tile per block
int tile_idx = start_tile + blockIdx.x; // Current tile index
OffsetT tile_offset = OffsetT(TILE_ITEMS) * tile_idx; // Global offset for the current tile
OffsetT num_remaining = num_items - tile_offset; // Remaining items (including this tile)
if (num_remaining > TILE_ITEMS)
{
// Not last tile
ConsumeTile<false>(num_remaining, tile_idx, tile_offset, tile_state);
}
else if (num_remaining > 0)
{
// Last tile
ConsumeTile<true>(num_remaining, tile_idx, tile_offset, tile_state);
}
}
};
} // CUB namespace
CUB_NS_POSTFIX // Optional outer namespace(s)
( run in 0.817 second using v1.01-cache-2.11-cpan-13bb782fe5a )