Alien-XGBoost

 View release on MetaCPAN or search on MetaCPAN

xgboost/plugin/lz4/sparse_page_lz4_format.cc  view on Meta::CPAN

}

template<typename DType>
inline void CompressArray<DType>::Write(dmlc::Stream* fo) {
  // Rebuild the cumulative-offset table: entry k is the byte offset of
  // encoded chunk k within the concatenated payload written below.
  encoded_chunks_.clear();
  encoded_chunks_.push_back(0);
  for (const std::string& chunk : out_buffer_) {
    encoded_chunks_.push_back(encoded_chunks_.back() + chunk.length());
  }
  // Header first (raw chunk boundaries, then encoded offsets), payload last.
  fo->Write(raw_chunks_);
  fo->Write(encoded_chunks_);
  for (size_t k = 0; k < out_buffer_.size(); ++k) {
    const std::string& chunk = out_buffer_[k];
    fo->Write(dmlc::BeginPtr(chunk), chunk.length());
  }
}

template<typename StorageIndex>
class SparsePageLZ4Format : public SparsePage::Format {
 public:
  // Constructor: remembers whether to use the high-compression LZ4 codec and
  // zeroes the byte counters reported by the destructor.
  explicit SparsePageLZ4Format(bool use_lz4_hc)
      : use_lz4_hc_(use_lz4_hc) {
    // Counters feeding the compression-ratio log emitted at destruction.
    raw_bytes_ = 0;
    raw_bytes_value_ = 0;
    raw_bytes_index_ = 0;
    encoded_bytes_value_ = 0;
    encoded_bytes_index_ = 0;
    // Thread counts are tunable through environment variables; the defaults
    // favor more threads for compression (write) than decompression (read).
    nthread_ = dmlc::GetEnv("XGBOOST_LZ4_DECODE_NTHREAD", 4);
    nthread_write_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_NTHREAD", 12);
  }
  // Destructor: logs cumulative compression statistics for the lifetime of
  // this format object.
  virtual ~SparsePageLZ4Format() {
    // At this point raw_bytes_ holds only the offset-array bytes, which are
    // stored uncompressed -- so they count as "encoded" at their raw size.
    size_t encoded_bytes = raw_bytes_ +  encoded_bytes_value_ + encoded_bytes_index_;
    // Now fold the uncompressed index/value sizes in to get the raw total.
    // Order matters: encoded_bytes must be computed before this update.
    raw_bytes_ += raw_bytes_value_ + raw_bytes_index_;
    if (raw_bytes_ != 0) {
      // NOTE(review): ratio-index / ratio-value divide by per-stream counters
      // that can still be zero here (yielding inf/nan under IEEE doubles) --
      // presumably acceptable for a diagnostic log line; confirm.
      LOG(CONSOLE) << "raw_bytes=" << raw_bytes_
                   << ", encoded_bytes=" << encoded_bytes
                   << ", ratio=" << double(encoded_bytes) / raw_bytes_
                   << ", ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_
                   << ", ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_;
    }
  }

  // Reads one full SparsePage from fi. Returns false on a clean end-of-file
  // (no offset array could be read).
  bool Read(SparsePage* page, dmlc::SeekStream* fi) override {
    // The row-offset array is stored uncompressed at the head of the record.
    if (!fi->Read(&(page->offset))) return false;
    CHECK_NE(page->offset.size(), 0) << "Invalid SparsePage file";
    // Decompress the index/value streams into index_ and value_.
    this->LoadIndexValue(fi);

    page->data.resize(page->offset.back());
    CHECK_EQ(index_.data.size(), value_.data.size());
    CHECK_EQ(index_.data.size(), page->data.size());
    // Re-materialize the entries, undoing the min_index_ delta encoding
    // applied at write time.
    const size_t num_entries = page->data.size();
    for (size_t k = 0; k < num_entries; ++k) {
      page->data[k] = SparseBatch::Entry(index_.data[k] + min_index_, value_.data[k]);
    }
    return true;
  }

  // Reads only the columns named in sorted_index_set, producing a compacted
  // page whose i-th segment holds column sorted_index_set[i]. Returns false
  // when the on-disk offset array cannot be read (end of file).
  bool Read(SparsePage* page,
            dmlc::SeekStream* fi,
            const std::vector<bst_uint>& sorted_index_set) override {
    if (!fi->Read(&disk_offset_)) return false;
    this->LoadIndexValue(fi);

    // Build the filtered page's offset array: one span per requested column,
    // sized from the on-disk column boundaries.
    page->offset.clear();
    page->offset.push_back(0);
    for (bst_uint cid : sorted_index_set) {
      size_t width = disk_offset_[cid + 1] - disk_offset_[cid];
      page->offset.push_back(page->offset.back() + width);
    }
    page->data.resize(page->offset.back());
    CHECK_EQ(index_.data.size(), value_.data.size());
    CHECK_EQ(index_.data.size(), disk_offset_.back());

    // Copy each selected column into place, undoing the min_index_ delta
    // encoding as we go.
    for (size_t i = 0; i < sorted_index_set.size(); ++i) {
      const bst_uint cid = sorted_index_set[i];
      size_t dst = page->offset[i];
      size_t src = disk_offset_[cid];
      const size_t src_end = disk_offset_[cid + 1];
      while (src != src_end) {
        page->data[dst] = SparseBatch::Entry(index_.data[src] + min_index_,
                                             value_.data[src]);
        ++dst;
        ++src;
      }
    }
    return true;
  }

  // Writes one SparsePage to fo: uncompressed row offsets and delta base,
  // followed by LZ4-compressed index and value streams. Fix: the CHECK_LE
  // diagnostic was garbled English and fused "min_index=" onto the limit
  // value with no separator; it now reads as a coherent message.
  void Write(const SparsePage& page, dmlc::Stream* fo) override {
    CHECK(page.offset.size() != 0 && page.offset[0] == 0);
    CHECK_EQ(page.offset.back(), page.data.size());
    // Row offsets and the delta base are stored uncompressed.
    fo->Write(page.offset);
    min_index_ = page.min_index;
    fo->Write(&min_index_, sizeof(min_index_));
    index_.data.resize(page.data.size());
    value_.data.resize(page.data.size());

    // Split entries into a delta-encoded (relative to min_index_) index
    // stream and a value stream; the delta must fit in StorageIndex.
    for (size_t i = 0; i < page.data.size(); ++i) {
      bst_uint idx = page.data[i].index - min_index_;
      CHECK_LE(idx, static_cast<bst_uint>(std::numeric_limits<StorageIndex>::max()))
          << "The storage index must be smaller than or equal to "
          << std::numeric_limits<StorageIndex>::max()
          << ", min_index=" << min_index_;
      index_.data[i] = static_cast<StorageIndex>(idx);
      value_.data[i] = page.data[i].fvalue;
    }

    index_.InitCompressChunks(kChunkSize, kMaxChunk);
    value_.InitCompressChunks(kChunkSize, kMaxChunk);

    // Compress index and value chunks in a single parallel region; the first
    // nindex iterations handle index chunks, the rest handle value chunks.
    int nindex = index_.num_chunk();
    int nvalue = value_.num_chunk();
    int ntotal = nindex + nvalue;
    #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_)
    for (int i = 0; i < ntotal; ++i) {
      if (i < nindex) {
        index_.Compress(i, use_lz4_hc_);
      } else {
        value_.Compress(i - nindex, use_lz4_hc_);
      }
    }
    index_.Write(fo);
    value_.Write(fo);
    // Statistics: index raw size is scaled to bst_uint units so the ratio is
    // measured against the uncompressed in-memory representation.
    raw_bytes_index_ += index_.RawBytes() * sizeof(bst_uint) / sizeof(StorageIndex);
    raw_bytes_value_ += value_.RawBytes();
    encoded_bytes_index_ += index_.EncodedBytes();
    encoded_bytes_value_ += value_.EncodedBytes();
    raw_bytes_ += page.offset.size() * sizeof(size_t);
  }

  inline void LoadIndexValue(dmlc::SeekStream* fi) {
    fi->Read(&min_index_, sizeof(min_index_));
    index_.Read(fi);
    value_.Read(fi);

    int nindex = index_.num_chunk();
    int nvalue = value_.num_chunk();
    int ntotal = nindex + nvalue;
    #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_)



( run in 0.854 second using v1.01-cache-2.11-cpan-39bf76dae61 )