Compress-Stream-Zstd
view release on metacpan or search on metacpan
ext/zstd/programs/fileio.c view on Meta::CPAN
ZSTD_EndDirective directive = ZSTD_e_continue;
U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
/* stats */
ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
typedef enum { noChange, slower, faster } speedChange_e;
speedChange_e speedChange = noChange;
unsigned flushWaiting = 0;
unsigned inputPresented = 0;
unsigned inputBlocked = 0;
unsigned lastJobID = 0;
UTIL_time_t lastAdaptTime = UTIL_getTime();
U64 const adaptEveryMicro = REFRESH_RATE;
UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);
DISPLAYLEVEL(6, "compression using zstd format \n");
/* init */
if (fileSize != UTIL_FILESIZE_UNKNOWN) {
ext/zstd/programs/fileio.c view on Meta::CPAN
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
/* count stats */
inputPresented++;
if (oldIPos == inBuff.pos) inputBlocked++; /* input buffer is full and can't take any more : input speed is faster than consumption rate */
if (!toFlushNow) flushWaiting = 1;
/* Write compressed stream */
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
if (outBuff.pos) {
writeJob->usedBufferSize = outBuff.pos;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
compressedfilesize += outBuff.pos;
}
ext/zstd/programs/fileio.c view on Meta::CPAN
lastAdaptTime = UTIL_getTime();
/* check output speed */
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
assert(zfp.produced >= previous_zfp_update.produced);
assert(prefs->nbWorkers >= 1);
/* test if compression is blocked
* either because output is slow and all buffers are full
* or because input is slow and no job can start while waiting for at least one buffer to be filled.
* note : exclude starting part, since currentJobID > 1 */
if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
) {
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
speedChange = slower;
}
ext/zstd/programs/fileio.c view on Meta::CPAN
}
flushWaiting = 0;
}
/* course correct only if there is at least one new job completed */
if (zfp.currentJobID > lastJobID) {
DISPLAYLEVEL(6, "compression level adaptation check \n")
/* check input speed */
if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
if (inputBlocked <= 0) {
DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
speedChange = slower;
} else if (speedChange == noChange) {
unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
previous_zfp_correction = zfp;
assert(inputPresented > 0);
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
(unsigned)newlyIngested, (unsigned)newlyConsumed,
(unsigned)newlyFlushed, (unsigned)newlyProduced);
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
) {
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
speedChange = faster;
}
}
inputBlocked = 0;
inputPresented = 0;
}
if (speedChange == slower) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel ++;
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
compressionLevel += (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
ext/zstd/programs/fileio_asyncio.c view on Meta::CPAN
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
AIO_IOPool_lockJobsMutex(&ctx->base);
assert(ctx->completedJobsCount < MAX_IO_JOBS);
ctx->completedJobs[ctx->completedJobsCount++] = job;
if(AIO_IOPool_threadPoolActive(&ctx->base)) {
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
}
AIO_IOPool_unlockJobsMutex(&ctx->base);
}
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
* Looks through the completed jobs for a job matching waitingOnOffset and returns it;
* returns NULL if no such job is found.
* IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
int i;
/* This implementation goes through all completed jobs and looks for the one matching the next offset.
* While not strictly needed for a single threaded reader implementation (as in such a case we could expect
* reads to be completed in order) this implementation was chosen as it better fits other asyncio
* interfaces (such as io_uring) that do not provide promises regarding order of completion. */
for (i=0; i<ctx->completedJobsCount; i++) {
job = (IOJob_t *) ctx->completedJobs[i];
if (job->offset == ctx->waitingOnOffset) {
ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
ext/zstd/programs/fileio_asyncio.c view on Meta::CPAN
return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
}
/* AIO_ReadPool_getNextCompletedJob:
 * Fetches the completed read job whose offset equals waitingOnOffset and moves
 * waitingOnOffset forward by that job's usedBufferSize.
 * May block: waits on jobCompletedCond for as long as no matching job is available
 * while reads are still in flight.
 * @return : the matching job, or NULL when none is completed and no read is in flight. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t* nextJob = NULL;

    AIO_IOPool_lockJobsMutex(&ctx->base);
    for (;;) {
        nextJob = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
        if (nextJob != NULL) break;   /* the read we are waiting on has completed */
        if (!(AIO_ReadPool_numReadsInFlight(ctx) > 0)) break;   /* nothing left to wait for */
        /* we shouldn't be here if we work in sync mode */
        assert(ctx->base.threadPool != NULL);
        /* sleep until a job completion is signalled, then look again */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
    }

    if (nextJob != NULL) {
        /* the returned job must be exactly the next one in read order */
        assert(nextJob->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += nextJob->usedBufferSize;
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);

    return nextJob;
}
( run in 0.640 second using v1.01-cache-2.11-cpan-49f99fa48dc )