Compress-Stream-Zstd

 view release on metacpan or  search on metacpan

ext/zstd/contrib/pzstd/Pzstd.cpp  view on Meta::CPAN

              !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
        return;
      }
      // Pass the buffer with the decompressed data to the output queue
      out->push(split(outBuffer, zstdOutBuffer));
      // Advance past the input we already read
      advance(inBuffer, zstdInBuffer);
      if (returnCode == 0) {
        // The frame is over, prepare to (maybe) start a new frame
        ZSTD_initDStream(ctx.get());
      }
    }
  }
  if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
    return;
  }
  // We've given ZSTD_decompressStream all of our data, but there may still
  // be data to read.
  while (returnCode == 1) {
    // Allocate a buffer with at least outSize bytes.
    Buffer outBuffer(outSize);
    auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
    // Pass in no input.
    ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
    // Decompress
    returnCode =
        ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
    if (!errorHolder.check(
            !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
      return;
    }
    // Pass the buffer with the decompressed data to the output queue
    out->push(split(outBuffer, zstdOutBuffer));
  }
}

std::uint64_t asyncDecompressFrames(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
    ThreadPool& executor,
    FILE* fd) {
  auto framesGuard = makeScopeGuard([&] { frames.finish(); });
  std::uint64_t totalBytesRead = 0;

  // Split the source up into its component frames.
  // If we find our recognized skippable frame we know the next frames size
  // which means that we can decompress each standard frame in independently.
  // Otherwise, we will decompress using only one decompression task.
  const size_t chunkSize = ZSTD_DStreamInSize();
  auto status = FileStatus::Continue;
  while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
    // Make a new input queue that we will put the frames's bytes into.
    auto in = std::make_shared<BufferWorkQueue>();
    auto inGuard = makeScopeGuard([&] { in->finish(); });
    // Make a output queue that decompress will put the decompressed data into
    auto out = std::make_shared<BufferWorkQueue>();

    size_t frameSize;
    {
      // Calculate the size of the next frame.
      // frameSize is 0 if the frame info can't be decoded.
      Buffer buffer(SkippableFrame::kSize);
      auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
      totalBytesRead += bytesRead;
      status = fileStatus(fd);
      if (bytesRead == 0 && status != FileStatus::Continue) {
        break;
      }
      buffer.subtract(buffer.size() - bytesRead);
      frameSize = SkippableFrame::tryRead(buffer.range());
      in->push(std::move(buffer));
    }
    if (frameSize == 0) {
      // We hit a non SkippableFrame, so this will be the last job.
      // Make sure that we don't use too much memory
      in->setMaxSize(64);
      out->setMaxSize(64);
    }
    // Start decompression in the thread pool
    executor.add([&state, in, out] {
      return decompress(state, std::move(in), std::move(out));
    });
    // Pass the output queue to the writer thread
    frames.push(std::move(out));
    if (frameSize == 0) {
      // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
      // Pass the rest of the source to this decompression task
      state.log(kLogVerbose, "%s\n",
          "Input not in pzstd format, falling back to serial decompression");
      while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
        status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
      }
      break;
    }
    state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
    // Fill the input queue for the decompression job we just started
    status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
  }
  state.errorHolder.check(status != FileStatus::Error, "Error reading input");
  return totalBytesRead;
}

/// Write `data` to `fd`, returns true iff success.
static bool writeData(ByteRange data, FILE* fd) {
  while (!data.empty()) {
    data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
    if (std::ferror(fd)) {
      return false;
    }
  }
  return true;
}

std::uint64_t writeFile(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
    FILE* outputFd,
    bool decompress) {
  auto& errorHolder = state.errorHolder;
  auto lineClearGuard = makeScopeGuard([&state] {
    state.log.clear(kLogInfo);



( run in 0.533 second using v1.01-cache-2.11-cpan-39bf76dae61 )