Data-NDArray-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN


    *   "$array = Data::NDArray::Shared->from_pdl($piddle, $path)"

        A new shared array copied from $piddle (made physical and contiguous
        first); the dtype and shape follow the piddle's type and "reverse"
        of its dims. $path is the backing file ("undef" or omitted for an
        anonymous mapping).

    *   "$array->update_from_pdl($piddle)"

        Copy $piddle into this array in place (write-locked). The piddle's
        type must match the dtype and its dims must equal
        reverse($array->shape), else it croaks. Returns the array.

    *   "$piddle = $array->as_pdl_alias"

        A piddle that aliases the shared mapping with no copy (a real
        "PDL_DONTTOUCHDATA" ndarray over our memory): an in-place PDL
        operation ("$p .= ...", "$p->inplace->...") writes straight through
        to shared memory -- visible to every process that maps it -- and
        reads see live data. The array is kept alive for as long as the
        piddle exists.

        This one method needs PDL at build time (it is compiled against
        PDL's C API): if the module was installed without PDL present it
        "croak"s, while the copy methods above keep working through a
        runtime "require PDL". Reinstall with PDL installed to enable it.

        Caveats. The alias bypasses the rwlock: you must coordinate access
        yourself (no other process mutating concurrently), as with any
        unlocked shared-memory view. Do not resize or retype the alias (a
        reshape that grows it, a type conversion) -- it is a fixed window
        onto the mapping; use "to_pdl"/"from_pdl" when you want an
        independent, resizable copy.

    *   "$bytes = $array->buffer"

        The raw contiguous data region as a byte string (read-locked
        snapshot), row-major C-order -- useful on its own for serialization
        or IPC, and the basis for "to_pdl".
        "$array->update_from_bytes($bytes)" is the inverse (write-locked;
        the string must be exactly "size * itemsize" bytes).

    See eg/pdl_interop.pl for a worked example, including a cross-process
    PDL transform on one shared array.

SHARING ACROSS PROCESSES
    The array lives in a shared mapping, shared the same three ways as the
    rest of the family: a backing file (every process calls "new($path,
    $dtype, @shape)" on the same path), an anonymous mapping inherited
    across "fork", or a memfd whose descriptor is passed to an unrelated

Shared.xs  view on Meta::CPAN

        case NDA_U64: { uint64_t v = (uint64_t)SvUV(val); memcpy(base + e*8, &v, 8); break; }
        case NDA_U32: { uint32_t v = (uint32_t)SvUV(val); memcpy(base + e*4, &v, 4); break; }
        case NDA_U16: { uint16_t v = (uint16_t)SvUV(val); memcpy(base + e*2, &v, 2); break; }
        case NDA_U8:  { uint8_t  v = (uint8_t)SvUV(val);  base[e] = (char)v;        break; }
    }
}

/* ----------------------------------------------------------------
 * Fill every element with the typed value of val.
 * ---------------------------------------------------------------- */
static void nda_fill_locked(pTHX_ NdaHandle *h, SV *val) {
    uint64_t size = h->hdr->size, e;
    char *base = nda_data(h);
    /* Decode once, then splat the raw bytes for speed + consistency. */
    switch (h->hdr->dtype) {
        case NDA_F64: { double   v = (double)SvNV(val); for (e=0;e<size;e++) memcpy(base+e*8,&v,8); break; }
        case NDA_F32: { float    v = (float)SvNV(val);  for (e=0;e<size;e++) memcpy(base+e*4,&v,4); break; }
        case NDA_I64: { int64_t  v = (int64_t)SvIV(val); for (e=0;e<size;e++) memcpy(base+e*8,&v,8); break; }
        case NDA_I32: { int32_t  v = (int32_t)SvIV(val); for (e=0;e<size;e++) memcpy(base+e*4,&v,4); break; }
        case NDA_I16: { int16_t  v = (int16_t)SvIV(val); for (e=0;e<size;e++) memcpy(base+e*2,&v,2); break; }
        case NDA_I8:  { int8_t   v = (int8_t)SvIV(val);  memset(base, (int)(unsigned char)v, (size_t)size); break; }

Shared.xs  view on Meta::CPAN

    CT *p = (CT *)base; \
    if (op == '+') { CT s = (CT)siv; for (e=0;e<size;e++) p[e] = (CT)((UT)p[e] + (UT)s); } \
    else           { CT s = (CT)siv; for (e=0;e<size;e++) p[e] = (CT)((UT)p[e] * (UT)s); } \
} while (0)
/* Unsigned dtypes: apply the scalar (suv, narrowed to CT) to every element of
 * `base`, adding when op is '+' and multiplying otherwise.  Arithmetic is done
 * in UT so wrap-around is well defined before narrowing back to CT.  Expects
 * base, op, suv, size and e to be in scope at the expansion site. */
#define NDA_SCALAR_UINT(CT, UT) do { \
    CT *const dst_ = (CT *)base; \
    const CT k_ = (CT)suv; \
    if (op != '+') { for (e = 0; e < size; e++) dst_[e] = (CT)((UT)dst_[e] * (UT)k_); } \
    else           { for (e = 0; e < size; e++) dst_[e] = (CT)((UT)dst_[e] + (UT)k_); } \
} while (0)

static void nda_scalar_op_locked(pTHX_ NdaHandle *h, SV *sv, int op) {
    uint64_t size = h->hdr->size, e;
    char *base = nda_data(h);
    if (nda_is_float(h->hdr->dtype)) {
        double s = (double)SvNV(sv);
        if (h->hdr->dtype == NDA_F64) {
            double *p = (double *)base;
            if (op == '+') for (e=0;e<size;e++) p[e] += s; else for (e=0;e<size;e++) p[e] *= s;
        } else { /* F32 */
            float *p = (float *)base; float fs = (float)s;
            if (op == '+') for (e=0;e<size;e++) p[e] += fs; else for (e=0;e<size;e++) p[e] *= fs;

Shared.xs  view on Meta::CPAN

} while (0)
/* Integer dtypes: element-wise da[i] = da[i] <op> db[i] for op '+', '-', or
 * (any other value) '*'.  Both operands are widened to UT, an unsigned type
 * of at least int rank, so overflow wraps with defined behaviour before the
 * result is narrowed back to CT.  Expects da, db, op, size and e in scope. */
#define NDA_EW_INT(CT, UT) do { \
    CT *lhs_ = (CT *)da; \
    const CT *rhs_ = (const CT *)db; \
    switch (op) { \
    case '+': for (e = 0; e < size; e++) lhs_[e] = (CT)((UT)lhs_[e] + (UT)rhs_[e]); break; \
    case '-': for (e = 0; e < size; e++) lhs_[e] = (CT)((UT)lhs_[e] - (UT)rhs_[e]); break; \
    default:  for (e = 0; e < size; e++) lhs_[e] = (CT)((UT)lhs_[e] * (UT)rhs_[e]); break; \
    } \
} while (0)

static void nda_elementwise_op_locked(NdaHandle *a, NdaHandle *b, int op) {
    uint64_t size = a->hdr->size, e;
    char *da = nda_data(a);
    const char *db = nda_data(b);
    switch (a->hdr->dtype) {
        case NDA_F64: NDA_EW_FLT(double);   break;
        case NDA_F32: NDA_EW_FLT(float);    break;
        case NDA_I64: NDA_EW_INT(int64_t,  uint64_t); break;
        case NDA_I32: NDA_EW_INT(int32_t,  uint32_t); break;
        case NDA_I16: NDA_EW_INT(int16_t,  uint32_t); break;
        case NDA_I8:  NDA_EW_INT(int8_t,   uint32_t); break;

Shared.xs  view on Meta::CPAN

        croak("%s: expected a Data::NDArray::Shared object", who);
    NdaHandle *o = INT2PTR(NdaHandle*, SvIV(SvRV(other)));
    if (!o) croak("Attempted to use a destroyed Data::NDArray::Shared object");
    if (o->hdr->dtype != h->hdr->dtype)
        croak("%s: dtype mismatch", who);
    if (o->hdr->size != h->hdr->size)
        croak("%s: size mismatch (%" UVuf " vs %" UVuf ")",
              who, (UV)h->hdr->size, (UV)o->hdr->size);
    if (o == h || o->hdr->array_id == h->hdr->array_id) {
        nda_rwlock_wrlock(h);
        nda_elementwise_op_locked(h, h, op);   /* self: +=->double, -=->zero, *=->square */
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        nda_rwlock_wrunlock(h);
    } else {
        nda_lock_pair(h, o);
        nda_elementwise_op_locked(h, o, op);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        nda_unlock_pair(h, o);
    }
}

/* Resolve the uniform (path, dtype, @shape) argument convention shared by new
 * and new_memfd, matching the rest of the Data::*::Shared family:
 *   ($path_or_undef, $dtype, @shape)
 * ST(1) is ALWAYS the path/memfd label (an SV that may carry undef for an
 * anonymous mapping), ST(2) is ALWAYS the dtype name string, and ST(3).. are

Shared.xs  view on Meta::CPAN

    nda_rwlock_wrunlock(h);

SV *
fill(self, val)
    SV *self
    SV *val
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Set every element to val (converted to the array's dtype) under
         * the write lock, and return self for chaining.
         *
         * val is first resolved to a plain, magic-free number of the
         * matching kind, BEFORE the lock is taken.  SvNV/SvIV/SvUV can run
         * Perl code (tie FETCH, numeric overloading) or raise a FATAL
         * "isn't numeric" warning.  A croak while the write lock is held
         * would leave it owned by this still-live process and block every
         * other process.  This mirrors update_from_bytes, which also
         * resolves its argument before locking.  The locked fill then sees
         * an identical value, so the stored bytes do not change. */
        SV *num;
        uint32_t dt = h->hdr->dtype;
        if (nda_is_float(dt))
            num = sv_2mortal(newSVnv(SvNV(val)));
        else if (nda_is_signed(dt))
            num = sv_2mortal(newSViv(SvIV(val)));
        else
            num = sv_2mortal(newSVuv(SvUV(val)));
        nda_rwlock_wrlock(h);
        nda_fill_locked(aTHX_ h, num);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        nda_rwlock_wrunlock(h);
    }
    SvREFCNT_inc(self);
    RETVAL = self;
  OUTPUT:
    RETVAL

SV *
zero(self)
    SV *self

Shared.xs  view on Meta::CPAN

  OUTPUT:
    RETVAL

NV
sum(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Total of all elements as a double.  The summation runs under the
         * read lock, so it cannot overlap a write-locked mutation. */
        double total;
        nda_rwlock_rdlock(h);
        total = nda_sum_locked(h);
        nda_rwlock_rdunlock(h);
        RETVAL = total;
    }
  OUTPUT:
    RETVAL

NV
mean(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Arithmetic mean = (sum of all elements) / (element count).  Only
         * the summation touches element data, so only it runs under the
         * read lock.  The element count is read from the header afterwards.
         * It is never zero, so the division is well defined. */
        double total;
        nda_rwlock_rdlock(h);
        total = nda_sum_locked(h);
        nda_rwlock_rdunlock(h);
        RETVAL = total / (double)h->hdr->size;
    }
  OUTPUT:
    RETVAL

SV *
min(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Smallest element, returned as a dtype-correct Perl scalar.  The
         * search compares in the native dtype (want_max = 0).  The value is
         * materialised while the read lock is still held, so it cannot
         * change between locating and loading it. */
        uint64_t idx;
        nda_rwlock_rdlock(h);
        idx    = nda_argextreme_locked(h, 0);
        RETVAL = nda_get_sv(aTHX_ h, idx);
        nda_rwlock_rdunlock(h);
    }
  OUTPUT:
    RETVAL

SV *
max(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Largest element, returned as a dtype-correct Perl scalar.  The
         * search compares in the native dtype (want_max = 1).  The value is
         * read out while the read lock is still held. */
        uint64_t idx;
        nda_rwlock_rdlock(h);
        idx    = nda_argextreme_locked(h, 1);
        RETVAL = nda_get_sv(aTHX_ h, idx);
        nda_rwlock_rdunlock(h);
    }
  OUTPUT:
    RETVAL

SV *
add_scalar(self, s)
    SV *self
    SV *s
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Add the scalar s to every element in place, under the write lock,
         * and return self for chaining.
         *
         * s is first resolved to a plain, magic-free number of the matching
         * kind (NV for float dtypes, IV for signed, UV for unsigned), BEFORE
         * the lock is taken.  Numeric conversion can run Perl code (tie
         * FETCH, overloading) or raise a FATAL warning.  A croak while
         * holding the write lock would leave it owned by this live process
         * and block every other process.  The locked operation then sees
         * an identical value. */
        SV *num;
        uint32_t dt = h->hdr->dtype;
        if (nda_is_float(dt))
            num = sv_2mortal(newSVnv(SvNV(s)));
        else if (nda_is_signed(dt))
            num = sv_2mortal(newSViv(SvIV(s)));
        else
            num = sv_2mortal(newSVuv(SvUV(s)));
        nda_rwlock_wrlock(h);
        nda_scalar_op_locked(aTHX_ h, num, '+');
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        nda_rwlock_wrunlock(h);
    }
    SvREFCNT_inc(self);
    RETVAL = self;
  OUTPUT:
    RETVAL

SV *
mul_scalar(self, s)
    SV *self
    SV *s
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Multiply every element by the scalar s in place, under the write
         * lock, and return self for chaining.
         *
         * s is first resolved to a plain, magic-free number of the matching
         * kind (NV for float dtypes, IV for signed, UV for unsigned), BEFORE
         * the lock is taken.  Numeric conversion can run Perl code (tie
         * FETCH, overloading) or raise a FATAL warning.  A croak while
         * holding the write lock would leave it owned by this live process
         * and block every other process.  The locked operation then sees
         * an identical value. */
        SV *num;
        uint32_t dt = h->hdr->dtype;
        if (nda_is_float(dt))
            num = sv_2mortal(newSVnv(SvNV(s)));
        else if (nda_is_signed(dt))
            num = sv_2mortal(newSViv(SvIV(s)));
        else
            num = sv_2mortal(newSVuv(SvUV(s)));
        nda_rwlock_wrlock(h);
        nda_scalar_op_locked(aTHX_ h, num, '*');
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        nda_rwlock_wrunlock(h);
    }
    SvREFCNT_inc(self);
    RETVAL = self;
  OUTPUT:
    RETVAL

SV *
add(self, other)
    SV *self

Shared.xs  view on Meta::CPAN

UV
itemsize(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Bytes per element.  Fixed when the array is created, so it is read
     * straight from the header without taking the lock. */
    RETVAL = h->hdr->itemsize;
  OUTPUT:
    RETVAL

# Raw contiguous data region as a byte string (read-locked snapshot copy).  The
# bytes are row-major C-order; pair with shape()/dtype() to interpret them.
SV *
buffer(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        uint64_t bytes = h->hdr->size * h->hdr->itemsize;
        char *base = nda_data(h);

Shared.xs  view on Meta::CPAN

        nda_rwlock_rdlock(h);
        Copy(base, SvPVX(RETVAL), bytes, char);
        nda_rwlock_rdunlock(h);
        SvCUR_set(RETVAL, (STRLEN)bytes);
        *SvEND(RETVAL) = '\0';
    }
  OUTPUT:
    RETVAL

# Overwrite the whole data region from a byte string of exactly size*itemsize
# bytes (write-locked).  Used by from_pdl/update_from_pdl.
void
update_from_bytes(self, src)
    SV *self
    SV *src
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        STRLEN slen;
        const char *sbytes = SvPVbyte(src, slen);   /* resolve + any croak BEFORE the lock */

lib/Data/NDArray/Shared.pm  view on Meta::CPAN

    exists &{"PDL::$name"}
        or Carp::croak("this PDL has no '$name' type (needed for dtype '$dtype'); upgrade PDL");
    \&{"PDL::$name"};
}

# Copy this array into a brand-new (independent) PDL piddle.  PDL's dims run
# in the opposite order to our row-major shape, hence the reverse.  The
# payload is the read-locked snapshot returned by buffer().
sub to_pdl {
    my ($self) = @_;
    _require_pdl();
    my $pdl_type = _pdl_ctor($self->dtype)->();
    my @dims     = reverse $self->shape;
    my $piddle   = PDL->new_from_specification($pdl_type, @dims);
    my $data_ref = $piddle->get_dataref;
    $$data_ref   = $self->buffer;
    $piddle->upd_data;    # tell PDL its data buffer was replaced
    return $piddle;
}

# A NEW shared NDArray copied from a piddle; $path undef => anonymous mapping.
sub from_pdl {
    my ($class, $p, $path) = @_;
    _require_pdl();
    my $tname = "" . $p->type;
    my $dt = $DTYPE_OF{$tname}

lib/Data/NDArray/Shared.pm  view on Meta::CPAN

snapshot.

=item * C<< $array = Data::NDArray::Shared->from_pdl($piddle, $path) >>

A B<new> shared array B<copied> from C<$piddle> (made physical and contiguous
first); the dtype and shape follow the piddle's type and C<reverse> of its dims.
C<$path> is the backing file (C<undef> or omitted for an anonymous mapping).

=item * C<< $array->update_from_pdl($piddle) >>

Copy C<$piddle> into this array B<in place> (write-locked). The piddle's type
must match the dtype and its dims must equal C<< reverse($array-E<gt>shape) >>,
else it croaks. Returns the array.

=item * C<< $piddle = $array->as_pdl_alias >>

A piddle that B<aliases the shared mapping with no copy> (a real
C<PDL_DONTTOUCHDATA> ndarray over our memory): an B<in-place> PDL operation
(C<< $p .= ... >>, C<< $p-E<gt>inplace-E<gt>... >>) writes straight through to
shared memory -- visible to every process that maps it -- and reads see live
data. The array is kept alive for as long as the piddle exists.

This one method needs PDL at B<build> time (it is compiled against PDL's C API):
if the module was installed without PDL present it C<croak>s, while the copy
methods above keep working through a runtime C<require PDL>. Reinstall with PDL
installed to enable it.

B<Caveats.> The alias B<bypasses the rwlock>: you must coordinate access
yourself (no other process mutating concurrently), as with any unlocked
shared-memory view. Do not B<resize or retype> the alias (a reshape that grows
it, a type conversion) -- it is a fixed window onto the mapping; use
C<to_pdl>/C<from_pdl> when you want an independent, resizable copy.

=item * C<< $bytes = $array->buffer >>

The raw contiguous data region as a byte string (read-locked snapshot),
row-major C-order -- useful on its own for serialization or IPC, and the basis
for C<to_pdl>. C<< $array->update_from_bytes($bytes) >> is the inverse
(write-locked; the string must be exactly C<< size * itemsize >> bytes).

=back

See F<eg/pdl_interop.pl> for a worked example, including a cross-process PDL
transform on one shared array.

=head1 SHARING ACROSS PROCESSES

The array lives in a shared mapping, shared the same three ways as the rest of
the family: a B<backing file> (every process calls C<< new($path, $dtype,

ndarray.h  view on Meta::CPAN

static inline void nda_rwlock_spin_pause(void) {
    /* Emit the architecture's spin-wait hint: "pause" on x86, "yield" on
     * AArch64, nothing elsewhere.  The "memory" clobber also makes this a
     * compiler barrier, so memory values are not cached across the call. */
#if defined(__x86_64__) || defined(__i386__)
#  define NDA_SPIN_HINT_INSN "pause"
#elif defined(__aarch64__)
#  define NDA_SPIN_HINT_INSN "yield"
#else
#  define NDA_SPIN_HINT_INSN ""
#endif
    __asm__ volatile(NDA_SPIN_HINT_INSN ::: "memory");
#undef NDA_SPIN_HINT_INSN
}

/* Layout of the rwlock word while write-locked: bit 31 flags the writer, and
 * the low 31 bits hold the writer's PID.  NDA_RWLOCK_WR(pid) builds such a
 * word; NDA_RWLOCK_PID_MASK recovers the PID from it. */
#define NDA_RWLOCK_WRITER_BIT (1U << 31)
#define NDA_RWLOCK_PID_MASK   (NDA_RWLOCK_WRITER_BIT - 1U)
#define NDA_RWLOCK_WR(pid)    (NDA_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & NDA_RWLOCK_PID_MASK))

/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
 * lock-holder's PID is recycled to an unrelated live process before recovery
 * runs, this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits. Documented under "Crash Safety"
 * in the POD. */

ndarray.h  view on Meta::CPAN

    switch (h->hdr->dtype) {
        case NDA_U64: { uint64_t v; memcpy(&v, base + e*8, 8); return v; }
        case NDA_U32: { uint32_t v; memcpy(&v, base + e*4, 4); return (uint64_t)v; }
        case NDA_U16: { uint16_t v; memcpy(&v, base + e*2, 2); return (uint64_t)v; }
        case NDA_U8:  { uint8_t  v = (uint8_t)base[e];          return (uint64_t)v; }
        default: return 0;
    }
}

/* Total of all elements, each loaded as a double via nda_load_nv and
 * accumulated in index order.  The caller must already hold the read lock. */
static inline double nda_sum_locked(NdaHandle *h) {
    const uint64_t count = h->hdr->size;
    double total = 0.0;
    uint64_t i = 0;
    while (i < count) {
        total += nda_load_nv(h, i);
        ++i;
    }
    return total;
}

/* Find the flat index of the min (want_max=0) or max (want_max=1) element,
 * comparing in the element's NATIVE type so that i64/u64 values above 2^53
 * (which collapse/mis-order as doubles) are ranked exactly.  Float dtypes
 * compare as double.  Caller holds the read lock; size >= 1 always. */
static inline uint64_t nda_argextreme_locked(NdaHandle *h, int want_max) {
    uint64_t size = h->hdr->size, e, best = 0;
    uint32_t dt = h->hdr->dtype;
    if (nda_is_float(dt)) {
        double bestv = nda_load_nv(h, 0);
        for (e = 1; e < size; e++) {
            double v = nda_load_nv(h, e);
            if (want_max ? (v > bestv) : (v < bestv)) { bestv = v; best = e; }
        }
    } else if (nda_is_signed(dt)) {
        int64_t bestv = nda_load_i64(h, 0);

xt/crash_recovery.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;
plan skip_all => 'author test' unless $ENV{AUTHOR_TESTING};
use Data::NDArray::Shared;

# A child cannot finish 2M write-locked fills in 50ms; SIGKILL it mid-storm while
# it may hold the write lock, then verify the parent can still take the write lock
# and mutate -- the futex rwlock's dead-owner recovery. The anonymous MAP_SHARED
# mapping is inherited across fork.
my $h = Data::NDArray::Shared->new(undef, "i64", 1000);
my $pid = fork // die $!;
if (!$pid) { $h->fill($_ & 0x7f) for 1 .. 2_000_000; exit 0 }
select undef, undef, undef, 0.05;
kill 'KILL', $pid;
waitpid $pid, 0;



( run in 0.734 second using v1.01-cache-2.11-cpan-bbe5e583499 )