Data-Queue-Shared
view release on metacpan or search on metacpan
$q->sync; # msync — flush to disk for crash durability
$q->unlink; # remove backing file
Class->unlink($path); # class method form
my $p = $q->path; # backing file path
my $s = $q->stats; # diagnostic hashref
Stats keys: "size", "capacity", "mmap_size", "push_ok", "pop_ok",
"push_full", "pop_empty", "recoveries", "push_waiters", "pop_waiters".
Str queues additionally include "arena_cap" and "arena_used". All
counters are approximate under concurrent access (diagnostic only).
"push_waiters"/"pop_waiters" show currently blocked producers/consumers.
Event Loop Integration (eventfd)
my $fd = $q->eventfd; # create eventfd, returns fd number
$q->eventfd_set($fd); # use an existing fd (e.g. inherited via fork)
my $fd = $q->fileno; # current eventfd (-1 if none)
$q->notify; # signal eventfd (call after push)
$q->eventfd_consume; # drain notification counter
Notification is opt-in: "push" does not write to the eventfd
automatically. Call "notify" explicitly after pushing. This gives full
Newx(args, count, struct qsm_arg);
SAVEFREEPV(args);
for (uint32_t i = 0; i < count; i++) {
SV *sv = ST(i + 1);
args[i].str = SvPV(sv, args[i].len);
args[i].utf8 = SvUTF8(sv) ? true : false;
}
}
queue_mutex_lock(h->hdr);
for (uint32_t i = 0; i < count; i++) {
int r = queue_str_push_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
if (r == -2) { queue_mutex_unlock(h->hdr); croak("Data::Queue::Shared::Str: string too long (max 2GB)"); }
if (r != 1) break;
pushed++;
}
queue_mutex_unlock(h->hdr);
if (pushed) queue_wake_consumers_n(h->hdr, pushed);
RETVAL = pushed;
OUTPUT:
RETVAL
/* Cap count at capacity: the queue can't hold more than capacity items,
* so a single pop_multi can't return more than that. This also prevents
* size_t overflow in the items_buf allocation for adversarial inputs. */
if (count > h->capacity) count = h->capacity;
if (count > 0) {
items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
if (!items_buf) croak("Data::Queue::Shared::Str: out of memory");
}
queue_mutex_lock(h->hdr);
for (UV i = 0; i < count; i++) {
last_r = queue_str_pop_locked(h, &str, &len, &utf8);
if (last_r <= 0) break;
char *c = (char *)malloc(len ? len : 1);
if (!c) { oom = 1; break; }
if (len) memcpy(c, str, len);
items_buf[n].buf = c;
items_buf[n].len = len;
items_buf[n].utf8 = utf8;
n++;
}
queue_mutex_unlock(h->hdr);
uint32_t max_count;
PPCODE:
max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
/* Hoist SV construction out of the mutex (see pop_multi). */
struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
UV drained_n = 0;
int last_r = 0;
int oom = 0;
queue_mutex_lock(h->hdr);
while (max_count-- > 0) {
last_r = queue_str_pop_locked(h, &str, &len, &utf8);
if (last_r <= 0) break;
struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
char *c = (char *)malloc(len ? len : 1);
if (!it || !c) { free(it); free(c); oom = 1; break; }
if (len) memcpy(c, str, len);
it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
if (drained_tail) drained_tail->next = it; else drained_head = it;
drained_tail = it;
drained_n++;
}
lib/Data/Queue/Shared.pm view on Meta::CPAN
$q->sync; # msync — flush to disk for crash durability
$q->unlink; # remove backing file
Class->unlink($path); # class method form
my $p = $q->path; # backing file path
my $s = $q->stats; # diagnostic hashref
Stats keys: C<size>, C<capacity>, C<mmap_size>, C<push_ok>, C<pop_ok>,
C<push_full>, C<pop_empty>, C<recoveries>, C<push_waiters>, C<pop_waiters>.
Str queues additionally include C<arena_cap> and C<arena_used>.
All counters are approximate under concurrent access (diagnostic only).
C<push_waiters>/C<pop_waiters> show currently blocked producers/consumers.
=head2 Event Loop Integration (eventfd)
my $fd = $q->eventfd; # create eventfd, returns fd number
$q->eventfd_set($fd); # use an existing fd (e.g. inherited via fork)
my $fd = $q->fileno; # current eventfd (-1 if none)
$q->notify; # signal eventfd (call after push)
$q->eventfd_consume; # drain notification counter
Notification is B<opt-in>: C<push> does not write to the eventfd
uint32_t mode; /* 8: QUEUE_MODE_INT or QUEUE_MODE_STR */
uint32_t capacity; /* 12: max elements (power of 2) */
uint64_t total_size; /* 16: mmap size */
uint64_t slots_off; /* 24: offset to slot array */
uint64_t arena_off; /* 32: str mode: offset to arena; int: 0 */
uint64_t arena_cap; /* 40: str mode: arena byte capacity; int: 0 */
uint8_t _pad0[16]; /* 48-63 */
/* ---- Cache line 1 (64-127): head / consumer hot ---- */
uint64_t head; /* 64: consumer position */
uint32_t pop_waiters; /* 72: count of blocked consumers */
uint32_t pop_futex; /* 76: futex word for consumer wakeup */
uint8_t _pad1[48]; /* 80-127 */
/* ---- Cache line 2 (128-191): tail / producer hot ---- */
uint64_t tail; /* 128: producer position */
uint32_t push_waiters; /* 136: count of blocked producers */
uint32_t push_futex; /* 140: futex word for producer wakeup */
uint8_t _pad2[48]; /* 144-191 */
/* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
uint32_t mutex_waiters; /* 196 */
uint32_t arena_wpos; /* 200: str mode: next write position in arena */
uint32_t arena_used; /* 204: str mode: total arena bytes consumed */
uint64_t stat_push_ok; /* 208 */
uint64_t stat_pop_ok; /* 216 */
spin = 0;
}
}
/* Release the mutex and wake one sleeper if any waiter is registered.
 *
 * The store to `mutex` and the following load of `mutex_waiters` form a
 * store->load pair on two different words. With a RELEASE store and a
 * RELAXED load the load may be satisfied before the store is globally
 * visible (store buffering), so a thread that registers itself in
 * `mutex_waiters` and then sleeps on the still-nonzero `mutex` word could
 * be missed and left without a FUTEX_WAKE. Both accesses are therefore
 * SEQ_CST, which forbids that reordering.
 *
 * NOTE(review): the waiter-side increment of `mutex_waiters` is not
 * visible in this block; it must be ordered before the waiter's re-check
 * of `mutex` for the pairing to be complete -- confirm in the lock path. */
static inline void queue_mutex_unlock(QueueHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_SEQ_CST);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_SEQ_CST) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Wake at most `n` consumers blocked on pop_futex (used after a batch
 * push). A woken consumer pops at most one item, so a publisher that
 * committed several items has to request `n` wakeups rather than one;
 * otherwise consumers could stay asleep while items are still available.
 * Does nothing when `n` is zero or no consumer is registered as waiting.
 * The wake count is clamped to INT_MAX before it is handed to futex. */
static inline void queue_wake_consumers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0)
        return;
    if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) == 0)
        return;

    int wake_count = (n > (uint32_t)INT_MAX) ? INT_MAX : (int)n;

    /* Bump the futex word first so sleepers see a changed value. */
    __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, wake_count, NULL, NULL, 0);
}
/* Wake at most one blocked consumer (for use after a single push).
 * Thin wrapper over queue_wake_consumers_n with n = 1. */
static inline void queue_wake_consumers(QueueHeader *hdr) {
queue_wake_consumers_n(hdr, 1);
}
/* Wake at most `n` producers blocked on push_futex (used after a batch
 * pop). Mirrors queue_wake_consumers_n: one wakeup is requested per slot
 * that was freed. Does nothing when `n` is zero or no producer is
 * registered as waiting. The wake count is clamped to INT_MAX before it
 * is handed to futex. */
static inline void queue_wake_producers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0)
        return;
    if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) == 0)
        return;

    int wake_count = (n > (uint32_t)INT_MAX) ? INT_MAX : (int)n;

    /* Bump the futex word first so sleepers see a changed value. */
    __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, wake_count, NULL, NULL, 0);
}
/* Wake at most one blocked producer (for use after a single pop).
 * Thin wrapper over queue_wake_producers_n with n = 1. */
static inline void queue_wake_producers(QueueHeader *hdr) {
queue_wake_producers_n(hdr, 1);
}
/* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
static inline int queue_remaining_time(const struct timespec *deadline,
struct timespec *remaining) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
remaining->tv_sec = deadline->tv_sec - now.tv_sec;
DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)
/* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)
/* ================================================================
* Str queue: mutex-protected with circular arena
* ================================================================ */
/* Push one item while mutex is already held. Returns 1=ok, 0=full, -2=too long. */
static inline int queue_str_push_locked(QueueHandle *h, const char *str,
uint32_t len, bool utf8) {
QueueHeader *hdr = h->hdr;
if (len > QUEUE_STR_LEN_MASK) return -2;
if (hdr->tail - hdr->head >= h->capacity) {
__atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
return 0;
}
hdr->arena_wpos = pos + alloc;
hdr->arena_used += (uint32_t)skip;
hdr->tail++;
__atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
return 1;
}
/* Non-blocking push of one string: takes the queue mutex, delegates to
 * queue_str_push_locked, releases the mutex, and on success wakes one
 * blocked consumer. The wakeup is issued after the mutex is released.
 * Returns whatever queue_str_push_locked returned (1 = pushed). */
static inline int queue_str_try_push(QueueHandle *h, const char *str,
                                     uint32_t len, bool utf8) {
    int rc;

    queue_mutex_lock(h->hdr);
    rc = queue_str_push_locked(h, str, len, utf8);
    queue_mutex_unlock(h->hdr);

    if (rc != 1)
        return rc;

    queue_wake_consumers(h->hdr);
    return rc;
}
/* Pop one item while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
uint32_t *out_len, bool *out_utf8) {
QueueHeader *hdr = h->hdr;
if (hdr->tail == hdr->head) {
__atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
return 0;
}
uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
hdr->arena_wpos = 0;
hdr->head++;
__atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
return 1;
}
/* Non-blocking pop of one string: takes the queue mutex, delegates to
 * queue_str_pop_locked, releases the mutex, and on success wakes one
 * blocked producer. The wakeup is issued after the mutex is released.
 * Returns whatever queue_str_pop_locked returned (1 = popped). */
static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
                                    uint32_t *out_len, bool *out_utf8) {
    int rc;

    queue_mutex_lock(h->hdr);
    rc = queue_str_pop_locked(h, out_str, out_len, out_utf8);
    queue_mutex_unlock(h->hdr);

    if (rc != 1)
        return rc;

    queue_wake_producers(h->hdr);
    return rc;
}
static int queue_str_push_wait(QueueHandle *h, const char *str,
uint32_t len, bool utf8, double timeout) {
int r = queue_str_try_push(h, str, len, utf8);
if (r != 0) return r; /* 1 = success, -2 = too long */
if (timeout == 0) return 0;
}
/* Empty a Str queue: under the mutex, reset head, tail and both arena
 * counters to zero. */
static void queue_str_clear(QueueHandle *h) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    hdr->head       = 0;
    hdr->tail       = 0;
    hdr->arena_wpos = 0;
    hdr->arena_used = 0;
    queue_mutex_unlock(hdr);

    /* clear is a bulk transition: wake every blocked producer and every
     * blocked consumer (not just one of each) so they all re-evaluate
     * the queue state. Requesting INT_MAX wakeups means "everyone"; the
     * helpers skip the futex call when no waiter is registered. */
    queue_wake_producers_n(hdr, (uint32_t)INT_MAX);
    queue_wake_consumers_n(hdr, (uint32_t)INT_MAX);
}
t/03-multiprocess.t view on Meta::CPAN
if ($pid == 0) {
my $cq = Data::Queue::Shared::Str->new($path5, 4, 4096);
select(undef, undef, undef, 0.1);
$cq->clear;
POSIX::_exit(0);
}
my $t0 = time;
ok $q->push_wait("after_clear", 30), 'push_wait succeeded after clear()';
cmp_ok time - $t0, '<', 20, 'push_wait unblocked (not full-timeout hang)';
waitpid($pid, 0);
$q->unlink;
};
# Str: clear() wakes blocked pop_wait consumers
my $path6 = tmpnam() . '.shm';
END { unlink $path6 if $$ == $main_pid && $path6 && -f $path6 }
subtest 'str clear wakes pop_wait' => sub {
my $q = Data::Queue::Shared::Str->new($path6, 4, 4096);
my $pid = fork();
die "fork: $!" unless defined $pid;
if ($pid == 0) {
my $cq = Data::Queue::Shared::Str->new($path6, 4, 4096);
select(undef, undef, undef, 0.1);
$cq->clear; # should wake the blocked consumer
select(undef, undef, undef, 0.1);
$cq->push("after_clear"); # then push something
POSIX::_exit(0);
}
# Parent blocks in pop_wait — clear wakes it, then it re-blocks,
# then the push wakes it again with actual data
my $t0 = time;
my $val = $q->pop_wait(30);
is $val, "after_clear", 'got value pushed after clear';
if ($pid == 0) {
# Producer: wait a bit, push
select undef, undef, undef, 0.3;
$q->push(42);
_exit(0);
}
# Install a SIGUSR1 handler that fires during the pop_wait
$SIG{USR1} = sub { diag "SIGUSR1 received" };
# Schedule a SIGUSR1 to ourself while blocked
my $signaller_pid = fork // die;
if ($signaller_pid == 0) {
select undef, undef, undef, 0.1;
kill USR1 => getppid();
_exit(0);
}
my $t0 = time;
my $v = $q->pop_wait(2.0);
my $dt = time - $t0;
xt/eintr_sigalrm.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use POSIX ();
use Time::HiRes qw(time);
# SIGALRM during futex-based pop_wait: the syscall may return EINTR; the
# wrapper must either retry transparently or surface a clean error, and
# must not leave waiters deadlocked.
use Data::Queue::Shared::Int;
my $q = Data::Queue::Shared::Int->new_memfd("eintr", 4);
# Arrange SIGALRM to fire ~100ms into a 500ms wait.
# The POSIX module does not provide setitimer(), so the previous guard
# POSIX->can('setitimer') was always false and the timer was never armed,
# which meant the wait was never actually interrupted. Time::HiRes (loaded
# by this file's "use Time::HiRes") provides setitimer and ITIMER_REAL;
# d_setitimer() reports whether the platform supports it.
my $got_alrm = 0;
local $SIG{ALRM} = sub { $got_alrm++ };
Time::HiRes::setitimer(Time::HiRes::ITIMER_REAL(), 0.1, 0)
    if Time::HiRes::d_setitimer();
( run in 2.320 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )