Compress-Stream-Zstd

 view release on metacpan or  search on metacpan

ext/zstd/programs/fileio.c  view on Meta::CPAN

    ZSTD_EndDirective directive = ZSTD_e_continue;
    U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;

    /* stats */
    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
    typedef enum { noChange, slower, faster } speedChange_e;
    speedChange_e speedChange = noChange;
    unsigned flushWaiting = 0;
    unsigned inputPresented = 0;
    unsigned inputBlocked = 0;
    unsigned lastJobID = 0;
    UTIL_time_t lastAdaptTime = UTIL_getTime();
    U64 const adaptEveryMicro = REFRESH_RATE;

    UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);

    DISPLAYLEVEL(6, "compression using zstd format \n");

    /* init */
    if (fileSize != UTIL_FILESIZE_UNKNOWN) {

ext/zstd/programs/fileio.c  view on Meta::CPAN

            || (directive == ZSTD_e_end && stillToFlush != 0) ) {

            size_t const oldIPos = inBuff.pos;
            ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
            size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
            CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
            AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);

            /* count stats */
            inputPresented++;
            if (oldIPos == inBuff.pos) inputBlocked++;  /* input buffer is full and can't take any more : input speed is faster than consumption rate */
            if (!toFlushNow) flushWaiting = 1;

            /* Write compressed stream */
            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                         (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
            if (outBuff.pos) {
                writeJob->usedBufferSize = outBuff.pos;
                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                compressedfilesize += outBuff.pos;
            }

ext/zstd/programs/fileio.c  view on Meta::CPAN

                lastAdaptTime = UTIL_getTime();

                /* check output speed */
                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */

                    unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
                    unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
                    assert(zfp.produced >= previous_zfp_update.produced);
                    assert(prefs->nbWorkers >= 1);

                    /* test if compression is blocked
                        * either because output is slow and all buffers are full
                        * or because input is slow and no job can start while waiting for at least one buffer to be filled.
                        * note : exclude starting part, since currentJobID > 1 */
                    if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
                        && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
                        ) {
                        DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
                        speedChange = slower;
                    }

ext/zstd/programs/fileio.c  view on Meta::CPAN

                    }
                    flushWaiting = 0;
                }

                /* course correct only if there is at least one new job completed */
                if (zfp.currentJobID > lastJobID) {
                    DISPLAYLEVEL(6, "compression level adaptation check \n")

                    /* check input speed */
                    if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
                        if (inputBlocked <= 0) {
                            DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
                            speedChange = slower;
                        } else if (speedChange == noChange) {
                            unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
                            unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
                            unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
                            unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
                            previous_zfp_correction = zfp;
                            assert(inputPresented > 0);
                            DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
                                            inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
                                            (unsigned)newlyIngested, (unsigned)newlyConsumed,
                                            (unsigned)newlyFlushed, (unsigned)newlyProduced);
                            if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
                                && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
                                && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
                            ) {
                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
                                                newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
                                speedChange = faster;
                            }
                        }
                        inputBlocked = 0;
                        inputPresented = 0;
                    }

                    if (speedChange == slower) {
                        DISPLAYLEVEL(6, "slower speed , higher compression \n")
                        compressionLevel ++;
                        if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
                        if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
                        compressionLevel += (compressionLevel == 0);   /* skip 0 */
                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);

ext/zstd/programs/fileio_asyncio.c  view on Meta::CPAN

    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
 * if job wasn't found returns NULL.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];

ext/zstd/programs/fileio_asyncio.c  view on Meta::CPAN

    return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
}

/* AIO_ReadPool_getNextCompletedJob:
 * Fetches the completed read job whose offset equals ctx->waitingOnOffset,
 * then moves waitingOnOffset forward by that job's usedBufferSize.
 * While no such job is available and reads are still in flight, this call
 * blocks on jobCompletedCond.
 * @return : the matching job, or NULL when there is no matching completed job
 *           and no read is in flight. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t* nextJob;

    AIO_IOPool_lockJobsMutex(&ctx->base);

    for (;;) {
        /* the lookup is repeated after every wake-up, with the jobs mutex held */
        nextJob = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
        if (nextJob != NULL) {
            break;
        }
        if (!(AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
            break;   /* nothing pending : waiting would never be signaled */
        }
        assert(ctx->base.threadPool != NULL); /* sync mode is not expected to reach this wait */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
    }

    if (nextJob != NULL) {
        assert(nextJob->offset == ctx->waitingOnOffset);
        /* the next expected read starts right after the bytes of this one */
        ctx->waitingOnOffset += nextJob->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return nextJob;
}



( run in 0.640 second using v1.01-cache-2.11-cpan-49f99fa48dc )