File-Raw
view release on metacpan or search on metacpan
}
/*
 * $mmap->close
 *
 * Releases the mapping slot recorded in the object's "_idx" hash entry and
 * then overwrites that entry with -1 so later close/DESTROY calls see the
 * object as already closed.
 *
 * Croaks when called with anything other than exactly one argument, or when
 * that argument is not a reference to a hash. Returns an empty list.
 */
XS_INTERNAL(xs_mmap_close) {
    dXSARGS;
    HV *hash;
    SV **idx_sv;
    SV *closed_sv;
    IV idx;
    if (items != 1) croak("Usage: $mmap->close");
    if (!SvROK(ST(0)) || SvTYPE(SvRV(ST(0))) != SVt_PVHV) {
        croak("Invalid mmap object");
    }
    hash = (HV*)SvRV(ST(0));
    idx_sv = hv_fetch(hash, "_idx", 4, 0);
    idx = idx_sv ? SvIV(*idx_sv) : -1;
    /* Only release a live slot. A missing "_idx" or an already-closed
     * object (-1) must not reach file_mmap_close; this mirrors the guard
     * used by xs_mmap_DESTROY. */
    if (idx >= 0) {
        file_mmap_close(idx);
    }
    /* hv_store takes ownership of the value only on success; when it
     * returns NULL the caller still owns the SV and has to drop it. */
    closed_sv = newSViv(-1);
    if (!hv_store(hash, "_idx", 4, closed_sv, 0)) {
        SvREFCNT_dec(closed_sv);
    }
    XSRETURN_EMPTY;
}
/*
 * Destructor for mmap objects.
 *
 * Does nothing while PL_dirty is set, or when the invocant is not a
 * reference to a hash. Otherwise reads the "_idx" entry and, if it holds a
 * non-negative index, hands that index to file_mmap_close. Never croaks and
 * always returns an empty list.
 */
XS_INTERNAL(xs_mmap_DESTROY) {
    dXSARGS;
    SV *self;
    SV **slot;
    PERL_UNUSED_VAR(items);
    if (PL_dirty) {
        XSRETURN_EMPTY;
    }
    self = ST(0);
    if (SvROK(self) && SvTYPE(SvRV(self)) == SVt_PVHV) {
        slot = hv_fetch((HV*)SvRV(self), "_idx", 4, 0);
        if (slot) {
            /* A negative index means there is no slot to release. */
            IV slot_idx = SvIV(*slot);
            if (slot_idx >= 0) {
                file_mmap_close(slot_idx);
            }
        }
    }
    XSRETURN_EMPTY;
}
XS_INTERNAL(xs_lines_iter) {
dXSARGS;
const char *path;
IV idx;
SV *idx_sv;
if (items < 1)
croak("Usage: file::lines_iter(path [, plugin => ..., key => value ...])");
path = SvPV_nolen(ST(0));
/* Plugin path: slurp + dispatch READ, wrap the resulting AoA in an
* iterator that walks records in order. This is eager (whole AoA
* held in memory) - for true streaming use each_line($p, $cb,
* plugin => ...). The iterator interface itself is preserved so
* code that stores the iterator handle still composes. */
if (items > 1) {
HV *opts;
SV *bytes;
SV *out;
AV *records;
LineIterEntry *entry;
opts = file_plugin_build_opts(aTHX_ &ST(0), 1, items, "lines_iter");
bytes = file_slurp_internal(aTHX_ path);
out = file_plugin_dispatch_read(aTHX_ opts, path, bytes);
SvREFCNT_dec((SV *)opts);
if (!out) {
SvREFCNT_dec(bytes);
ST(0) = &PL_sv_undef;
XSRETURN(1);
}
if (out != bytes) SvREFCNT_dec(bytes);
if (!SvROK(out) || SvTYPE(SvRV(out)) != SVt_PVAV) {
SvREFCNT_dec(out);
croak("File::Raw::lines_iter: plugin must return an arrayref of records");
}
records = (AV *)SvRV(out);
SvREFCNT_inc(records); /* keep the AV alive on its own */
SvREFCNT_dec(out); /* drop the RV wrapper */
idx = alloc_iter_slot();
entry = &g_iters[idx];
entry->fd = -1; /* sentinel: no file behind us */
entry->buffer = NULL;
entry->buf_size = 0;
entry->buf_pos = 0;
entry->buf_len = 0;
entry->eof = 0;
entry->refcount = 1;
entry->path = NULL;
entry->records = records;
entry->records_idx = 0;
idx_sv = newSViv(idx);
ST(0) = sv_2mortal(sv_bless(newRV_noinc(idx_sv),
gv_stashpv("File::Raw::lines", GV_ADD)));
XSRETURN(1);
}
idx = file_lines_open(aTHX_ path);
if (idx < 0) {
ST(0) = &PL_sv_undef;
XSRETURN(1);
}
/* Use simple IV reference - much faster than hash */
idx_sv = newSViv(idx);
ST(0) = sv_2mortal(sv_bless(newRV_noinc(idx_sv), gv_stashpv("File::Raw::lines", GV_ADD)));
XSRETURN(1);
}
XS_INTERNAL(xs_lines_iter_next) {
hv_store(g_file_callback_registry, "is_not_empty", 12, SvREFCNT_inc(sv), 0);
/* comment / is_comment */
Newxz(cb, 1, FileLineCallback);
cb->predicate = pred_is_comment;
cb->perl_callback = NULL;
sv = newSViv(PTR2IV(cb));
hv_store(g_file_callback_registry, "comment", 7, sv, 0);
hv_store(g_file_callback_registry, "is_comment", 10, SvREFCNT_inc(sv), 0);
/* not_comment / is_not_comment */
Newxz(cb, 1, FileLineCallback);
cb->predicate = pred_is_not_comment;
cb->perl_callback = NULL;
sv = newSViv(PTR2IV(cb));
hv_store(g_file_callback_registry, "not_comment", 11, sv, 0);
hv_store(g_file_callback_registry, "is_not_comment", 14, SvREFCNT_inc(sv), 0);
}
/*
 * Look up a registered line callback by name.
 *
 * Registry values are IVs carrying a FileLineCallback pointer (stored with
 * newSViv(PTR2IV(cb))), so only entries with the IOK flag are converted
 * back to a pointer.
 *
 * Returns NULL when name is NULL, when the registry has not been created,
 * when no entry exists under that name, or when the entry is not an integer.
 */
static FileLineCallback* file_get_callback(pTHX_ const char *name) {
    SV **svp;
    /* strlen(NULL) is undefined behaviour, so reject a NULL name up front. */
    if (!name || !g_file_callback_registry) return NULL;
    svp = hv_fetch(g_file_callback_registry, name, strlen(name), 0);
    if (svp && SvIOK(*svp)) {
        return INT2PTR(FileLineCallback*, SvIVX(*svp));
    }
    return NULL;
}
/* Process lines with callback - MULTICALL optimized (Perl >= 5.14 only) */
XS_INTERNAL(xs_each_line) {
dXSARGS;
#if PERL_VERSION >= 14
dMULTICALL;
#endif
const char *path;
SV *callback;
IV idx;
CV *block_cv;
SV *old_defsv;
SV *line_sv;
LineIterEntry *entry;
char *line_start;
char *newline;
size_t line_len;
ssize_t n;
#if PERL_VERSION >= 14
U8 gimme = G_VOID;
#endif
if (items < 2)
croak("Usage: file::each_line(path, callback [, plugin => ..., key => value ...])");
path = SvPV_nolen(ST(0));
callback = ST(1);
if (!SvROK(callback) || SvTYPE(SvRV(callback)) != SVt_PVCV) {
croak("Second argument must be a code reference");
}
/* Plugin path: route through streaming dispatch. The plugin's
* stream fn owns the record emission and calls back to `callback`
* per record (typically once for each parsed CSV row, etc.). */
if (items > 2) {
HV *opts = file_plugin_build_opts(aTHX_ &ST(0), 2, items, "each_line");
(void)file_plugin_dispatch_stream(aTHX_ opts, path, callback);
SvREFCNT_dec((SV *)opts);
XSRETURN_EMPTY;
}
block_cv = (CV*)SvRV(callback);
idx = file_lines_open(aTHX_ path);
if (idx < 0) {
XSRETURN_EMPTY;
}
entry = &g_iters[idx];
old_defsv = DEFSV;
line_sv = newSV(256);
DEFSV = line_sv;
#if PERL_VERSION >= 14
PUSH_MULTICALL(block_cv);
#endif
while (1) {
/* Look for newline in current buffer */
if (entry->buf_pos < entry->buf_len) {
line_start = entry->buffer + entry->buf_pos;
newline = memchr(line_start, '\n', entry->buf_len - entry->buf_pos);
if (newline) {
line_len = newline - line_start;
sv_setpvn(line_sv, line_start, line_len);
entry->buf_pos += line_len + 1;
#if PERL_VERSION >= 14
MULTICALL;
#else
{ dSP; PUSHMARK(SP); call_sv((SV*)block_cv, G_VOID|G_DISCARD); }
#endif
continue;
}
}
/* No newline found, need more data */
if (entry->eof) {
/* Return remaining data if any */
if (entry->buf_pos < entry->buf_len) {
line_len = entry->buf_len - entry->buf_pos;
sv_setpvn(line_sv, entry->buffer + entry->buf_pos, line_len);
entry->buf_pos = entry->buf_len;
#if PERL_VERSION >= 14
MULTICALL;
#else
{ dSP; PUSHMARK(SP); call_sv((SV*)block_cv, G_VOID|G_DISCARD); }
#endif
}
break;
}
if (rn > 0) {
rv = POPs;
av_push(out, SvREFCNT_inc(rv));
}
PUTBACK;
}
SvREFCNT_dec(holder);
ST(0) = sv_2mortal(newRV_noinc((SV *)out));
XSRETURN(1);
}
idx = file_lines_open(aTHX_ path);
if (idx < 0) {
ST(0) = sv_2mortal(newRV_noinc((SV*)result));
XSRETURN(1);
}
/* Call Perl callback */
{
while ((line = file_lines_next(aTHX_ idx)) != &PL_sv_undef) {
dSP;
IV count;
SV *result_sv;
PUSHMARK(SP);
XPUSHs(sv_2mortal(line));
PUTBACK;
count = call_sv(callback, G_SCALAR);
SPAGAIN;
if (count > 0) {
result_sv = POPs;
av_push(result, SvREFCNT_inc(result_sv));
}
PUTBACK;
}
}
file_lines_close(idx);
ST(0) = sv_2mortal(newRV_noinc((SV*)result));
XSRETURN(1);
}
/* ============================================
Perl bridge for the plugin API.
Perl plugins are registered as a hashref of phase coderefs:
File::Raw::register_plugin('csv', {
read => sub { my ($path, $bytes, $opts) = @_; ... },
write => sub { my ($path, $rows, $opts) = @_; ... },
record => sub { my ($path, $record, $opts) = @_; ... },
});
The bridge allocates a PerlPluginBridge holding the coderef SVs plus
a FilePlugin block whose function pointers are static C thunks. The
thunks recover the bridge from FilePluginContext::plugin_state and
call the appropriate coderef. The bridge is pinned in
g_perl_plugins so we can free it on unregister.
The 'stream' phase is intentionally not supported from Perl: a Perl
stream plugin would be invoked once per chunk by file.c's read loop,
and the per-call call_sv overhead defeats the point of streaming.
Perl plugins that need record-by-record callbacks should implement
the 'record' phase instead - File::Raw drives the iteration.
============================================ */
/* State kept for one plugin registered from Perl: the phase coderefs plus
 * the FilePlugin block handed to the plugin core. C thunks get back to this
 * struct through FilePluginContext::plugin_state. */
typedef struct PerlPluginBridge {
/* NOTE(review): released with Safefree() in perl_plugin_bridge_free -
 * confirm the allocation at registration uses a matching allocator. */
char *name; /* strdup'd; pointer is stored in plugin.name */
/* Phase coderefs. Each may be NULL; perl_plugin_bridge_free drops one
 * reference on every non-NULL entry. read_cv is the one invoked by
 * perl_plugin_thunk_read. */
SV *read_cv;
SV *write_cv;
SV *record_cv;
/* Plugin descriptor whose function pointers are the static C thunks. */
FilePlugin plugin;
} PerlPluginBridge;
/* Pins live PerlPluginBridge allocations so they can be freed on unregister.
 * Starts out NULL. NOTE(review): the key/value layout is not visible in this
 * part of the file - presumably keyed by plugin name; confirm. */
static HV *g_perl_plugins = NULL;
/*
 * Tear down a PerlPluginBridge: drop one reference on each phase coderef,
 * release the name string, then release the bridge itself.
 * Passing NULL is allowed and does nothing.
 */
static void perl_plugin_bridge_free(pTHX_ PerlPluginBridge *b) {
    if (b == NULL) {
        return;
    }
    /* SvREFCNT_dec ignores NULL, so unset phases need no separate check. */
    SvREFCNT_dec(b->read_cv);
    SvREFCNT_dec(b->write_cv);
    SvREFCNT_dec(b->record_cv);
    if (b->name != NULL) {
        Safefree(b->name);
    }
    Safefree(b);
}
static SV *perl_plugin_thunk_read(pTHX_ FilePluginContext *ctx) {
PerlPluginBridge *b = (PerlPluginBridge *)ctx->plugin_state;
SV *result;
int count;
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(sv_2mortal(newSVpv(ctx->path ? ctx->path : "", 0)));
XPUSHs(sv_2mortal(newSVsv(ctx->data)));
XPUSHs(sv_2mortal(newRV_inc((SV *)ctx->options)));
PUTBACK;
count = call_sv(b->read_cv, G_SCALAR | G_EVAL);
SPAGAIN;
if (SvTRUE(ERRSV)) {
SV *err = newSVsv(ERRSV);
FREETMPS;
LEAVE;
croak_sv(err);
}
if (count > 0) {
SV *ret = POPs;
if (SvOK(ret)) {
result = newSVsv(ret);
} else {
ctx->cancel = 1;
result = NULL;
}
} else {
ctx->cancel = 1;
result = NULL;
}
PUTBACK;
FREETMPS;
( run in 0.875 second using v1.01-cache-2.11-cpan-140bd7fdf52 )