Compress-Stream-Zstd

 view release on metacpan or  search on metacpan

ext/zstd/programs/fileio_asyncio.c  view on Meta::CPAN

 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    /* The job arrives as an untyped pointer; recover it and the write pool context it refers to. */
    IOJob_t* const writeJob = (IOJob_t*) opaque;
    WritePoolCtx_t* const writeCtx = (WritePoolCtx_t*) writeJob->ctx;

    /* Write the used portion of the job's buffer. The pool's storedSkips value is
     * passed in and then replaced with whatever the call returns. */
    writeCtx->storedSkips = AIO_fwriteSparse(writeJob->file,
                                             writeJob->buffer,
                                             writeJob->usedBufferSize,
                                             writeCtx->base.prefs,
                                             writeCtx->storedSkips);

    /* The job is done with; release it. */
    AIO_IOPool_releaseIoJob(writeJob);
}

/* AIO_WritePool_create:
 * Allocates and sets and a new write pool including its included jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Releases a write pool together with its resources.
 * If a destination file is still set, it is closed first. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure all tasks are finished before the resources are freed:
     * close any file that is still attached to the pool. */
    if (AIO_WritePool_getFile(ctx)) {
        AIO_WritePool_closeFile(ctx);
    }

    AIO_IOPool_destroy(&ctx->base);

    /* No skips are expected to remain stored at this point. */
    assert(ctx->storedSkips == 0);
    free(ctx);
}

/* AIO_WritePool_setAsync:
 * Switches async mode on or off for the write pool. Intended for situations
 * where the overhead of asyncio is expected to outweigh its gains.
 * @async is passed through unchanged to the base pool. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    /* The threaded/non-threaded switch is handled by the shared base pool. */
    AIO_IOPool_setThreaded(&(ctx->base), async);
}


/* ***********************************
 *  ReadPool implementation
 *************************************/
/* Releases every job currently stored in the completed-jobs list, in list order,
 * and then marks the list as empty. */
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int idx = 0;
    while (idx < ctx->completedJobsCount) {
        AIO_IOPool_releaseIoJob((IOJob_t*) ctx->completedJobs[idx]);
        idx++;
    }
    ctx->completedJobsCount = 0;
}

/* Appends a job to its read pool's completed-jobs list while holding the jobs mutex.
 * When the thread pool is active, also signals jobCompletedCond so that a caller
 * waiting on that condition can re-check the list. */
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const readCtx = (ReadPoolCtx_t*) job->ctx;

    AIO_IOPool_lockJobsMutex(&readCtx->base);

    /* There must be room left in the completed-jobs array. */
    assert(readCtx->completedJobsCount < MAX_IO_JOBS);
    readCtx->completedJobs[readCtx->completedJobsCount] = job;
    readCtx->completedJobsCount++;

    if (AIO_IOPool_threadPoolActive(&readCtx->base)) {
        ZSTD_pthread_cond_signal(&readCtx->jobCompletedCond);
    }

    AIO_IOPool_unlockJobsMutex(&readCtx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Searches the completed jobs for the one whose offset equals waitingOnOffset.
 * On a match the job is removed from the completed list and returned;
 * otherwise NULL is returned and the list is left untouched.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    int slot = 0;
    /* Every completed job is inspected rather than assuming in-order completion.
     * A single threaded reader would complete its reads in order, so this is not
     * strictly required there, but it fits other asyncio interfaces (such as
     * io_uring) that make no promises regarding order of completion. */
    while (slot < ctx->completedJobsCount) {
        IOJob_t* const candidate = (IOJob_t*) ctx->completedJobs[slot];
        if (candidate->offset == ctx->waitingOnOffset) {
            /* Remove by moving the last entry into the vacated slot; list order is not preserved. */
            ctx->completedJobsCount -= 1;
            ctx->completedJobs[slot] = ctx->completedJobs[ctx->completedJobsCount];
            return candidate;
        }
        slot++;
    }
    return NULL;
}

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight, computed as the total
 * number of jobs minus those that are available, completed, or currently held. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    /* Jobs that are accounted for and therefore not in flight. */
    size_t notInFlight = (size_t)(ctx->base.availableJobsCount + ctx->completedJobsCount);
    if (ctx->currentJobHeld != NULL) {
        notInFlight += 1;
    }
    return (size_t)ctx->base.totalIoJobs - notInFlight;
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns the completed IOJob_t whose offset matches waitingOnOffset and advances
 * waitingOnOffset by the number of bytes that job holds.
 * Returns NULL when no matching job is completed and no reads are in flight.
 * May block while waiting for an in-flight read to complete. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t* nextJob;

    AIO_IOPool_lockJobsMutex(&ctx->base);

    for (;;) {
        nextJob = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
        if (nextJob != NULL) {
            break;
        }
        /* Nothing matching yet: keep waiting only while some read is still in flight. */
        if (AIO_ReadPool_numReadsInFlight(ctx) == 0) {
            break;
        }
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
    }

    if (nextJob != NULL) {
        assert(nextJob->offset == ctx->waitingOnOffset);
        /* The next job to wait for starts right after this job's data. */
        ctx->waitingOnOffset += nextJob->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return nextJob;
}


/* AIO_ReadPool_executeReadJob:
 * Runs a single read job synchronously; suitable as a thread pool job function.
 * Once the pool has recorded end of file, the job completes with zero bytes and
 * the file is not read. Otherwise the job's buffer is filled from its file; a
 * short read is accepted only at end of file, any other short read throws.
 * In all non-throwing cases the job ends up in the completed list. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const readJob = (IOJob_t*) opaque;
    ReadPoolCtx_t* const readCtx = (ReadPoolCtx_t*) readJob->ctx;

    if (!readCtx->reachedEof) {
        readJob->usedBufferSize = fread(readJob->buffer, 1, readJob->bufferSize, readJob->file);
        if (readJob->usedBufferSize < readJob->bufferSize) {
            /* Fewer bytes than requested: work out why. */
            if (ferror(readJob->file)) {
                EXM_THROW(37, "Read error");
            } else if (!feof(readJob->file)) {
                EXM_THROW(37, "Unexpected short read");
            } else {
                /* Remember EOF so later jobs skip reading. */
                readCtx->reachedEof = 1;
            }
        }
    } else {
        readJob->usedBufferSize = 0;
    }

    AIO_ReadPool_addJobToCompleted(readJob);
}

/* Acquires a job from the pool, tags it with the next read offset, moves the
 * pool's next read offset forward by the job's buffer size, and enqueues the job. */
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const readJob = AIO_IOPool_acquireJob(&ctx->base);

    /* This job covers the range starting at the current next-read offset. */
    readJob->offset = ctx->nextReadOffset;
    ctx->nextReadOffset = ctx->nextReadOffset + readJob->bufferSize;

    AIO_IOPool_enqueueJob(readJob);
}

/* Enqueues a read for every job that is currently available in the pool.
 *
 * Each enqueued read acquires a job, and acquired jobs no longer count as
 * available (AIO_ReadPool_numReadsInFlight treats available jobs as not in
 * flight). availableJobsCount therefore shrinks on every iteration. Comparing
 * it against a rising loop index would stop after roughly half of the jobs
 * were enqueued, so the loop instead runs until no available job remains. */
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while (ctx->base.availableJobsCount > 0) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }



( run in 2.524 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )