File-Raw

 view release on metacpan or  search on metacpan

file.c  view on Meta::CPAN

}

/*
 * $mmap->close
 *
 * Closes the mapping slot recorded under the object's "_idx" key and then
 * stores -1 under "_idx" so the object is marked as closed.
 *
 * Croaks if called with anything other than exactly one argument, or if
 * that argument is not a reference to a hash. Returns an empty list.
 *
 * A missing or negative "_idx" (for example on a second close, after the
 * first one stored -1) is treated as "already closed": file_mmap_close()
 * is not called, matching the guard used by xs_mmap_DESTROY.
 */
XS_INTERNAL(xs_mmap_close) {
    dXSARGS;
    HV *hash;
    SV **idx_sv;
    SV *closed_sv;
    IV idx;

    if (items != 1) croak("Usage: $mmap->close");

    if (!SvROK(ST(0)) || SvTYPE(SvRV(ST(0))) != SVt_PVHV) {
        croak("Invalid mmap object");
    }

    hash = (HV*)SvRV(ST(0));
    idx_sv = hv_fetch(hash, "_idx", 4, 0);
    idx = idx_sv ? SvIV(*idx_sv) : -1;

    /* Only hand real slot indices to the closer; -1 is the closed marker. */
    if (idx >= 0) {
        file_mmap_close(idx);
    }

    /* hv_store() takes ownership of the value only when it succeeds; on a
     * NULL return the caller still owns the reference and must drop it. */
    closed_sv = newSViv(-1);
    if (!hv_store(hash, "_idx", 4, closed_sv, 0)) {
        SvREFCNT_dec(closed_sv);
    }
    XSRETURN_EMPTY;
}

/*
 * Destructor for mmap objects.
 *
 * Does nothing while PL_dirty is set, or when the invocant is not a
 * reference to a hash. Otherwise reads the "_idx" entry and, if it holds a
 * non-negative value, passes it to file_mmap_close(). Always returns an
 * empty list and never croaks on a malformed invocant.
 */
XS_INTERNAL(xs_mmap_DESTROY) {
    dXSARGS;
    SV *self;
    SV **slot;

    PERL_UNUSED_VAR(items);

    if (PL_dirty) XSRETURN_EMPTY;

    self = ST(0);
    if (SvROK(self) && SvTYPE(SvRV(self)) == SVt_PVHV) {
        slot = hv_fetch((HV*)SvRV(self), "_idx", 4, 0);
        if (slot) {
            IV slot_idx = SvIV(*slot);

            /* A negative index means there is nothing to close. */
            if (slot_idx >= 0) {
                file_mmap_close(slot_idx);
            }
        }
    }

    XSRETURN_EMPTY;
}

XS_INTERNAL(xs_lines_iter) {
    dXSARGS;
    const char *path;
    IV idx;
    SV *idx_sv;

    if (items < 1)
        croak("Usage: file::lines_iter(path [, plugin => ..., key => value ...])");

    path = SvPV_nolen(ST(0));

    /* Plugin path: slurp + dispatch READ, wrap the resulting AoA in an
     * iterator that walks records in order. This is eager (whole AoA
     * held in memory) - for true streaming use each_line($p, $cb,
     * plugin => ...). The iterator interface itself is preserved so
     * code that stores the iterator handle still composes. */
    if (items > 1) {
        HV *opts;
        SV *bytes;
        SV *out;
        AV *records;
        LineIterEntry *entry;

        opts = file_plugin_build_opts(aTHX_ &ST(0), 1, items, "lines_iter");
        bytes = file_slurp_internal(aTHX_ path);
        out = file_plugin_dispatch_read(aTHX_ opts, path, bytes);
        SvREFCNT_dec((SV *)opts);
        if (!out) {
            SvREFCNT_dec(bytes);
            ST(0) = &PL_sv_undef;
            XSRETURN(1);
        }
        if (out != bytes) SvREFCNT_dec(bytes);
        if (!SvROK(out) || SvTYPE(SvRV(out)) != SVt_PVAV) {
            SvREFCNT_dec(out);
            croak("File::Raw::lines_iter: plugin must return an arrayref of records");
        }
        records = (AV *)SvRV(out);
        SvREFCNT_inc(records);   /* keep the AV alive on its own */
        SvREFCNT_dec(out);       /* drop the RV wrapper */

        idx = alloc_iter_slot();
        entry = &g_iters[idx];
        entry->fd           = -1;        /* sentinel: no file behind us */
        entry->buffer       = NULL;
        entry->buf_size     = 0;
        entry->buf_pos      = 0;
        entry->buf_len      = 0;
        entry->eof          = 0;
        entry->refcount     = 1;
        entry->path         = NULL;
        entry->records      = records;
        entry->records_idx  = 0;

        idx_sv = newSViv(idx);
        ST(0) = sv_2mortal(sv_bless(newRV_noinc(idx_sv),
                                    gv_stashpv("File::Raw::lines", GV_ADD)));
        XSRETURN(1);
    }

    idx = file_lines_open(aTHX_ path);

    if (idx < 0) {
        ST(0) = &PL_sv_undef;
        XSRETURN(1);
    }

    /* Use simple IV reference - much faster than hash */
    idx_sv = newSViv(idx);
    ST(0) = sv_2mortal(sv_bless(newRV_noinc(idx_sv), gv_stashpv("File::Raw::lines", GV_ADD)));
    XSRETURN(1);
}

XS_INTERNAL(xs_lines_iter_next) {

file.c  view on Meta::CPAN

    hv_store(g_file_callback_registry, "is_not_empty", 12, SvREFCNT_inc(sv), 0);

    /* comment / is_comment */
    Newxz(cb, 1, FileLineCallback);
    cb->predicate = pred_is_comment;
    cb->perl_callback = NULL;
    sv = newSViv(PTR2IV(cb));
    hv_store(g_file_callback_registry, "comment", 7, sv, 0);
    hv_store(g_file_callback_registry, "is_comment", 10, SvREFCNT_inc(sv), 0);

    /* not_comment / is_not_comment */
    Newxz(cb, 1, FileLineCallback);
    cb->predicate = pred_is_not_comment;
    cb->perl_callback = NULL;
    sv = newSViv(PTR2IV(cb));
    hv_store(g_file_callback_registry, "not_comment", 11, sv, 0);
    hv_store(g_file_callback_registry, "is_not_comment", 14, SvREFCNT_inc(sv), 0);
}

/*
 * Look up a line callback by name in g_file_callback_registry.
 *
 * Returns the FileLineCallback pointer stored (as an IV) under `name`, or
 * NULL when the registry has not been created, the name is not present,
 * or the stored value is not an integer.
 */
static FileLineCallback* file_get_callback(pTHX_ const char *name) {
    SV **slot;

    if (g_file_callback_registry == NULL) {
        return NULL;
    }

    slot = hv_fetch(g_file_callback_registry, name, strlen(name), 0);
    if (slot == NULL || !SvIOK(*slot)) {
        return NULL;
    }

    /* Registry values are pointers packed into IVs; unpack it here. */
    return INT2PTR(FileLineCallback*, SvIVX(*slot));
}

/* Process lines with callback - MULTICALL optimized (Perl >= 5.14 only) */
XS_INTERNAL(xs_each_line) {
    dXSARGS;
#if PERL_VERSION >= 14
    dMULTICALL;
#endif
    const char *path;
    SV *callback;
    IV idx;
    CV *block_cv;
    SV *old_defsv;
    SV *line_sv;
    LineIterEntry *entry;
    char *line_start;
    char *newline;
    size_t line_len;
    ssize_t n;
#if PERL_VERSION >= 14
    U8 gimme = G_VOID;
#endif

    if (items < 2)
        croak("Usage: file::each_line(path, callback [, plugin => ..., key => value ...])");

    path = SvPV_nolen(ST(0));
    callback = ST(1);

    if (!SvROK(callback) || SvTYPE(SvRV(callback)) != SVt_PVCV) {
        croak("Second argument must be a code reference");
    }

    /* Plugin path: route through streaming dispatch. The plugin's
     * stream fn owns the record emission and calls back to `callback`
     * per record (typically once for each parsed CSV row, etc.). */
    if (items > 2) {
        HV *opts = file_plugin_build_opts(aTHX_ &ST(0), 2, items, "each_line");
        (void)file_plugin_dispatch_stream(aTHX_ opts, path, callback);
        SvREFCNT_dec((SV *)opts);
        XSRETURN_EMPTY;
    }

    block_cv = (CV*)SvRV(callback);
    idx = file_lines_open(aTHX_ path);
    if (idx < 0) {
        XSRETURN_EMPTY;
    }

    entry = &g_iters[idx];

    old_defsv = DEFSV;
    line_sv = newSV(256);
    DEFSV = line_sv;

#if PERL_VERSION >= 14
    PUSH_MULTICALL(block_cv);
#endif

    while (1) {
        /* Look for newline in current buffer */
        if (entry->buf_pos < entry->buf_len) {
            line_start = entry->buffer + entry->buf_pos;
            newline = memchr(line_start, '\n', entry->buf_len - entry->buf_pos);

            if (newline) {
                line_len = newline - line_start;
                sv_setpvn(line_sv, line_start, line_len);
                entry->buf_pos += line_len + 1;
#if PERL_VERSION >= 14
                MULTICALL;
#else
                { dSP; PUSHMARK(SP); call_sv((SV*)block_cv, G_VOID|G_DISCARD); }
#endif
                continue;
            }
        }

        /* No newline found, need more data */
        if (entry->eof) {
            /* Return remaining data if any */
            if (entry->buf_pos < entry->buf_len) {
                line_len = entry->buf_len - entry->buf_pos;
                sv_setpvn(line_sv, entry->buffer + entry->buf_pos, line_len);
                entry->buf_pos = entry->buf_len;
#if PERL_VERSION >= 14
                MULTICALL;
#else
                { dSP; PUSHMARK(SP); call_sv((SV*)block_cv, G_VOID|G_DISCARD); }
#endif
            }
            break;
        }

file.c  view on Meta::CPAN

            if (rn > 0) {
                rv = POPs;
                av_push(out, SvREFCNT_inc(rv));
            }
            PUTBACK;
        }
        SvREFCNT_dec(holder);
        ST(0) = sv_2mortal(newRV_noinc((SV *)out));
        XSRETURN(1);
    }

    idx = file_lines_open(aTHX_ path);
    if (idx < 0) {
        ST(0) = sv_2mortal(newRV_noinc((SV*)result));
        XSRETURN(1);
    }

    /* Call Perl callback */
    {
        while ((line = file_lines_next(aTHX_ idx)) != &PL_sv_undef) {
            dSP;
            IV count;
            SV *result_sv;
            PUSHMARK(SP);
            XPUSHs(sv_2mortal(line));
            PUTBACK;
            count = call_sv(callback, G_SCALAR);
            SPAGAIN;
            if (count > 0) {
                result_sv = POPs;
                av_push(result, SvREFCNT_inc(result_sv));
            }
            PUTBACK;
        }
    }

    file_lines_close(idx);
    ST(0) = sv_2mortal(newRV_noinc((SV*)result));
    XSRETURN(1);
}

/* ============================================
   Perl bridge for the plugin API.

   Perl plugins are registered as a hashref of phase coderefs:

       File::Raw::register_plugin('csv', {
           read   => sub { my ($path, $bytes,  $opts) = @_; ... },
           write  => sub { my ($path, $rows,   $opts) = @_; ... },
           record => sub { my ($path, $record, $opts) = @_; ... },
       });

   The bridge allocates a PerlPluginBridge holding the coderef SVs plus
   a FilePlugin block whose function pointers are static C thunks. The
   thunks recover the bridge from FilePluginContext::plugin_state and
   call the appropriate coderef. The bridge is pinned in
   g_perl_plugins so we can free it on unregister.

   The 'stream' phase is intentionally not supported from Perl: a Perl
   stream plugin would be invoked once per chunk by file.c's read loop,
   and the per-call call_sv overhead defeats the point of streaming.
   Perl plugins that need record-by-record callbacks should implement
   the 'record' phase instead - File::Raw drives the iteration.
   ============================================ */

/* State for one Perl-defined plugin: the Perl coderefs plus the FilePlugin
 * block handed to the plugin machinery. Released as a unit by
 * perl_plugin_bridge_free(). */
typedef struct PerlPluginBridge {
    char        *name;     /* strdup'd; pointer is stored in plugin.name; released with Safefree() */
    SV          *read_cv;   /* coderef invoked by perl_plugin_thunk_read; NULL-checked on free */
    SV          *write_cv;  /* presumably the 'write' phase coderef; NULL-checked on free */
    SV          *record_cv; /* presumably the 'record' phase coderef; NULL-checked on free */
    FilePlugin   plugin;    /* plugin block whose function pointers are the static C thunks */
} PerlPluginBridge;

/* Hash that pins PerlPluginBridge allocations so they can be freed on
 * unregister. Starts out NULL.
 * NOTE(review): key/value layout is presumably plugin name => IV-wrapped
 * bridge pointer - confirm against the register/unregister code. */
static HV *g_perl_plugins = NULL;

/*
 * Release a PerlPluginBridge and everything it owns.
 *
 * Drops one reference on each non-NULL phase coderef (read, write, record,
 * in that order), frees the name string if present, then frees the bridge
 * itself. A NULL bridge is accepted and ignored.
 */
static void perl_plugin_bridge_free(pTHX_ PerlPluginBridge *b) {
    SV **phase[3];
    int i;

    if (b == NULL) {
        return;
    }

    phase[0] = &b->read_cv;
    phase[1] = &b->write_cv;
    phase[2] = &b->record_cv;

    for (i = 0; i < 3; i++) {
        /* Read each field just before releasing it. */
        SV *code = *phase[i];
        if (code != NULL) {
            SvREFCNT_dec(code);
        }
    }

    if (b->name != NULL) {
        Safefree(b->name);
    }
    Safefree(b);
}

static SV *perl_plugin_thunk_read(pTHX_ FilePluginContext *ctx) {
    PerlPluginBridge *b = (PerlPluginBridge *)ctx->plugin_state;
    SV *result;
    int count;
    dSP;

    ENTER;
    SAVETMPS;
    PUSHMARK(SP);
    XPUSHs(sv_2mortal(newSVpv(ctx->path ? ctx->path : "", 0)));
    XPUSHs(sv_2mortal(newSVsv(ctx->data)));
    XPUSHs(sv_2mortal(newRV_inc((SV *)ctx->options)));
    PUTBACK;

    count = call_sv(b->read_cv, G_SCALAR | G_EVAL);

    SPAGAIN;
    if (SvTRUE(ERRSV)) {
        SV *err = newSVsv(ERRSV);
        FREETMPS;
        LEAVE;
        croak_sv(err);
    }
    if (count > 0) {
        SV *ret = POPs;
        if (SvOK(ret)) {
            result = newSVsv(ret);
        } else {
            ctx->cancel = 1;
            result = NULL;
        }
    } else {
        ctx->cancel = 1;
        result = NULL;
    }
    PUTBACK;
    FREETMPS;



( run in 0.875 second using v1.01-cache-2.11-cpan-140bd7fdf52 )