Compress-Zstd

 view release on metacpan or  search on metacpan

ext/zstd/programs/fileio.c  view on Meta::CPAN

    U64 compressedfilesize = 0;
    ZSTD_EndDirective directive = ZSTD_e_continue;

    /* stats */
    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
    typedef enum { noChange, slower, faster } speedChange_e;
    speedChange_e speedChange = noChange;
    unsigned flushWaiting = 0;
    unsigned inputPresented = 0;
    unsigned inputBlocked = 0;
    unsigned lastJobID = 0;

    DISPLAYLEVEL(6, "compression using zstd format \n");

    /* init */
    if (fileSize != UTIL_FILESIZE_UNKNOWN) {
        CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
    }
    (void)srcFileName;

ext/zstd/programs/fileio.c  view on Meta::CPAN

        while ((inBuff.pos != inBuff.size)   /* input buffer must be entirely ingested */
            || (directive == ZSTD_e_end && stillToFlush != 0) ) {

            size_t const oldIPos = inBuff.pos;
            ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
            size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
            CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));

            /* count stats */
            inputPresented++;
            if (oldIPos == inBuff.pos) inputBlocked++;  /* input buffer is full and can't take any more : input speed is faster than consumption rate */
            if (!toFlushNow) flushWaiting = 1;

            /* Write compressed stream */
            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                            (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
            if (outBuff.pos) {
                size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
                if (sizeCheck != outBuff.pos)
                    EXM_THROW(25, "Write error : %s (cannot write compressed block)",
                                    strerror(errno));

ext/zstd/programs/fileio.c  view on Meta::CPAN

                if (prefs->adaptiveMode) {

                    /* check output speed */
                    if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */

                        unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
                        unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
                        assert(zfp.produced >= previous_zfp_update.produced);
                        assert(prefs->nbWorkers >= 1);

                        /* test if compression is blocked,
                         * either because output is slow and all buffers are full,
                         * or because input is slow and no job can start until at least one buffer has been filled.
                         * note : the start-up phase is excluded, since currentJobID > 1 */
                        if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
                          && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
                          ) {
                            DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
                            speedChange = slower;
                        }

ext/zstd/programs/fileio.c  view on Meta::CPAN

                        }
                        flushWaiting = 0;
                    }

                    /* course-correct only if at least one new job has completed */
                    if (zfp.currentJobID > lastJobID) {
                        DISPLAYLEVEL(6, "compression level adaptation check \n")

                        /* check input speed */
                        if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
                            if (inputBlocked <= 0) {
                                DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
                                speedChange = slower;
                            } else if (speedChange == noChange) {
                                unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
                                unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
                                unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
                                unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
                                previous_zfp_correction = zfp;
                                assert(inputPresented > 0);
                                DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
                                                inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
                                                (unsigned)newlyIngested, (unsigned)newlyConsumed,
                                                (unsigned)newlyFlushed, (unsigned)newlyProduced);
                                if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
                                  && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
                                  && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
                                ) {
                                    DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
                                                    newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
                                    speedChange = faster;
                                }
                            }
                            inputBlocked = 0;
                            inputPresented = 0;
                        }

                        if (speedChange == slower) {
                            DISPLAYLEVEL(6, "slower speed , higher compression \n")
                            compressionLevel ++;
                            if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
                            if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
                            compressionLevel += (compressionLevel == 0);   /* skip 0 */
                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);



( run in 0.464 second using v1.01-cache-2.11-cpan-49f99fa48dc )