Compress-Stream-Zstd


ext/zstd/programs/fileio.c

    ZSTD_EndDirective directive = ZSTD_e_continue;
    U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;

    /* stats */
    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
    typedef enum { noChange, slower, faster } speedChange_e;
    speedChange_e speedChange = noChange;
    unsigned flushWaiting = 0;
    unsigned inputPresented = 0;
    unsigned inputBlocked = 0;
    unsigned lastJobID = 0;
    UTIL_time_t lastAdaptTime = UTIL_getTime();
    U64 const adaptEveryMicro = REFRESH_RATE;

    UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);

    DISPLAYLEVEL(6, "compression using zstd format \n");

    /* init */
    if (fileSize != UTIL_FILESIZE_UNKNOWN) {

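The excerpt above sets up the state for zstd's adaptive streaming compression: a ZSTD_e_continue directive, an unknown pledged source size, frame-progression snapshots for the adaptation heuristics, and counters for blocked/presented input. For reference, a minimal sketch of the corresponding context setup using only the public libzstd API (this is not the fileio.c code itself; init_streaming_cctx is a name invented here):

#include <zstd.h>

/* Minimal sketch, assuming libzstd >= 1.4: create a streaming context,
 * set the starting level, and pledge the source size when it is known,
 * which lets zstd record it in the frame header. */
static ZSTD_CCtx* init_streaming_cctx(int level, unsigned long long srcSize)
{
    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    if (cctx == NULL) return NULL;
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);
    if (srcSize != ZSTD_CONTENTSIZE_UNKNOWN)
        ZSTD_CCtx_setPledgedSrcSize(cctx, srcSize);
    return cctx;
}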
ext/zstd/programs/fileio.c

            || (directive == ZSTD_e_end && stillToFlush != 0) ) {

            size_t const oldIPos = inBuff.pos;
            ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
            size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
            CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
            AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);

            /* count stats */
            inputPresented++;
            if (oldIPos == inBuff.pos) inputBlocked++;  /* input buffer is full and can't take any more: input arrives faster than it is consumed */
            if (!toFlushNow) flushWaiting = 1;

            /* Write compressed stream */
            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                         (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
            if (outBuff.pos) {
                writeJob->usedBufferSize = outBuff.pos;
                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                compressedfilesize += outBuff.pos;
            }

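Stripped of the read/write pools and the stats counters, the driving pattern above reduces to the standard ZSTD_compressStream2() loop: feed chunks with ZSTD_e_continue, switch to ZSTD_e_end at end of input, and keep calling until the return value (bytes still to flush) reaches zero, which is the "stillToFlush != 0" condition in the excerpt. A self-contained sketch (compress_stdio is a name invented here; a short read is treated as EOF):

#include <zstd.h>
#include <stdio.h>
#include <stdlib.h>

static int compress_stdio(FILE* fin, FILE* fout, ZSTD_CCtx* cctx)
{
    size_t const inSize  = ZSTD_CStreamInSize();   /* recommended buffer sizes */
    size_t const outSize = ZSTD_CStreamOutSize();
    void* const inBuf  = malloc(inSize);
    void* const outBuf = malloc(outSize);
    if (!inBuf || !outBuf) { free(inBuf); free(outBuf); return -1; }

    for (;;) {
        size_t const readSize = fread(inBuf, 1, inSize, fin);
        ZSTD_EndDirective const directive =
            (readSize < inSize) ? ZSTD_e_end : ZSTD_e_continue;
        ZSTD_inBuffer input = { inBuf, readSize, 0 };
        size_t stillToFlush;
        do {
            ZSTD_outBuffer output = { outBuf, outSize, 0 };
            stillToFlush = ZSTD_compressStream2(cctx, &output, &input, directive);
            if (ZSTD_isError(stillToFlush)) { free(inBuf); free(outBuf); return -1; }
            fwrite(outBuf, 1, output.pos, fout);
            /* e_continue: repeat until this chunk is consumed;
             * e_end: repeat until nothing remains to flush */
        } while (directive == ZSTD_e_end ? (stillToFlush != 0)
                                         : (input.pos < input.size));
        if (directive == ZSTD_e_end) break;
    }
    free(inBuf); free(outBuf);
    return 0;
}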
ext/zstd/programs/fileio.c

                lastAdaptTime = UTIL_getTime();

                /* check output speed */
                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */

                    unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
                    unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
                    assert(zfp.produced >= previous_zfp_update.produced);
                    assert(prefs->nbWorkers >= 1);

                    /* test if compression is blocked
                        * either because output is slow and all buffers are full
                        * or because input is slow and no job can start while waiting for at least one buffer to be filled.
                        * note : exclude starting part, since currentJobID > 1 */
                    if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
                        && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
                        ) {
                        DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n");
                        speedChange = slower;
                    }

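The zfp snapshot used above comes from ZSTD_getFrameProgression(), which lives in zstd's experimental API (fileio.c compiles with ZSTD_STATIC_LINKING_ONLY for this reason). A hedged sketch of the same stall test in isolation (compression_looks_stalled is a name invented here):

#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_getFrameProgression() is experimental API */
#include <zstd.h>

static int compression_looks_stalled(const ZSTD_CCtx* cctx,
                                     const ZSTD_frameProgression* prev)
{
    ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(cctx);
    /* Same test as the excerpt: nothing newly consumed and no worker
     * active means every buffer is full or input has dried up. */
    return (zfp.consumed == prev->consumed) && (zfp.nbActiveWorkers == 0);
}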
ext/zstd/programs/fileio.c

                    }
                    flushWaiting = 0;
                }

                /* course-correct only if at least one new job has completed */
                if (zfp.currentJobID > lastJobID) {
                    DISPLAYLEVEL(6, "compression level adaptation check \n");

                    /* check input speed */
                    if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
                        if (inputBlocked <= 0) {
                            DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
                            speedChange = slower;
                        } else if (speedChange == noChange) {
                            unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
                            unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
                            unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
                            unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
                            previous_zfp_correction = zfp;
                            assert(inputPresented > 0);
                            DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
                                            inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
                                            (unsigned)newlyIngested, (unsigned)newlyConsumed,
                                            (unsigned)newlyFlushed, (unsigned)newlyProduced);
                            if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers are full : compression or output too slow */
                                && (newlyFlushed * 33 / 32 > newlyProduced)  /* flushing keeps pace with production */
                                && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed is as fast as or faster than compression speed */
                            ) {
                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
                                                newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
                                speedChange = faster;
                            }
                        }
                        inputBlocked = 0;
                        inputPresented = 0;
                    }

                    if (speedChange == slower) {
                        DISPLAYLEVEL(6, "slower speed, higher compression \n");
                        compressionLevel++;
                        if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
                        if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
                        compressionLevel += (compressionLevel == 0);   /* skip level 0 */
                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);

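The adaptation step above (and its elided "faster" counterpart) amounts to: move the level one step, clamp it to the valid range and the user's bounds, skip level 0 (which zstd reserves to mean "default"), and apply the new level mid-stream; with multithreaded compression the change takes effect at the next job boundary. A hedged sketch generalizing both directions (apply_speed_change and the minAdaptLevel/maxAdaptLevel parameters are illustrative stand-ins for the prefs fields):

#include <zstd.h>

static void apply_speed_change(ZSTD_CCtx* cctx, int* level, int direction,
                               int minAdaptLevel, int maxAdaptLevel)
{
    *level += direction;                          /* +1 = slower/stronger, -1 = faster/lighter */
    if (*level > ZSTD_maxCLevel()) *level = ZSTD_maxCLevel();
    if (*level > maxAdaptLevel)    *level = maxAdaptLevel;
    if (*level < minAdaptLevel)    *level = minAdaptLevel;
    if (*level == 0) *level += direction;         /* skip level 0, matching the excerpt */
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, *level);
}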
ext/zstd/programs/fileio_asyncio.c

    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if (AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

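The completion callback above is the producer half of a classic mutex/condition-variable queue: append under the lock, then signal a consumer that may be blocked in AIO_ReadPool_getNextCompletedJob(). The same pattern with plain pthreads (ZSTD_pthread_* are zstd's thin portability wrappers; CompletionQueue and its fixed capacity are illustrative assumptions, not fileio_asyncio.c types):

#include <pthread.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    void*           completed[16];
    int             count;
} CompletionQueue;

static void completion_enqueue(CompletionQueue* q, void* job)
{
    pthread_mutex_lock(&q->mutex);
    q->completed[q->count++] = job;   /* caller guarantees count < 16 */
    pthread_cond_signal(&q->cond);    /* wake a consumer blocked on the queue */
    pthread_mutex_unlock(&q->mutex);
}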
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Searches the completed jobs for one whose offset matches waitingOnOffset and returns it;
 * returns NULL if no such job is found.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single-threaded reader implementation (in that case we could expect
     * reads to complete in order), this approach was chosen because it better fits other asyncio
     * interfaces (such as io_uring) that make no guarantees about completion order. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];

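The last line of the excerpt is the O(1) "swap with last" removal for an unordered array: the found slot is overwritten with the final element and the count shrinks by one. Order is not preserved, which is harmless here because jobs are matched by offset, not by position. The generic form (swap_remove is a name invented here):

static void* swap_remove(void** arr, int* count, int i)
{
    void* const removed = arr[i];
    arr[i] = arr[--*count];   /* move the last element into the hole */
    return removed;
}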
ext/zstd/programs/fileio_asyncio.c

    return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns the completed IOJob_t for the next read in line, based on waitingOnOffset, and advances waitingOnOffset.
 * May block until that job completes. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we haven't found the job matching the next read and reads are still in flight, keep waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if (job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}

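AIO_ReadPool_getNextCompletedJob() is the consumer half of the queue sketched after the enqueue callback above: take the mutex, re-check the predicate after every wakeup (pthread_cond_wait can wake spuriously), and only then pop. A simplified counterpart, reusing the illustrative CompletionQueue, that takes any completed job rather than matching waitingOnOffset:

static void* completion_dequeue(CompletionQueue* q)
{
    void* job;
    pthread_mutex_lock(&q->mutex);
    while (q->count == 0)
        pthread_cond_wait(&q->cond, &q->mutex);   /* atomically releases the mutex while blocked */
    job = q->completed[--q->count];
    pthread_mutex_unlock(&q->mutex);
    return job;
}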
ppport.h

get_and_check_backslash_N_name_wrapper|5.029009||Viu
get_ANYOF_cp_list_for_ssc|5.019005||Viu
get_ANYOFM_contents|5.027009||Viu
GETATARGET|5.003007||Viu
get_aux_mg|5.011000||Viu
get_av|5.006000|5.003007|p
getc|5.003007||Viu
get_c_backtrace|5.021001||Vi
get_c_backtrace_dump|5.021001||V
get_context|5.006000|5.006000|nu
getc_unlocked|5.003007||Viu
get_cv|5.006000|5.003007|p
get_cvn_flags|5.009005|5.003007|p
get_cvs|5.011000|5.003007|p
getcwd_sv|5.007002|5.007002|
get_db_sub|||iu
get_debug_opts|5.008001||Viu
get_deprecated_property_msg|5.031011||cVniu
getegid|5.005000||Viu
getenv|5.005000||Viu
getenv_len|5.006000||Viu

ppport.h

PERL_MAGIC_vec|5.007002|5.003007|p
PERL_MAGIC_vstring|5.008001|5.003007|p
PERL_MAGIC_VTABLE_MASK|5.015000||Viu
PERL_MALLOC_CTL_H|5.027001||Viu
Perl_malloc_good_size|5.010001||Viu
PERL_MALLOC_WRAP|5.009002|5.009002|Vn
PerlMem_calloc|5.006000||Viu
PerlMem_free|5.005000||Viu
PerlMem_free_lock|5.006000||Viu
PerlMem_get_lock|5.006000||Viu
PerlMem_is_locked|5.006000||Viu
PerlMem_malloc|5.005000||Viu
PERL_MEMORY_DEBUG_HEADER_SIZE|5.019009||Viu
PerlMemParse_calloc|5.006000||Viu
PerlMemParse_free|5.006000||Viu
PerlMemParse_free_lock|5.006000||Viu
PerlMemParse_get_lock|5.006000||Viu
PerlMemParse_is_locked|5.006000||Viu
PerlMemParse_malloc|5.006000||Viu
PerlMemParse_realloc|5.006000||Viu
PerlMem_realloc|5.005000||Viu
PerlMemShared_calloc|5.006000||Viu
PerlMemShared_free|5.006000||Viu
PerlMemShared_free_lock|5.006000||Viu
PerlMemShared_get_lock|5.006000||Viu
PerlMemShared_is_locked|5.006000||Viu
PerlMemShared_malloc|5.006000||Viu
PerlMemShared_realloc|5.006000||Viu
PERL_MG_UFUNC|5.007001||Viu
Perl_modf|5.006000|5.006000|n
PERL_MULTICONCAT_HEADER_SIZE|5.027006||Viu
PERL_MULTICONCAT_IX_LENGTHS|5.027006||Viu
PERL_MULTICONCAT_IX_NARGS|5.027006||Viu
PERL_MULTICONCAT_IX_PLAIN_LEN|5.027006||Viu
PERL_MULTICONCAT_IX_PLAIN_PV|5.027006||Viu
PERL_MULTICONCAT_IX_UTF8_LEN|5.027006||Viu

ppport.h

PUSHSTACKi|5.005000||Viu
PUSHSTACK_INIT_HWM|5.027002||Viu
PUSHTARG|5.003007||Viu
PUSHu|5.004000|5.003007|p
PUTBACK|5.003007|5.003007|
putc|5.003007||Viu
put_charclass_bitmap_innards|5.021004||Viu
put_charclass_bitmap_innards_common|5.023008||Viu
put_charclass_bitmap_innards_invlist|5.023008||Viu
put_code_point|5.021004||Viu
putc_unlocked|5.003007||Viu
putenv|5.005000||Viu
put_range|5.019009||Viu
putw|5.003007||Viu
pv_display|5.006000|5.003007|p
pv_escape|5.009004|5.003007|p
pv_pretty|5.009004|5.003007|p
pv_uni_display|5.007003|5.007003|
pWARN_ALL|5.006000||Viu
pWARN_NONE|5.006000||Viu
pWARN_STD|5.006000||Viu


