Compress-Zstd

 view release on metacpan or  search on metacpan

ext/zstd/programs/fileio.c  view on Meta::CPAN

                                (double)outFileSize/inFileSize*100)
            } else {
                DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%%",
                                (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
                                (double)outFileSize/inFileSize*100);
            }

            /* Write Block */
            {   size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile);
                if (sizeCheck != outSize)
                    EXM_THROW(36, "Write error : %s", strerror(errno));
            }

            /* Read next block */
            readSize  = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
            inFileSize += readSize;
        }
        if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName);

        /* End of Stream mark */
        headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL);
        if (LZ4F_isError(headerSize))
            EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
                        srcFileName, LZ4F_getErrorName(headerSize));

        {   size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile);
            if (sizeCheck != headerSize)
                EXM_THROW(39, "Write error : %s (cannot write end of stream)",
                            strerror(errno));
        }
        outFileSize += headerSize;
    }

    *readsize = inFileSize;
    LZ4F_freeCompressionContext(ctx);

    return outFileSize;
}
#endif


/*! FIO_compressZstdFrame() :
 *  Stream-compress the content of `ressPtr->srcFile` into `ressPtr->dstFile` :
 *  the source is read with fread() in chunks of `srcBufferSize` bytes,
 *  each chunk is pushed through ZSTD_compressStream2(),
 *  and whatever lands in `dstBuffer` is written out with fwrite().
 *  At progress-update points, a progress line is displayed and,
 *  when `prefs->adaptiveMode` is set, the compression level is adjusted
 *  up or down based on frame progression statistics.
 *
 *  @param prefs : read for adaptiveMode, nbWorkers, minAdaptLevel and maxAdaptLevel.
 *  @param ressPtr : compression resources (files, buffers, cctx); copied by value into a local.
 *  @param srcFileName : not used by this function (explicitly voided).
 *  @param fileSize : size of the source, or UTIL_FILESIZE_UNKNOWN.
 *                    When known, it is pledged to the compression context,
 *                    used to detect the last chunk, and used to detect incomplete reads.
 *  @param compressionLevel : level shown in the level-3 progress line, and starting point
 *                    for adaptive adjustments. Only the local copy is modified, and it is
 *                    only applied to the context here when adaptive mode changes it.
 *  @param readsize : incremented by the number of bytes read from the source.
 *                    It is not reset here.
 *  @return : number of compressed bytes written to `dstFile`.
 *  Errors go through the CHECK / CHECK_V / EXM_THROW macros, defined elsewhere
 *  (NOTE(review): presumably they do not return to the caller - confirm).
 */
static unsigned long long
FIO_compressZstdFrame(FIO_prefs_t* const prefs,
                      const cRess_t* ressPtr,
                      const char* srcFileName, U64 fileSize,
                      int compressionLevel, U64* readsize)
{
    cRess_t const ress = *ressPtr;   /* local by-value copy of the resource bundle */
    FILE* const srcFile = ress.srcFile;
    FILE* const dstFile = ress.dstFile;
    U64 compressedfilesize = 0;      /* running total of bytes written to dstFile; this is the return value */
    ZSTD_EndDirective directive = ZSTD_e_continue;   /* switched to ZSTD_e_end once the last chunk has been read */

    /* stats */
    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };       /* snapshot taken at each output-speed check */
    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };   /* snapshot taken at each input-speed measurement */
    typedef enum { noChange, slower, faster } speedChange_e;
    speedChange_e speedChange = noChange;   /* pending decision; applied and reset in the adaptation step below */
    unsigned flushWaiting = 0;     /* set to 1 when ZSTD_toFlushNow() returned 0 right before a compression call */
    unsigned inputPresented = 0;   /* nb of ZSTD_compressStream2() calls since last reset */
    unsigned inputBlocked = 0;     /* nb of those calls which did not advance inBuff.pos */
    unsigned lastJobID = 0;        /* zfp.currentJobID observed at the last adaptation step */

    DISPLAYLEVEL(6, "compression using zstd format \n");

    /* init */
    if (fileSize != UTIL_FILESIZE_UNKNOWN) {
        CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
    }
    (void)srcFileName;   /* unused in this function */

    /* Main compression loop */
    do {
        size_t stillToFlush;
        /* Fill input Buffer */
        size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
        ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
        DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
        *readsize += inSize;

        /* last chunk : either nothing could be read (a read error is only checked after the loop),
         * or the announced file size has been reached */
        if ((inSize == 0) || (*readsize == fileSize))
            directive = ZSTD_e_end;

        /* non-zero initial value : guarantees at least one compression call
         * when directive == ZSTD_e_end, even if inBuff is empty */
        stillToFlush = 1;
        while ((inBuff.pos != inBuff.size)   /* input buffer must be entirely ingested */
            || (directive == ZSTD_e_end && stillToFlush != 0) ) {

            size_t const oldIPos = inBuff.pos;
            ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
            size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);   /* sampled before the compression call */
            /* CHECK_V is defined elsewhere.
             * NOTE(review): presumably it stores the call's result into stillToFlush
             * and errors out on a zstd error code - confirm against the macro definition */
            CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));

            /* count stats */
            inputPresented++;
            if (oldIPos == inBuff.pos) inputBlocked++;  /* input buffer is full and can't take any more : input speed is faster than consumption rate */
            if (!toFlushNow) flushWaiting = 1;

            /* Write compressed stream */
            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                            (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
            if (outBuff.pos) {
                size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
                if (sizeCheck != outBuff.pos)
                    EXM_THROW(25, "Write error : %s (cannot write compressed block)",
                                    strerror(errno));
                compressedfilesize += outBuff.pos;
            }

            /* display notification; and adapt compression level */
            /* READY_FOR_UPDATE() is defined elsewhere; it gates both the display and the adaptation below */
            if (READY_FOR_UPDATE()) {
                ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                /* produced / consumed, as a percentage */
                double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;

                /* display progress notifications */
                if (g_display_prefs.displayLevel >= 3) {
                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
                                compressionLevel,
                                (unsigned)((zfp.ingested - zfp.consumed) >> 20),
                                (unsigned)(zfp.consumed >> 20),
                                (unsigned)(zfp.produced >> 20),
                                cShare );
                } else {   /* summarized notifications if == 2; */
                    DISPLAYLEVEL(2, "\rRead : %u ", (unsigned)(zfp.consumed >> 20));
                    if (fileSize != UTIL_FILESIZE_UNKNOWN)
                        DISPLAYLEVEL(2, "/ %u ", (unsigned)(fileSize >> 20));
                    DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
                    DELAY_NEXT_UPDATE();
                }

                /* adaptive mode : statistics measurement and speed correction */
                if (prefs->adaptiveMode) {

                    /* check output speed */
                    if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */

                        /* deltas since the previous output-speed check */
                        unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
                        unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
                        assert(zfp.produced >= previous_zfp_update.produced);
                        assert(prefs->nbWorkers >= 1);

                        /* test if compression is blocked
                         * either because output is slow and all buffers are full
                         * or because input is slow and no job can start while waiting for at least one buffer to be filled.
                         * note : exclude starting part, since currentJobID > 1 */
                        if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
                          && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
                          ) {
                            DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
                            speedChange = slower;
                        }

                        /* snapshot is refreshed here; the test below relies on the deltas computed above */
                        previous_zfp_update = zfp;

                        if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
                          && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
                          ) {
                            DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
                            speedChange = slower;
                        }
                        flushWaiting = 0;   /* restart observation for the next measurement period */
                    }

                    /* course correct only if there is at least one new job completed */
                    if (zfp.currentJobID > lastJobID) {
                        DISPLAYLEVEL(6, "compression level adaptation check \n")

                        /* check input speed */
                        if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
                            if (inputBlocked <= 0) {   /* inputBlocked is unsigned : equivalent to == 0 */
                                DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
                                speedChange = slower;
                            } else if (speedChange == noChange) {
                                /* deltas since the previous input-speed measurement */
                                unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
                                unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
                                unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
                                unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
                                previous_zfp_correction = zfp;
                                assert(inputPresented > 0);
                                DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
                                                inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
                                                (unsigned)newlyIngested, (unsigned)newlyConsumed,
                                                (unsigned)newlyFlushed, (unsigned)newlyProduced);
                                if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
                                  && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
                                  && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
                                ) {
                                    DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
                                                    newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
                                    speedChange = faster;
                                }
                            }
                            /* restart counters for the next measurement period */
                            inputBlocked = 0;
                            inputPresented = 0;
                        }

                        /* "slower" means : spend more effort => raise the level,
                         * capped by both ZSTD_maxCLevel() and prefs->maxAdaptLevel.
                         * The result of ZSTD_CCtx_setParameter() is not checked. */
                        if (speedChange == slower) {
                            DISPLAYLEVEL(6, "slower speed , higher compression \n")
                            compressionLevel ++;
                            if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
                            if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
                            compressionLevel += (compressionLevel == 0);   /* skip 0 */
                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
                        }
                        /* "faster" means : spend less effort => lower the level,
                         * floored by prefs->minAdaptLevel.
                         * The result of ZSTD_CCtx_setParameter() is not checked. */
                        if (speedChange == faster) {
                            DISPLAYLEVEL(6, "faster speed , lighter compression \n")
                            compressionLevel --;
                            if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
                            compressionLevel -= (compressionLevel == 0);   /* skip 0 */
                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
                        }
                        speedChange = noChange;   /* decision consumed */

                        lastJobID = zfp.currentJobID;
                    }  /* if (zfp.currentJobID > lastJobID) */
                }  /* if (prefs->adaptiveMode) */
            }  /* if (READY_FOR_UPDATE()) */
        }  /* while ((inBuff.pos != inBuff.size) */
    } while (directive != ZSTD_e_end);

    /* a failed fread() returns 0, which ends the loop above; the failure is reported here */
    if (ferror(srcFile)) {
        EXM_THROW(26, "Read error : I/O error");
    }
    /* when the size was known in advance, the total read must match it exactly */
    if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
        EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
                (unsigned long long)*readsize, (unsigned long long)fileSize);
    }

    return compressedfilesize;
}

/*! FIO_compressFilename_internal() :
 *  same as FIO_compressFilename_extRess(), with `ress.dstFile` already opened.
 *  @return : 0 : compression completed correctly,
 *            1 : missing or problem opening srcFileName
 */
static int
FIO_compressFilename_internal(FIO_prefs_t* const prefs,
                              cRess_t ress,
                              const char* dstFileName, const char* srcFileName,
                              int compressionLevel)
{
    UTIL_time_t const timeStart = UTIL_getTime();
    clock_t const cpuStart = clock();
    U64 readsize = 0;
    U64 compressedfilesize = 0;
    U64 const fileSize = UTIL_getFileSize(srcFileName);
    DISPLAYLEVEL(5, "%s: %u bytes \n", srcFileName, (unsigned)fileSize);

    /* compression format selection */
    switch (prefs->compressionType) {
        default:
        case FIO_zstdCompression:



( run in 1.962 second using v1.01-cache-2.11-cpan-39bf76dae61 )