Sereal-Encoder
view release on metacpan or search on metacpan
zstd/compress/zstdmt_compress.c view on Meta::CPAN
serialState->nextJobID = jobID + 1;
ZSTD_pthread_cond_broadcast(&serialState->cond);
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
ZSTD_window_clear(&serialState->ldmWindow);
ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
}
ZSTD_pthread_mutex_unlock(&serialState->mutex);
}
/* ------------------------------------------ */
/* ===== Worker thread ===== */
/* ------------------------------------------ */
/* Empty range constant (initialized with a NULL pointer and a zero value).
 * Assigned to `job->src` and to `mtctx->inBuff.prefix` elsewhere in this file
 * to mean "no data". */
static const range_t kNullRange = { NULL, 0 };
/* ZSTDMT_jobDescription :
 * Describes one compression job. A pointer to this structure is what
 * ZSTDMT_compressionJob() receives (as a `void*`) when run by a worker.
 * Descriptions live in the `mtctx->jobs[]` table, at index
 * `nextJobID & jobIDMask`, and are filled by ZSTDMT_createCompressionJob().
 *
 * The per-field comments state which side (mtctx = the owning context,
 * worker = the thread running the job) writes and reads each field.
 * Error codes are stored into `cSize` under `job_mutex` (see JOB_ERROR()).
 * NOTE(review): `consumed` is presumably guarded by `job_mutex` as well -
 * confirm against the worker code that updates it.
 */
typedef struct {
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */
ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
range_t src; /* set by mtctx, then read by worker & mtctx => no barrier */
unsigned jobID; /* set by mtctx, then read by worker => no barrier */
unsigned firstJob; /* set by mtctx, then read by worker => no barrier */
unsigned lastJob; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
size_t dstFlushed; /* used only by mtctx */
unsigned frameChecksumNeeded; /* used only by mtctx */
} ZSTDMT_jobDescription;
/* JOB_ERROR() :
 * Store error code `e` into `job->cSize` while holding `job->job_mutex`,
 * then jump to the `_endJob` label of the enclosing function.
 * Requires a `job` pointer and an `_endJob` label in the calling scope.
 * Wrapped in do { } while (0) so that `JOB_ERROR(x);` expands to exactly one
 * statement and stays valid inside an unbraced if/else. */
#define JOB_ERROR(e)                                \
    do {                                            \
        ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
        job->cSize = (e);                           \
        ZSTD_pthread_mutex_unlock(&job->job_mutex); \
        goto _endJob;                               \
    } while (0)
/* ZSTDMT_compressionJob() is a POOL_function type */
static void ZSTDMT_compressionJob(void* jobDescription)
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
buffer_t dstBuff = job->dstBuff;
size_t lastCBlockSize = 0;
/* resources */
if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */
dstBuff = ZSTDMT_getBuffer(job->bufPool);
if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
}
if (jobParams.ldmParams.enableLdm == ZSTD_ps_enable && rawSeqStore.seq == NULL)
JOB_ERROR(ERROR(memory_allocation));
/* Don't compute the checksum for chunks, since we compute it externally,
* but write it in the header.
*/
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
/* Don't run LDM for the chunks, since we handle it externally */
jobParams.ldmParams.enableLdm = ZSTD_ps_disable;
/* Correct nbWorkers to 0. */
jobParams.nbWorkers = 0;
/* init */
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
assert(job->firstJob); /* only allowed for first job */
if (ZSTD_isError(initError)) JOB_ERROR(initError);
} else { /* srcStart points at reloaded section */
U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
{ size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
}
if (!job->firstJob) {
size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0);
if (ZSTD_isError(err)) JOB_ERROR(err);
}
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
ZSTD_dtlm_fast,
NULL, /*cdict*/
&jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) JOB_ERROR(initError);
} }
/* Perform serial step as early as possible, but after CCtx initialization */
ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
ZSTD_invalidateRepCodes(cctx);
}
/* compress */
{ size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;
int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize);
const BYTE* ip = (const BYTE*) job->src.start;
BYTE* const ostart = (BYTE*)dstBuff.start;
BYTE* op = ostart;
BYTE* oend = op + dstBuff.capacity;
int chunkNb;
if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize); /* check overflow */
DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks);
assert(job->cSize == 0);
zstd/compress/zstdmt_compress.c view on Meta::CPAN
U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);
U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);
/* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our
* expected job size is at least 4x larger. */
assert(rsyncBits >= RSYNC_MIN_BLOCK_LOG + 2);
DEBUGLOG(4, "rsyncLog = %u", rsyncBits);
mtctx->rsync.hash = 0;
mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
}
if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
{
/* If ldm is enabled we need windowSize space. */
size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;
/* Two buffers of slack, plus extra space for the overlap
* This is the minimum slack that LDM works with. One extra because
* flush might waste up to targetSectionSize-1 bytes. Another extra
* for the overlap (if > 0), then one to fill which doesn't overlap
* with the LDM window.
*/
size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
/* Compute the total size, and always have enough slack */
size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
if (mtctx->roundBuff.capacity < capacity) {
if (mtctx->roundBuff.buffer)
ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);
mtctx->roundBuff.buffer = (BYTE*)ZSTD_customMalloc(capacity, mtctx->cMem);
if (mtctx->roundBuff.buffer == NULL) {
mtctx->roundBuff.capacity = 0;
return ERROR(memory_allocation);
}
mtctx->roundBuff.capacity = capacity;
}
}
DEBUGLOG(4, "roundBuff capacity : %u KB", (U32)(mtctx->roundBuff.capacity>>10));
mtctx->roundBuff.pos = 0;
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->inBuff.filled = 0;
mtctx->inBuff.prefix = kNullRange;
mtctx->doneJobID = 0;
mtctx->nextJobID = 0;
mtctx->frameEnded = 0;
mtctx->allJobsCompleted = 0;
mtctx->consumed = 0;
mtctx->produced = 0;
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
dict, dictSize, dictContentType))
return ERROR(memory_allocation);
return 0;
}
/* ZSTDMT_writeLastEmptyBlock() :
 * Terminate a frame by turning `job` into a single empty block carrying the
 * end-of-frame mark.
 * Preconditions (asserted) : `job` is the last job, is not the first job,
 * has no input, and has no destination buffer yet (streaming variant only).
 * A destination buffer is taken from `job->bufPool`.
 * Result is reported through `job->cSize` : size of the written block, or
 * ERROR(memory_allocation) when no buffer could be obtained.
 */
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
    /* expected state of the job on entry */
    assert(job->lastJob == 1);
    assert(job->src.size == 0);   /* an empty last job gets reduced to one empty last block */
    assert(job->firstJob == 0);   /* a first job would also have to produce the frame header */
    assert(job->dstBuff.start == NULL);   /* streaming variant only : otherwise dstBuff could be the user's output */

    job->dstBuff = ZSTDMT_getBuffer(job->bufPool);
    if (job->dstBuff.start != NULL) {
        assert(job->dstBuff.capacity >= ZSTD_blockHeaderSize);   /* pool buffers are never this small */
        job->src = kNullRange;
        job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity);
        assert(!ZSTD_isError(job->cSize));
        assert(job->consumed == 0);
    } else {
        /* no buffer available : report the failure through cSize */
        job->cSize = ERROR(memory_allocation);
    }
}
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
int const endFrame = (endOp == ZSTD_e_end);
if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask));
return 0;
}
if (!mtctx->jobReady) {
BYTE const* src = (BYTE const*)mtctx->inBuff.buffer.start;
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefix.size);
mtctx->jobs[jobID].src.start = src;
mtctx->jobs[jobID].src.size = srcSize;
assert(mtctx->inBuff.filled >= srcSize);
mtctx->jobs[jobID].prefix = mtctx->inBuff.prefix;
mtctx->jobs[jobID].consumed = 0;
mtctx->jobs[jobID].cSize = 0;
mtctx->jobs[jobID].params = mtctx->params;
mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL;
mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize;
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
mtctx->jobs[jobID].bufPool = mtctx->bufPool;
mtctx->jobs[jobID].seqPool = mtctx->seqPool;
mtctx->jobs[jobID].serial = &mtctx->serial;
mtctx->jobs[jobID].jobID = mtctx->nextJobID;
mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
mtctx->jobs[jobID].lastJob = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
mtctx->jobs[jobID].dstFlushed = 0;
/* Update the round buffer pos and clear the input buffer to be reset */
mtctx->roundBuff.pos += srcSize;
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->inBuff.filled = 0;
/* Set the prefix */
if (!endFrame) {
size_t const newPrefixSize = MIN(srcSize, mtctx->targetPrefixSize);
mtctx->inBuff.prefix.start = src + srcSize - newPrefixSize;
mtctx->inBuff.prefix.size = newPrefixSize;
} else { /* endFrame==1 => no need for another input buffer */
mtctx->inBuff.prefix = kNullRange;
mtctx->frameEnded = endFrame;
if (mtctx->nextJobID == 0) {
( run in 0.879 second using v1.01-cache-2.11-cpan-39bf76dae61 )