Data-NDArray-Shared
view release on metacpan or search on metacpan
* "$array = Data::NDArray::Shared->from_pdl($piddle, $path)"
A new shared array copied from $piddle (made physical and contiguous
first); the dtype and shape follow the piddle's type and "reverse"
of its dims. $path is the backing file ("undef" or omitted for an
anonymous mapping).
* "$array->update_from_pdl($piddle)"
Copy $piddle into this array in place (write-locked). The piddle's
type must match the dtype and its dims must equal
reverse($array->shape), else it croaks. Returns the array.
* "$piddle = $array->as_pdl_alias"
A piddle that aliases the shared mapping with no copy (a real
"PDL_DONTTOUCHDATA" ndarray over our memory): an in-place PDL
operation ("$p .= ...", "$p->inplace->...") writes straight through
to shared memory -- visible to every process that maps it -- and
reads see live data. The array is kept alive for as long as the
piddle exists.
This one method needs PDL at build time (it is compiled against
PDL's C API): if the module was installed without PDL present it
"croak"s, while the copy methods above keep working through a
runtime "require PDL". Reinstall with PDL installed to enable it.
Caveats. The alias bypasses the rwlock: you must coordinate access
yourself (no other process mutating concurrently), as with any
unlocked shared-memory view. Do not resize or retype the alias (a
reshape that grows it, a type conversion) -- it is a fixed window
onto the mapping; use "to_pdl"/"from_pdl" when you want an
independent, resizable copy.
* "$bytes = $array->buffer"
The raw contiguous data region as a byte string (read-locked
snapshot), row-major C-order -- useful on its own for serialization
or IPC, and the basis for "to_pdl".
"$array->update_from_bytes($bytes)" is the inverse (write-locked;
the string must be exactly "size * itemsize" bytes).
See eg/pdl_interop.pl for a worked example, including a cross-process
PDL transform on one shared array.
SHARING ACROSS PROCESSES
The array lives in a shared mapping, shared the same three ways as the
rest of the family: a backing file (every process calls "new($path,
$dtype, @shape)" on the same path), an anonymous mapping inherited
across "fork", or a memfd whose descriptor is passed to an unrelated
case NDA_U64: { uint64_t v = (uint64_t)SvUV(val); memcpy(base + e*8, &v, 8); break; }
case NDA_U32: { uint32_t v = (uint32_t)SvUV(val); memcpy(base + e*4, &v, 4); break; }
case NDA_U16: { uint16_t v = (uint16_t)SvUV(val); memcpy(base + e*2, &v, 2); break; }
case NDA_U8: { uint8_t v = (uint8_t)SvUV(val); base[e] = (char)v; break; }
}
}
/* ----------------------------------------------------------------
* Fill every element with the typed value of val.
* ---------------------------------------------------------------- */
static void nda_fill_locked(pTHX_ NdaHandle *h, SV *val) {
uint64_t size = h->hdr->size, e;
char *base = nda_data(h);
/* Decode once, then splat the raw bytes for speed + consistency. */
switch (h->hdr->dtype) {
case NDA_F64: { double v = (double)SvNV(val); for (e=0;e<size;e++) memcpy(base+e*8,&v,8); break; }
case NDA_F32: { float v = (float)SvNV(val); for (e=0;e<size;e++) memcpy(base+e*4,&v,4); break; }
case NDA_I64: { int64_t v = (int64_t)SvIV(val); for (e=0;e<size;e++) memcpy(base+e*8,&v,8); break; }
case NDA_I32: { int32_t v = (int32_t)SvIV(val); for (e=0;e<size;e++) memcpy(base+e*4,&v,4); break; }
case NDA_I16: { int16_t v = (int16_t)SvIV(val); for (e=0;e<size;e++) memcpy(base+e*2,&v,2); break; }
case NDA_I8: { int8_t v = (int8_t)SvIV(val); memset(base, (int)(unsigned char)v, (size_t)size); break; }
CT *p = (CT *)base; \
if (op == '+') { CT s = (CT)siv; for (e=0;e<size;e++) p[e] = (CT)((UT)p[e] + (UT)s); } \
else { CT s = (CT)siv; for (e=0;e<size;e++) p[e] = (CT)((UT)p[e] * (UT)s); } \
} while (0)
/* In-place "p[e] op= s" over all `size` elements of unsigned dtype CT, with s
 * taken from `suv` narrowed to CT. The arithmetic runs in UT so the wrap is
 * well-defined. Expects in scope: base (data bytes), size, e (loop index; left
 * equal to size), suv, op ('+' adds, any other value multiplies). */
#define NDA_SCALAR_UINT(CT, UT) do { \
    CT *const q_ = (CT *)base; \
    const UT k_ = (UT)(CT)suv; \
    if (op != '+') { \
        for (e = 0; e < size; e++) q_[e] = (CT)((UT)q_[e] * k_); \
    } else { \
        for (e = 0; e < size; e++) q_[e] = (CT)((UT)q_[e] + k_); \
    } \
} while (0)
static void nda_scalar_op_locked(pTHX_ NdaHandle *h, SV *sv, int op) {
uint64_t size = h->hdr->size, e;
char *base = nda_data(h);
if (nda_is_float(h->hdr->dtype)) {
double s = (double)SvNV(sv);
if (h->hdr->dtype == NDA_F64) {
double *p = (double *)base;
if (op == '+') for (e=0;e<size;e++) p[e] += s; else for (e=0;e<size;e++) p[e] *= s;
} else { /* F32 */
float *p = (float *)base; float fs = (float)s;
if (op == '+') for (e=0;e<size;e++) p[e] += fs; else for (e=0;e<size;e++) p[e] *= fs;
} while (0)
/* Element-wise "a[e] op= b[e]" for signed integer dtype CT. Each operand is
 * widened to UT (unsigned, at least int rank) before the arithmetic, so
 * overflow wraps instead of being signed-overflow UB; the result is then
 * narrowed back to CT. Expects in scope: da (destination bytes), db (source
 * bytes, may equal da), size, e (loop index; left equal to size), and
 * op ('+' adds, '-' subtracts, anything else multiplies). */
#define NDA_EW_INT(CT, UT) do { \
    CT *dst_ = (CT *)da; \
    const CT *src_ = (const CT *)db; \
    switch (op) { \
    case '+': \
        for (e = 0; e < size; e++) dst_[e] = (CT)((UT)dst_[e] + (UT)src_[e]); \
        break; \
    case '-': \
        for (e = 0; e < size; e++) dst_[e] = (CT)((UT)dst_[e] - (UT)src_[e]); \
        break; \
    default: \
        for (e = 0; e < size; e++) dst_[e] = (CT)((UT)dst_[e] * (UT)src_[e]); \
        break; \
    } \
} while (0)
static void nda_elementwise_op_locked(NdaHandle *a, NdaHandle *b, int op) {
uint64_t size = a->hdr->size, e;
char *da = nda_data(a);
const char *db = nda_data(b);
switch (a->hdr->dtype) {
case NDA_F64: NDA_EW_FLT(double); break;
case NDA_F32: NDA_EW_FLT(float); break;
case NDA_I64: NDA_EW_INT(int64_t, uint64_t); break;
case NDA_I32: NDA_EW_INT(int32_t, uint32_t); break;
case NDA_I16: NDA_EW_INT(int16_t, uint32_t); break;
case NDA_I8: NDA_EW_INT(int8_t, uint32_t); break;
croak("%s: expected a Data::NDArray::Shared object", who);
NdaHandle *o = INT2PTR(NdaHandle*, SvIV(SvRV(other)));
if (!o) croak("Attempted to use a destroyed Data::NDArray::Shared object");
if (o->hdr->dtype != h->hdr->dtype)
croak("%s: dtype mismatch", who);
if (o->hdr->size != h->hdr->size)
croak("%s: size mismatch (%" UVuf " vs %" UVuf ")",
who, (UV)h->hdr->size, (UV)o->hdr->size);
if (o == h || o->hdr->array_id == h->hdr->array_id) {
nda_rwlock_wrlock(h);
nda_elementwise_op_locked(h, h, op); /* self: +=->double, -=->zero, *=->square */
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
nda_rwlock_wrunlock(h);
} else {
nda_lock_pair(h, o);
nda_elementwise_op_locked(h, o, op);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
nda_unlock_pair(h, o);
}
}
/* Resolve the uniform (path, dtype, @shape) argument convention shared by new
* and new_memfd, matching the rest of the Data::*::Shared family:
* ($path_or_undef, $dtype, @shape)
* ST(1) is ALWAYS the path/memfd label (an SV that may carry undef for an
* anonymous mapping), ST(2) is ALWAYS the dtype name string, and ST(3).. are
nda_rwlock_wrunlock(h);
SV *
fill(self, val)
    SV *self
    SV *val
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        /* Numify val BEFORE taking the write lock. SvNV/SvIV/SvUV may run
         * get-magic (tied FETCH), overloaded numification, or emit an
         * "isn't numeric"/"uninitialized" warning that FATAL warnings turn
         * into a croak. A croak after nda_rwlock_wrlock() would longjmp past
         * nda_rwlock_wrunlock() and leave the shared write lock held. So
         * decode once here, per dtype kind, into a plain NV/IV/UV scalar that
         * the locked helper can read without any chance of croaking (same
         * "resolve + any croak BEFORE the lock" rule as update_from_bytes). */
        uint32_t dt = h->hdr->dtype;
        SV *num;
        SvGETMAGIC(val);
        if (nda_is_float(dt))
            num = sv_2mortal(newSVnv(SvNV_nomg(val)));
        else if (nda_is_signed(dt))
            num = sv_2mortal(newSViv(SvIV_nomg(val)));
        else
            num = sv_2mortal(newSVuv(SvUV_nomg(val)));

        nda_rwlock_wrlock(h);
        nda_fill_locked(aTHX_ h, num);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        nda_rwlock_wrunlock(h);
    }
    /* Return self (with an extra ref that the RETVAL mortalization balances) for chaining. */
    RETVAL = SvREFCNT_inc(self);
  OUTPUT:
    RETVAL
SV *
zero(self)
SV *self
OUTPUT:
RETVAL
NV
sum(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        double total;
        /* Hold the read lock so no writer changes elements mid-sum. */
        nda_rwlock_rdlock(h);
        total = nda_sum_locked(h);
        nda_rwlock_rdunlock(h);
        RETVAL = total;
    }
  OUTPUT:
    RETVAL
NV
mean(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        double total;
        /* Sum under the read lock; the division happens after unlocking. */
        nda_rwlock_rdlock(h);
        total = nda_sum_locked(h);
        nda_rwlock_rdunlock(h);
        /* Element count is never zero, so no divide-by-zero guard is needed. */
        RETVAL = total / (double)h->hdr->size;
    }
  OUTPUT:
    RETVAL
SV *
min(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Locate the smallest element, comparing in the native dtype, and box
     * its value as a dtype-correct scalar while the read lock is still held. */
    nda_rwlock_rdlock(h);
    RETVAL = nda_get_sv(aTHX_ h, nda_argextreme_locked(h, 0));
    nda_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL
SV *
max(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Locate the largest element, comparing in the native dtype, and box
     * its value as a dtype-correct scalar while the read lock is still held. */
    nda_rwlock_rdlock(h);
    RETVAL = nda_get_sv(aTHX_ h, nda_argextreme_locked(h, 1));
    nda_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL
SV *
add_scalar(self, s)
    SV *self
    SV *s
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Add s to every element in place under the write lock, then return self
     * for chaining. NOTE(review): s is numified inside nda_scalar_op_locked
     * (SvNV for float dtypes) while the lock is held; a croak there would skip
     * the unlock. Consider resolving s before locking, as update_from_bytes does. */
    nda_rwlock_wrlock(h);
    nda_scalar_op_locked(aTHX_ h, s, '+');
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    nda_rwlock_wrunlock(h);
    RETVAL = SvREFCNT_inc(self);
  OUTPUT:
    RETVAL
SV *
mul_scalar(self, s)
    SV *self
    SV *s
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Multiply every element by s in place under the write lock, then return
     * self for chaining. NOTE(review): s is numified inside
     * nda_scalar_op_locked while the lock is held; a croak there would skip
     * the unlock. Consider resolving s before locking, as update_from_bytes does. */
    nda_rwlock_wrlock(h);
    nda_scalar_op_locked(aTHX_ h, s, '*');
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    nda_rwlock_wrunlock(h);
    RETVAL = SvREFCNT_inc(self);
  OUTPUT:
    RETVAL
SV *
add(self, other)
SV *self
UV
itemsize(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Bytes per element. Fixed when the array is created, so it is read
     * without taking the lock. */
    RETVAL = h->hdr->itemsize;
  OUTPUT:
    RETVAL
# Raw contiguous data region as a byte string (read-locked snapshot copy). The
# bytes are row-major C-order; pair with shape()/dtype() to interpret them.
SV *
buffer(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        uint64_t bytes = h->hdr->size * h->hdr->itemsize;
        char *base = nda_data(h);
        /* Create the result string BEFORE taking the lock. Without this,
         * RETVAL is written through (SvPVX/SvCUR_set) uninitialized.
         * Allocating up front also keeps the allocation outside the locked
         * region. newSV(len) reserves len + 1 bytes: room for the NUL below. */
        RETVAL = newSV((STRLEN)bytes);
        SvPOK_only(RETVAL);
        nda_rwlock_rdlock(h);
        Copy(base, SvPVX(RETVAL), bytes, char);
        nda_rwlock_rdunlock(h);
        SvCUR_set(RETVAL, (STRLEN)bytes);
        *SvEND(RETVAL) = '\0';
    }
  OUTPUT:
    RETVAL
# Overwrite the whole data region from a byte string of exactly size*itemsize
# bytes (write-locked). Used by from_pdl/update_from_pdl.
void
update_from_bytes(self, src)
SV *self
SV *src
PREINIT:
EXTRACT(self);
CODE:
{
STRLEN slen;
const char *sbytes = SvPVbyte(src, slen); /* resolve + any croak BEFORE the lock */
lib/Data/NDArray/Shared.pm view on Meta::CPAN
exists &{"PDL::$name"}
or Carp::croak("this PDL has no '$name' type (needed for dtype '$dtype'); upgrade PDL");
\&{"PDL::$name"};
}
# Copy this array into a brand-new PDL piddle that does not share memory with
# the mapping; the piddle's dims are shape() in reverse order.
sub to_pdl {
    my $self = shift;
    _require_pdl();
    my $type   = _pdl_ctor($self->dtype)->();
    my @dims   = reverse $self->shape;
    my $piddle = PDL->new_from_specification($type, @dims);
    my $data   = $piddle->get_dataref;
    $$data = $self->buffer;    # raw bytes, taken as a read-locked snapshot
    $piddle->upd_data;
    return $piddle;
}
# A NEW shared NDArray copied from a piddle; $path undef => anonymous mapping.
sub from_pdl {
my ($class, $p, $path) = @_;
_require_pdl();
my $tname = "" . $p->type;
my $dt = $DTYPE_OF{$tname}
lib/Data/NDArray/Shared.pm view on Meta::CPAN
snapshot.
=item * C<< $array = Data::NDArray::Shared->from_pdl($piddle, $path) >>
A B<new> shared array B<copied> from C<$piddle> (made physical and contiguous
first); the dtype and shape follow the piddle's type and C<reverse> of its dims.
C<$path> is the backing file (C<undef> or omitted for an anonymous mapping).
=item * C<< $array->update_from_pdl($piddle) >>
Copy C<$piddle> into this array B<in place> (write-locked). The piddle's type
must match the dtype and its dims must equal C<< reverse($array-E<gt>shape) >>,
else it croaks. Returns the array.
=item * C<< $piddle = $array->as_pdl_alias >>
A piddle that B<aliases the shared mapping with no copy> (a real
C<PDL_DONTTOUCHDATA> ndarray over our memory): an B<in-place> PDL operation
(C<< $p .= ... >>, C<< $p-E<gt>inplace-E<gt>... >>) writes straight through to
shared memory -- visible to every process that maps it -- and reads see live
data. The array is kept alive for as long as the piddle exists.
This one method needs PDL at B<build> time (it is compiled against PDL's C API):
if the module was installed without PDL present it C<croak>s, while the copy
methods above keep working through a runtime C<require PDL>. Reinstall with PDL
installed to enable it.
B<Caveats.> The alias B<bypasses the rwlock>: you must coordinate access
yourself (no other process mutating concurrently), as with any unlocked
shared-memory view. Do not B<resize or retype> the alias (a reshape that grows
it, a type conversion) -- it is a fixed window onto the mapping; use
C<to_pdl>/C<from_pdl> when you want an independent, resizable copy.
=item * C<< $bytes = $array->buffer >>
The raw contiguous data region as a byte string (read-locked snapshot),
row-major C-order -- useful on its own for serialization or IPC, and the basis
for C<to_pdl>. C<< $array->update_from_bytes($bytes) >> is the inverse
(write-locked; the string must be exactly C<< size * itemsize >> bytes).
=back
See F<eg/pdl_interop.pl> for a worked example, including a cross-process PDL
transform on one shared array.
=head1 SHARING ACROSS PROCESSES
The array lives in a shared mapping, shared the same three ways as the rest of
the family: a B<backing file> (every process calls C<< new($path, $dtype,
/* One busy-wait step for the rwlock spin loop. It emits the CPU's spin hint
 * ("pause" on x86, "yield" on AArch64, nothing elsewhere). The "memory"
 * clobber makes every variant a compiler barrier, so values re-read after the
 * pause are not cached across it. */
static inline void nda_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
#  define NDA_SPIN_INSN "pause"
#elif defined(__aarch64__)
#  define NDA_SPIN_INSN "yield"
#else
#  define NDA_SPIN_INSN ""
#endif
    __asm__ volatile(NDA_SPIN_INSN ::: "memory");
#undef NDA_SPIN_INSN
}
/* Layout of the rwlock word while write-locked: the top bit flags a writer and
 * the lower 31 bits carry that writer's PID. */
#define NDA_RWLOCK_WRITER_BIT 0x80000000U /* writer-held flag */
#define NDA_RWLOCK_PID_MASK   0x7FFFFFFFU /* low 31 bits: writer PID */
/* Value a writer stores: its PID (truncated to 31 bits) plus the writer flag. */
#define NDA_RWLOCK_WR(pid) \
    (NDA_RWLOCK_WRITER_BIT | (NDA_RWLOCK_PID_MASK & (uint32_t)(pid)))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Documented under "Crash Safety"
* in the POD. */
switch (h->hdr->dtype) {
case NDA_U64: { uint64_t v; memcpy(&v, base + e*8, 8); return v; }
case NDA_U32: { uint32_t v; memcpy(&v, base + e*4, 4); return (uint64_t)v; }
case NDA_U16: { uint16_t v; memcpy(&v, base + e*2, 2); return (uint64_t)v; }
case NDA_U8: { uint8_t v = (uint8_t)base[e]; return (uint64_t)v; }
default: return 0;
}
}
/* Total of all elements, each widened to double via nda_load_nv and added in
 * index order. The caller must already hold the read lock. */
static inline double nda_sum_locked(NdaHandle *h) {
    const uint64_t count = h->hdr->size;
    double total = 0.0;
    uint64_t i = 0;
    while (i < count) {
        total += nda_load_nv(h, i);
        ++i;
    }
    return total;
}
/* Find the flat index of the min (want_max=0) or max (want_max=1) element,
* comparing in the element's NATIVE type so that i64/u64 values above 2^53
* (which collapse/mis-order as doubles) are ranked exactly. Float dtypes
* compare as double. Caller holds the read lock; size >= 1 always. */
static inline uint64_t nda_argextreme_locked(NdaHandle *h, int want_max) {
uint64_t size = h->hdr->size, e, best = 0;
uint32_t dt = h->hdr->dtype;
if (nda_is_float(dt)) {
double bestv = nda_load_nv(h, 0);
for (e = 1; e < size; e++) {
double v = nda_load_nv(h, e);
if (want_max ? (v > bestv) : (v < bestv)) { bestv = v; best = e; }
}
} else if (nda_is_signed(dt)) {
int64_t bestv = nda_load_i64(h, 0);
xt/crash_recovery.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
# Author-only test: skipped unless AUTHOR_TESTING is set in the environment.
plan skip_all => 'author test' unless $ENV{AUTHOR_TESTING};
use Data::NDArray::Shared;
# A child cannot finish 2M write-locked fills in 50ms; SIGKILL it mid-storm while
# it may hold the write lock, then verify the parent can still take the write lock
# and mutate -- the futex rwlock's dead-owner recovery. The anonymous MAP_SHARED
# mapping is inherited across fork.
# Anonymous mapping (undef path) holding 1000 i64 elements.
my $h = Data::NDArray::Shared->new(undef, "i64", 1000);
my $pid = fork // die $!;
# Child: call fill() (values 0..127) repeatedly; it is expected to be killed
# long before the 2,000,000 iterations complete.
if (!$pid) { $h->fill($_ & 0x7f) for 1 .. 2_000_000; exit 0 }
# Parent: sleep ~50ms so the child is inside the fill loop, then SIGKILL it
# (no chance to release a held lock) and reap it.
select undef, undef, undef, 0.05;
kill 'KILL', $pid;
waitpid $pid, 0;
( run in 0.389 second using v1.01-cache-2.11-cpan-bbe5e583499 )