Alien-XGBoost
view release on metacpan or search on metacpan
xgboost/plugin/lz4/sparse_page_lz4_format.cc view on Meta::CPAN
}
template<typename DType>
inline void CompressArray<DType>::Write(dmlc::Stream* fo) {
  // Rebuild the encoded-chunk offset table: entry i is the byte offset of
  // compressed chunk i within the concatenated payload (entry 0 is always 0).
  encoded_chunks_.assign(1, 0);
  for (const std::string& chunk : out_buffer_) {
    encoded_chunks_.push_back(encoded_chunks_.back() + chunk.length());
  }
  // Serialize the raw chunk boundaries, then the encoded offsets, and
  // finally the compressed chunk payloads back to back.
  fo->Write(raw_chunks_);
  fo->Write(encoded_chunks_);
  for (size_t i = 0; i < out_buffer_.size(); ++i) {
    fo->Write(dmlc::BeginPtr(out_buffer_[i]), out_buffer_[i].length());
  }
}
template<typename StorageIndex>
class SparsePageLZ4Format : public SparsePage::Format {
public:
explicit SparsePageLZ4Format(bool use_lz4_hc)
    : use_lz4_hc_(use_lz4_hc) {
  // Zero every byte counter used for the compression-ratio report
  // printed by the destructor.
  raw_bytes_ = 0;
  raw_bytes_value_ = 0;
  raw_bytes_index_ = 0;
  encoded_bytes_value_ = 0;
  encoded_bytes_index_ = 0;
  // Thread counts are tunable via environment variables; the defaults use
  // more threads for compression (write) than for decompression (read).
  nthread_ = dmlc::GetEnv("XGBOOST_LZ4_DECODE_NTHREAD", 4);
  nthread_write_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_NTHREAD", 12);
}
// Logs aggregate compression statistics accumulated over the object's
// lifetime when the format is destroyed.
virtual ~SparsePageLZ4Format() {
// NOTE(review): encoded_bytes intentionally includes raw_bytes_ — at this
// point raw_bytes_ only counts the offset arrays, which are written to disk
// uncompressed (see Write), so they contribute to the encoded size verbatim.
size_t encoded_bytes = raw_bytes_ + encoded_bytes_value_ + encoded_bytes_index_;
// Fold the index/value payload sizes in so raw_bytes_ becomes the total
// uncompressed size for the overall ratio below.
raw_bytes_ += raw_bytes_value_ + raw_bytes_index_;
if (raw_bytes_ != 0) {
// NOTE(review): if no pages were written, raw_bytes_index_/raw_bytes_value_
// may be 0 and the per-component ratios print inf/nan (double division by
// zero is well-defined but uninformative) — confirm this is acceptable.
LOG(CONSOLE) << "raw_bytes=" << raw_bytes_
<< ", encoded_bytes=" << encoded_bytes
<< ", ratio=" << double(encoded_bytes) / raw_bytes_
<< ", ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_
<< ", ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_;
}
}
// Reads one full sparse page from the stream; returns false at end of file.
bool Read(SparsePage* page, dmlc::SeekStream* fi) override {
  // The row-offset array is stored uncompressed; a failed read means EOF.
  if (!fi->Read(&(page->offset))) return false;
  CHECK_NE(page->offset.size(), 0) << "Invalid SparsePage file";
  // Decompress the column-index and value arrays that follow on disk.
  this->LoadIndexValue(fi);
  page->data.resize(page->offset.back());
  CHECK_EQ(index_.data.size(), value_.data.size());
  CHECK_EQ(index_.data.size(), page->data.size());
  // Reassemble entries: stored indices are relative to min_index_.
  const size_t total = page->data.size();
  for (size_t pos = 0; pos < total; ++pos) {
    page->data[pos] = SparseBatch::Entry(index_.data[pos] + min_index_,
                                         value_.data[pos]);
  }
  return true;
}
// Reads only the columns listed in sorted_index_set from the stream into
// page; returns false at end of file.
bool Read(SparsePage* page,
          dmlc::SeekStream* fi,
          const std::vector<bst_uint>& sorted_index_set) override {
  // The on-disk offset array covers every column; it is kept separate from
  // the page being built, which holds only the requested columns.
  if (!fi->Read(&disk_offset_)) return false;
  this->LoadIndexValue(fi);
  // Build the output offset array from the lengths of the selected columns.
  page->offset.clear();
  page->offset.push_back(0);
  for (bst_uint cid : sorted_index_set) {
    const size_t len = disk_offset_[cid + 1] - disk_offset_[cid];
    page->offset.push_back(page->offset.back() + len);
  }
  page->data.resize(page->offset.back());
  CHECK_EQ(index_.data.size(), value_.data.size());
  CHECK_EQ(index_.data.size(), disk_offset_.back());
  // Copy each selected column, restoring absolute indices via min_index_.
  for (size_t i = 0; i < sorted_index_set.size(); ++i) {
    const bst_uint cid = sorted_index_set[i];
    const size_t dst = page->offset[i];
    const size_t src = disk_offset_[cid];
    const size_t len = disk_offset_[cid + 1] - disk_offset_[cid];
    for (size_t j = 0; j < len; ++j) {
      page->data[dst + j] = SparseBatch::Entry(
          index_.data[src + j] + min_index_, value_.data[src + j]);
    }
  }
  return true;
}
// Serializes one sparse page: the offset array (uncompressed), min_index_,
// then the LZ4-compressed column-index and value arrays.
void Write(const SparsePage& page, dmlc::Stream* fo) override {
CHECK(page.offset.size() != 0 && page.offset[0] == 0);
CHECK_EQ(page.offset.back(), page.data.size());
fo->Write(page.offset);
// Indices are stored relative to the page's smallest column index so they
// fit into the (possibly narrower) StorageIndex type.
min_index_ = page.min_index;
fo->Write(&min_index_, sizeof(min_index_));
index_.data.resize(page.data.size());
value_.data.resize(page.data.size());
for (size_t i = 0; i < page.data.size(); ++i) {
bst_uint idx = page.data[i].index - min_index_;
// Fail loudly rather than silently truncating an index that does not fit
// in StorageIndex.
CHECK_LE(idx, static_cast<bst_uint>(std::numeric_limits<StorageIndex>::max()))
<< "The storage index is chosen to limited to smaller equal than "
<< std::numeric_limits<StorageIndex>::max()
<< "min_index=" << min_index_;
index_.data[i] = static_cast<StorageIndex>(idx);
value_.data[i] = page.data[i].fvalue;
}
// Split both arrays into fixed-size chunks; index and value chunks share
// one flat work list so they can be compressed by the same parallel loop.
index_.InitCompressChunks(kChunkSize, kMaxChunk);
value_.InitCompressChunks(kChunkSize, kMaxChunk);
int nindex = index_.num_chunk();
int nvalue = value_.num_chunk();
int ntotal = nindex + nvalue;
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_)
for (int i = 0; i < ntotal; ++i) {
if (i < nindex) {
index_.Compress(i, use_lz4_hc_);
} else {
value_.Compress(i - nindex, use_lz4_hc_);
}
}
index_.Write(fo);
value_.Write(fo);
// statistics
// NOTE(review): index bytes appear to be scaled from StorageIndex width
// back up to bst_uint width, presumably so the ratio compares against the
// original uncompressed index size — confirm RawBytes() returns bytes.
raw_bytes_index_ += index_.RawBytes() * sizeof(bst_uint) / sizeof(StorageIndex);
raw_bytes_value_ += value_.RawBytes();
encoded_bytes_index_ += index_.EncodedBytes();
encoded_bytes_value_ += value_.EncodedBytes();
// The offset array is written uncompressed; count it in raw_bytes_, which
// the destructor also folds into the encoded total.
raw_bytes_ += page.offset.size() * sizeof(size_t);
}
inline void LoadIndexValue(dmlc::SeekStream* fi) {
fi->Read(&min_index_, sizeof(min_index_));
index_.Read(fi);
value_.Read(fi);
int nindex = index_.num_chunk();
int nvalue = value_.num_chunk();
int ntotal = nindex + nvalue;
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_)
( run in 0.854 second using v1.01-cache-2.11-cpan-39bf76dae61 )