Compress-Stream-Zstd

 view release on metacpan or  search on metacpan

ext/zstd/programs/fileio.c  view on Meta::CPAN

            if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
                DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%",
                                (unsigned)(inFileSize>>20),
                                (double)outFileSize/(double)inFileSize*100)
            } else {
                DISPLAYUPDATE_PROGRESS("\rRead : %u / %u MB ==> %.2f%%",
                                (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
                                (double)outFileSize/(double)inFileSize*100);
            }

            /* Write Block */
            writeJob->usedBufferSize = outSize;
            AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);

            /* Read next block */
            AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
            inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
        }

        /* End of Stream mark */
        headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
        if (LZ4F_isError(headerSize))
            EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
                        srcFileName, LZ4F_getErrorName(headerSize));

        writeJob->usedBufferSize = headerSize;
        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
        outFileSize += headerSize;
    }

    *readsize = inFileSize;
    LZ4F_freeCompressionContext(ctx);
    AIO_WritePool_releaseIoJob(writeJob);
    AIO_WritePool_sparseWriteEnd(ress->writeCtx);

    return outFileSize;
}
#endif

static unsigned long long
FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                      FIO_prefs_t* const prefs,
                      const cRess_t* ressPtr,
                      const char* srcFileName, U64 fileSize,
                      int compressionLevel, U64* readsize)
{
    cRess_t const ress = *ressPtr;
    IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);

    U64 compressedfilesize = 0;
    ZSTD_EndDirective directive = ZSTD_e_continue;
    U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;

    /* stats */
    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
    typedef enum { noChange, slower, faster } speedChange_e;
    speedChange_e speedChange = noChange;
    unsigned flushWaiting = 0;
    unsigned inputPresented = 0;
    unsigned inputBlocked = 0;
    unsigned lastJobID = 0;
    UTIL_time_t lastAdaptTime = UTIL_getTime();
    U64 const adaptEveryMicro = REFRESH_RATE;

    UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);

    DISPLAYLEVEL(6, "compression using zstd format \n");

    /* init */
    if (fileSize != UTIL_FILESIZE_UNKNOWN) {
        pledgedSrcSize = fileSize;
        CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
    } else if (prefs->streamSrcSize > 0) {
      /* unknown source size; use the declared stream size */
      pledgedSrcSize = prefs->streamSrcSize;
      CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) );
    }

    {
        int windowLog;
        UTIL_HumanReadableSize_t windowSize;
        CHECK(ZSTD_CCtx_getParameter(ress.cctx, ZSTD_c_windowLog, &windowLog));
        if (windowLog == 0) {
            if (prefs->ldmFlag) {
                /* If long mode is set without a window size libzstd will set this size internally */
                windowLog = ZSTD_WINDOWLOG_LIMIT_DEFAULT;
            } else {
                const ZSTD_compressionParameters cParams = ZSTD_getCParams(compressionLevel, fileSize, 0);
                windowLog = (int)cParams.windowLog;
            }
        }
        windowSize = UTIL_makeHumanReadableSize(MAX(1ULL, MIN(1ULL << windowLog, pledgedSrcSize)));
        DISPLAYLEVEL(4, "Decompression will require %.*f%s of memory\n", windowSize.precision, windowSize.value, windowSize.suffix);
    }
    (void)srcFileName;

    /* Main compression loop */
    do {
        size_t stillToFlush;
        /* Fill input Buffer */
        size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
        ZSTD_inBuffer inBuff = setInBuffer( ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 );
        DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
        *readsize += inSize;

        if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
            directive = ZSTD_e_end;

        stillToFlush = 1;
        while ((inBuff.pos != inBuff.size)   /* input buffer must be entirely ingested */
            || (directive == ZSTD_e_end && stillToFlush != 0) ) {

            size_t const oldIPos = inBuff.pos;
            ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
            size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
            CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
            AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);

            /* count stats */
            inputPresented++;
            if (oldIPos == inBuff.pos) inputBlocked++;  /* input buffer is full and can't take any more : input speed is faster than consumption rate */
            if (!toFlushNow) flushWaiting = 1;

            /* Write compressed stream */
            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                         (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
            if (outBuff.pos) {
                writeJob->usedBufferSize = outBuff.pos;
                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                compressedfilesize += outBuff.pos;
            }

            /* adaptive mode : statistics measurement and speed correction */
            if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
                ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);

                lastAdaptTime = UTIL_getTime();

                /* check output speed */
                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */

                    unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
                    unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
                    assert(zfp.produced >= previous_zfp_update.produced);
                    assert(prefs->nbWorkers >= 1);

                    /* test if compression is blocked
                        * either because output is slow and all buffers are full
                        * or because input is slow and no job can start while waiting for at least one buffer to be filled.
                        * note : exclude starting part, since currentJobID > 1 */
                    if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
                        && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
                        ) {
                        DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
                        speedChange = slower;
                    }

                    previous_zfp_update = zfp;

                    if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
                        && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
                        ) {
                        DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
                        speedChange = slower;
                    }
                    flushWaiting = 0;
                }

                /* course correct only if there is at least one new job completed */
                if (zfp.currentJobID > lastJobID) {
                    DISPLAYLEVEL(6, "compression level adaptation check \n")

                    /* check input speed */
                    if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
                        if (inputBlocked <= 0) {
                            DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
                            speedChange = slower;
                        } else if (speedChange == noChange) {
                            unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
                            unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
                            unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
                            unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
                            previous_zfp_correction = zfp;
                            assert(inputPresented > 0);
                            DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
                                            inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
                                            (unsigned)newlyIngested, (unsigned)newlyConsumed,
                                            (unsigned)newlyFlushed, (unsigned)newlyProduced);
                            if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
                                && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
                                && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
                            ) {
                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
                                                newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
                                speedChange = faster;
                            }
                        }
                        inputBlocked = 0;
                        inputPresented = 0;
                    }

                    if (speedChange == slower) {
                        DISPLAYLEVEL(6, "slower speed , higher compression \n")
                        compressionLevel ++;
                        if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
                        if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
                        compressionLevel += (compressionLevel == 0);   /* skip 0 */
                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
                    }
                    if (speedChange == faster) {
                        DISPLAYLEVEL(6, "faster speed , lighter compression \n")
                        compressionLevel --;
                        if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
                        compressionLevel -= (compressionLevel == 0);   /* skip 0 */
                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
                    }
                    speedChange = noChange;

                    lastJobID = zfp.currentJobID;
                }  /* if (zfp.currentJobID > lastJobID) */
            } /* if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) */

            /* display notification */
            if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
                ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
                UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
                UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
                UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced);

                DELAY_NEXT_UPDATE();

                /* display progress notifications */
                DISPLAY_PROGRESS("\r%79s\r", "");    /* Clear out the current displayed line */
                if (g_display_prefs.displayLevel >= 3) {
                    /* Verbose progress update */
                    DISPLAY_PROGRESS(
                            "(L%i) Buffered:%5.*f%s - Consumed:%5.*f%s - Compressed:%5.*f%s => %.2f%% ",
                            compressionLevel,
                            buffered_hrs.precision, buffered_hrs.value, buffered_hrs.suffix,
                            consumed_hrs.precision, consumed_hrs.value, consumed_hrs.suffix,
                            produced_hrs.precision, produced_hrs.value, produced_hrs.suffix,
                            cShare );
                } else {
                    /* Require level 2 or forcibly displayed progress counter for summarized updates */
                    if (fCtx->nbFilesTotal > 1) {
                        size_t srcFileNameSize = strlen(srcFileName);
                        /* Ensure that the string we print is roughly the same size each time */
                        if (srcFileNameSize > 18) {
                            const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
                            DISPLAY_PROGRESS("Compress: %u/%u files. Current: ...%s ",
                                        fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName);
                        } else {
                            DISPLAY_PROGRESS("Compress: %u/%u files. Current: %*s ",
                                        fCtx->currFileIdx+1, fCtx->nbFilesTotal, (int)(18-srcFileNameSize), srcFileName);
                        }
                    }
                    DISPLAY_PROGRESS("Read:%6.*f%4s ", consumed_hrs.precision, consumed_hrs.value, consumed_hrs.suffix);



( run in 1.816 second using v1.01-cache-2.11-cpan-39bf76dae61 )