Compress-Stream-Zstd
view release on metacpan or search on metacpan
ext/zstd/lib/compress/zstd_compress.c view on Meta::CPAN
size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
FORWARD_IF_ERROR( ZSTD_compressStream2(zcs, output, input, ZSTD_e_continue) , "");
return ZSTD_nextInputSizeHint_MTorST(zcs);
}
/* After a compression call set the expected input/output buffer.
* This is validated at the start of the next compression call.
*/
static void
ZSTD_setBufferExpectations(ZSTD_CCtx* cctx, const ZSTD_outBuffer* output, const ZSTD_inBuffer* input)
{
DEBUGLOG(5, "ZSTD_setBufferExpectations (for advanced stable in/out modes)");
if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) {
cctx->expectedInBuffer = *input;
}
if (cctx->appliedParams.outBufferMode == ZSTD_bm_stable) {
cctx->expectedOutBufferSize = output->size - output->pos;
}
}
/* Validate that the input/output buffers match the expectations set by
* ZSTD_setBufferExpectations.
*/
static size_t ZSTD_checkBufferStability(ZSTD_CCtx const* cctx,
ZSTD_outBuffer const* output,
ZSTD_inBuffer const* input,
ZSTD_EndDirective endOp)
{
if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) {
ZSTD_inBuffer const expect = cctx->expectedInBuffer;
if (expect.src != input->src || expect.pos != input->pos)
RETURN_ERROR(stabilityCondition_notRespected, "ZSTD_c_stableInBuffer enabled but input differs!");
}
(void)endOp;
if (cctx->appliedParams.outBufferMode == ZSTD_bm_stable) {
size_t const outBufferSize = output->size - output->pos;
if (cctx->expectedOutBufferSize != outBufferSize)
RETURN_ERROR(stabilityCondition_notRespected, "ZSTD_c_stableOutBuffer enabled but output size differs!");
}
return 0;
}
static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx,
ZSTD_EndDirective endOp,
size_t inSize)
{
ZSTD_CCtx_params params = cctx->requestedParams;
ZSTD_prefixDict const prefixDict = cctx->prefixDict;
FORWARD_IF_ERROR( ZSTD_initLocalDict(cctx) , ""); /* Init the local dict if present. */
ZSTD_memset(&cctx->prefixDict, 0, sizeof(cctx->prefixDict)); /* single usage */
assert(prefixDict.dict==NULL || cctx->cdict==NULL); /* only one can be set */
if (cctx->cdict && !cctx->localDict.cdict) {
/* Let the cdict's compression level take priority over the requested params.
* But do not take the cdict's compression level if the "cdict" is actually a localDict
* generated from ZSTD_initLocalDict().
*/
params.compressionLevel = cctx->cdict->compressionLevel;
}
DEBUGLOG(4, "ZSTD_compressStream2 : transparent init stage");
if (endOp == ZSTD_e_end) cctx->pledgedSrcSizePlusOne = inSize + 1; /* auto-determine pledgedSrcSize */
{ size_t const dictSize = prefixDict.dict
? prefixDict.dictSize
: (cctx->cdict ? cctx->cdict->dictContentSize : 0);
ZSTD_cParamMode_e const mode = ZSTD_getCParamMode(cctx->cdict, ¶ms, cctx->pledgedSrcSizePlusOne - 1);
params.cParams = ZSTD_getCParamsFromCCtxParams(
¶ms, cctx->pledgedSrcSizePlusOne-1,
dictSize, mode);
}
params.useBlockSplitter = ZSTD_resolveBlockSplitterMode(params.useBlockSplitter, ¶ms.cParams);
params.ldmParams.enableLdm = ZSTD_resolveEnableLdm(params.ldmParams.enableLdm, ¶ms.cParams);
params.useRowMatchFinder = ZSTD_resolveRowMatchFinderMode(params.useRowMatchFinder, ¶ms.cParams);
params.validateSequences = ZSTD_resolveExternalSequenceValidation(params.validateSequences);
params.maxBlockSize = ZSTD_resolveMaxBlockSize(params.maxBlockSize);
params.searchForExternalRepcodes = ZSTD_resolveExternalRepcodeSearch(params.searchForExternalRepcodes, params.compressionLevel);
#ifdef ZSTD_MULTITHREAD
/* If external matchfinder is enabled, make sure to fail before checking job size (for consistency) */
RETURN_ERROR_IF(
params.useSequenceProducer == 1 && params.nbWorkers >= 1,
parameter_combination_unsupported,
"External sequence producer isn't supported with nbWorkers >= 1"
);
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */
}
if (params.nbWorkers > 0) {
#if ZSTD_TRACE
cctx->traceCtx = (ZSTD_trace_compress_begin != NULL) ? ZSTD_trace_compress_begin(cctx) : 0;
#endif
/* mt context creation */
if (cctx->mtctx == NULL) {
DEBUGLOG(4, "ZSTD_compressStream2: creating new mtctx for nbWorkers=%u",
params.nbWorkers);
cctx->mtctx = ZSTDMT_createCCtx_advanced((U32)params.nbWorkers, cctx->customMem, cctx->pool);
RETURN_ERROR_IF(cctx->mtctx == NULL, memory_allocation, "NULL pointer!");
}
/* mt compression */
DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers);
FORWARD_IF_ERROR( ZSTDMT_initCStream_internal(
cctx->mtctx,
prefixDict.dict, prefixDict.dictSize, prefixDict.dictContentType,
cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) , "");
cctx->dictID = cctx->cdict ? cctx->cdict->dictID : 0;
cctx->dictContentSize = cctx->cdict ? cctx->cdict->dictContentSize : prefixDict.dictSize;
cctx->consumedSrcSize = 0;
cctx->producedCSize = 0;
cctx->streamStage = zcss_load;
cctx->appliedParams = params;
} else
#endif /* ZSTD_MULTITHREAD */
{ U64 const pledgedSrcSize = cctx->pledgedSrcSizePlusOne - 1;
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
FORWARD_IF_ERROR( ZSTD_compressBegin_internal(cctx,
prefixDict.dict, prefixDict.dictSize, prefixDict.dictContentType, ZSTD_dtlm_fast,
cctx->cdict,
¶ms, pledgedSrcSize,
ZSTDb_buffered) , "");
assert(cctx->appliedParams.nbWorkers == 0);
cctx->inToCompress = 0;
cctx->inBuffPos = 0;
if (cctx->appliedParams.inBufferMode == ZSTD_bm_buffered) {
/* for small input: avoid automatic flush on reaching end of block, since
* it would require to add a 3-bytes null block to end frame
*/
cctx->inBuffTarget = cctx->blockSize + (cctx->blockSize == pledgedSrcSize);
} else {
cctx->inBuffTarget = 0;
}
cctx->outBuffContentSize = cctx->outBuffFlushedSize = 0;
cctx->streamStage = zcss_load;
cctx->frameEnded = 0;
}
return 0;
}
/* @return provides a minimum amount of data remaining to be flushed from internal buffers
*/
size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
ZSTD_outBuffer* output,
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp)
{
DEBUGLOG(5, "ZSTD_compressStream2, endOp=%u ", (unsigned)endOp);
/* check conditions */
RETURN_ERROR_IF(output->pos > output->size, dstSize_tooSmall, "invalid output buffer");
RETURN_ERROR_IF(input->pos > input->size, srcSize_wrong, "invalid input buffer");
RETURN_ERROR_IF((U32)endOp > (U32)ZSTD_e_end, parameter_outOfBound, "invalid endDirective");
assert(cctx != NULL);
/* transparent initialization stage */
if (cctx->streamStage == zcss_init) {
size_t const inputSize = input->size - input->pos; /* no obligation to start from pos==0 */
size_t const totalInputSize = inputSize + cctx->stableIn_notConsumed;
if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */
&& (endOp == ZSTD_e_continue) /* no flush requested, more input to come */
&& (totalInputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */
if (cctx->stableIn_notConsumed) { /* not the first time */
/* check stable source guarantees */
RETURN_ERROR_IF(input->src != cctx->expectedInBuffer.src, stabilityCondition_notRespected, "stableInBuffer condition not respected: wrong src pointer");
RETURN_ERROR_IF(input->pos != cctx->expectedInBuffer.size, stabilityCondition_notRespected, "stableInBuffer condition not respected: externally modified pos");
}
/* pretend input was consumed, to give a sense forward progress */
input->pos = input->size;
/* save stable inBuffer, for later control, and flush/end */
cctx->expectedInBuffer = *input;
/* but actually input wasn't consumed, so keep track of position from where compression shall resume */
cctx->stableIn_notConsumed += inputSize;
/* don't initialize yet, wait for the first block of flush() order, for better parameters adaptation */
return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format); /* at least some header to produce */
}
FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, totalInputSize), "compressStream2 initialization failed");
ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */
}
/* end of transparent initialization stage */
FORWARD_IF_ERROR(ZSTD_checkBufferStability(cctx, output, input, endOp), "invalid buffers");
/* compression stage */
#ifdef ZSTD_MULTITHREAD
if (cctx->appliedParams.nbWorkers > 0) {
size_t flushMin;
if (cctx->cParamsChanged) {
ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
cctx->cParamsChanged = 0;
}
if (cctx->stableIn_notConsumed) {
assert(cctx->appliedParams.inBufferMode == ZSTD_bm_stable);
/* some early data was skipped - make it available for consumption */
assert(input->pos >= cctx->stableIn_notConsumed);
input->pos -= cctx->stableIn_notConsumed;
cctx->stableIn_notConsumed = 0;
}
for (;;) {
size_t const ipos = input->pos;
size_t const opos = output->pos;
flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
cctx->consumedSrcSize += (U64)(input->pos - ipos);
cctx->producedCSize += (U64)(output->pos - opos);
if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
if (flushMin == 0)
ZSTD_CCtx_trace(cctx, 0);
ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
}
FORWARD_IF_ERROR(flushMin, "ZSTDMT_compressStream_generic failed");
if (endOp == ZSTD_e_continue) {
/* We only require some progress with ZSTD_e_continue, not maximal progress.
* We're done if we've consumed or produced any bytes, or either buffer is
* full.
*/
if (input->pos != ipos || output->pos != opos || input->pos == input->size || output->pos == output->size)
break;
} else {
assert(endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
/* We require maximal progress. We're done when the flush is complete or the
* output buffer is full.
*/
if (flushMin == 0 || output->pos == output->size)
break;
}
}
DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
/* Either we don't require maximum forward progress, we've finished the
* flush, or we are out of output space.
*/
assert(endOp == ZSTD_e_continue || flushMin == 0 || output->pos == output->size);
ZSTD_setBufferExpectations(cctx, output, input);
return flushMin;
}
#endif /* ZSTD_MULTITHREAD */
FORWARD_IF_ERROR( ZSTD_compressStream_generic(cctx, output, input, endOp) , "");
DEBUGLOG(5, "completed ZSTD_compressStream2");
ZSTD_setBufferExpectations(cctx, output, input);
return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */
( run in 0.610 second using v1.01-cache-2.11-cpan-39bf76dae61 )