Data-SpatialHash-Shared
view release on metacpan or search on metacpan
static inline int64_t sph_floor_cell(double v, double cs) {
double d = floor(v / cs);
if (!isfinite(d)) return 0;
if (d >= (double)SPH_CELL_LIMIT) return SPH_CELL_LIMIT;
if (d <= -(double)SPH_CELL_LIMIT) return -SPH_CELL_LIMIT;
return (int64_t)d;
}
/* positive modulo of a cell index into [0, n); n <= 0 means "no wrap on this axis" */
static inline int64_t sph_wrap_cell(int64_t c, int64_t n) {
if (n <= 0) return c;
int64_t m = c % n;
return m < 0 ? m + n : m;
}
/* shortest per-axis separation; with wrap, the minimum-image (toroidal) distance */
static inline double sph_axis_delta(double a, double b, double world) {
double d = fabs(a - b);
return (world > 0.0 && d > world * 0.5) ? world - d : d;
}
/* raw integer cell of a position (no wrap) -- used for box-corner span math */
static inline void sph_cell_raw(const SpatialHandle *h, const double pos[3], int64_t cell[3]) {
double cs = h->hdr->cell_size;
cell[0] = sph_floor_cell(pos[0], cs);
cell[1] = sph_floor_cell(pos[1], cs);
cell[2] = sph_floor_cell(pos[2], cs);
}
/* storage cell: the raw cell wrapped into [0,nx) per axis when toroidal, so a
point near x=0 and one near x=world land in adjacent cells (0 and nx-1) */
static inline void sph_cell_of(const SpatialHandle *h, const double pos[3], int64_t cell[3]) {
sph_cell_raw(h, pos, cell);
if (h->wrap) {
cell[0] = sph_wrap_cell(cell[0], h->wrap_cells[0]);
cell[1] = sph_wrap_cell(cell[1], h->wrap_cells[1]);
cell[2] = sph_wrap_cell(cell[2], h->wrap_cells[2]);
}
}
static inline uint32_t sph_bucket_of_cell(const SpatialHandle *h, const int64_t cell[3]) {
return (uint32_t)(XXH3_64bits(cell, 3 * sizeof(int64_t)) & (h->hdr->num_buckets - 1));
}
static inline int sph_cell_eq(const int64_t a[3], const int64_t b[3]) {
return a[0] == b[0] && a[1] == b[1] && a[2] == b[2];
}
/* ================================================================
* Futex-based write-preferring read-write lock
* with reader-slot dead-process recovery
* ================================================================ */
#define SPH_RWLOCK_SPIN_LIMIT 32
#define SPH_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
static inline void sph_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
__asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
__asm__ volatile("yield" ::: "memory");
#else
__asm__ volatile("" ::: "memory");
#endif
}
/* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
#define SPH_RWLOCK_WRITER_BIT 0x80000000U
#define SPH_RWLOCK_PID_MASK 0x7FFFFFFFU
#define SPH_RWLOCK_WR(pid) (SPH_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SPH_RWLOCK_PID_MASK))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Robust detection would require
* a per-slot process-start-time epoch (a header-layout/version change).
* Documented under "Crash Safety" in the POD. */
static inline int sph_pid_alive(uint32_t pid) {
if (pid == 0) return 1; /* no owner recorded, assume alive */
return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}
/* Force-recover a stale write lock left by a dead process.
* CAS to OUR pid to hold the lock while fixing shared state, then release.
* Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
* recovering process can detect and re-recover if we crash mid-recovery. */
static inline void sph_recover_stale_lock(SpatialHandle *h, uint32_t observed_rwlock) {
SphHeader *hdr = h->hdr;
uint32_t mypid = SPH_RWLOCK_WR((uint32_t)getpid());
if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
/* We now hold the write lock as mypid. No additional shared state needs
* repair here (this module has no seqlock); just release the lock. */
__atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static const struct timespec sph_lock_timeout = { SPH_LOCK_TIMEOUT_SEC, 0 };
/* Process-global fork-generation counter. Incremented in the pthread_atfork
* child callback so every open handle detects a fork transition on the next
* lock call without paying a getpid() syscall on the hot path. */
static uint32_t sph_fork_gen = 1;
static pthread_once_t sph_atfork_once = PTHREAD_ONCE_INIT;
static void sph_on_fork_child(void) {
__atomic_add_fetch(&sph_fork_gen, 1, __ATOMIC_RELAXED);
}
static void sph_atfork_init(void) {
pthread_atfork(NULL, NULL, sph_on_fork_child);
}
/* Ensure this process owns a reader slot. Called from the lock helpers so
* that fork()'d children pick up their own slot lazily instead of sharing
* the parent's. Hot-path is a single relaxed load + compare; only on a
* fork-generation mismatch do we touch getpid() and scan slots. */
static inline void sph_claim_reader_slot(SpatialHandle *h) {
uint32_t cur_gen = __atomic_load_n(&sph_fork_gen, __ATOMIC_RELAXED);
if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
return;
/* Cold path -- register the atfork hook once per process, then claim. */
pthread_once(&sph_atfork_once, sph_atfork_init);
/* Re-read after pthread_once: sph_on_fork_child may have bumped it. */
cur_gen = __atomic_load_n(&sph_fork_gen, __ATOMIC_RELAXED);
uint32_t now_pid = (uint32_t)getpid();
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
static inline void sph_unpark_reader(SpatialHandle *h) {
__atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
if (h->my_slot_idx != UINT32_MAX)
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
}
static inline void sph_park_writer(SpatialHandle *h) {
if (h->my_slot_idx != UINT32_MAX) {
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
}
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
static inline void sph_unpark_writer(SpatialHandle *h) {
__atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
if (h->my_slot_idx != UINT32_MAX) {
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
}
}
static inline void sph_rwlock_rdlock(SpatialHandle *h) {
sph_claim_reader_slot(h);
SphHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
/* Claim subcount BEFORE bumping the shared rwlock counter. This way
* a concurrent writer-side recovery scan that sees our PID alive with
* subcount > 0 will (correctly) defer force-reset, even while we are
* still spinning trying to win the rwlock CAS. Without this, a reader
* killed between rwlock CAS-success and subcount++ would let recovery
* force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
* our eventual rdunlock dec. */
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
for (int spin = 0; ; spin++) {
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Write-preferring: when lock is free (cur==0) and writers are
* waiting, yield to let the writer acquire. When readers are
* already active (cur>=1), new readers may join freely. */
if (cur > 0 && cur < SPH_RWLOCK_WRITER_BIT) {
if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
} else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
if (__builtin_expect(spin < SPH_RWLOCK_SPIN_LIMIT, 1)) {
sph_rwlock_spin_pause();
continue;
}
sph_park_reader(h);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR when yielding to waiting writers */
if (cur >= SPH_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&sph_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
sph_unpark_reader(h);
sph_recover_after_timeout(h);
spin = 0;
continue;
}
}
sph_unpark_reader(h);
spin = 0;
}
}
static inline void sph_rwlock_rdunlock(SpatialHandle *h) {
SphHeader *hdr = h->hdr;
/* Release the shared counter BEFORE dropping our subcount so that
* "any live PID with subcount > 0" is a reliable in-flight indicator
* for the writer-side recovery scan. Inverting these would create a
* window where we still own a unit of rwlock but our slot subcount is
* 0, letting recovery force-reset rwlock underneath us. */
uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
if (h->my_slot_idx != UINT32_MAX)
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static inline void sph_rwlock_wrlock(SpatialHandle *h) {
sph_claim_reader_slot(h); /* refresh cached_pid across fork */
SphHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
/* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
* any crash window between acquiring the lock and storing the owner. */
uint32_t mypid = SPH_RWLOCK_WR(h->cached_pid);
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (__builtin_expect(spin < SPH_RWLOCK_SPIN_LIMIT, 1)) {
sph_rwlock_spin_pause();
continue;
}
sph_park_writer(h);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&sph_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
sph_unpark_writer(h);
sph_recover_after_timeout(h);
spin = 0;
continue;
}
}
sph_unpark_writer(h);
spin = 0;
}
static int64_t sph_eventfd_consume(SpatialHandle *h) {
if (h->notify_fd < 0) return -1;
uint64_t v = 0;
if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
return (int64_t)v;
}
/* ================================================================
* Slot pool: alloc / free / liveness
* ================================================================ */
static inline int sph_is_live(const SpatialHandle *h, uint32_t idx) {
if (idx >= h->hdr->max_entries) return 0;
uint64_t w = h->bitmap[idx / 64];
return (w >> (idx % 64)) & 1;
}
static inline uint32_t sph_alloc_slot(SpatialHandle *h) {
uint32_t idx = h->hdr->free_head;
if (idx == SPH_NONE) return SPH_NONE;
h->hdr->free_head = h->entries[idx].next; /* pop free-list */
h->bitmap[idx / 64] |= (uint64_t)1 << (idx % 64);
h->entries[idx].next = SPH_NONE;
h->entries[idx].prev = SPH_NONE;
return idx;
}
static inline void sph_free_slot(SpatialHandle *h, uint32_t idx) {
h->bitmap[idx / 64] &= ~((uint64_t)1 << (idx % 64));
h->entries[idx].next = h->hdr->free_head; /* push free-list */
h->hdr->free_head = idx;
}
/* ================================================================
* Bucket chain: link / unlink (callers hold write lock)
* ================================================================ */
static inline void sph_bucket_link(SpatialHandle *h, uint32_t idx) {
int64_t cell[3]; sph_cell_of(h, h->entries[idx].pos, cell);
uint32_t b = sph_bucket_of_cell(h, cell);
uint32_t head = h->buckets[b];
h->entries[idx].prev = SPH_NONE;
h->entries[idx].next = head;
if (head != SPH_NONE) h->entries[head].prev = idx;
h->buckets[b] = idx;
}
static inline void sph_bucket_unlink(SpatialHandle *h, uint32_t idx) {
int64_t cell[3]; sph_cell_of(h, h->entries[idx].pos, cell);
uint32_t b = sph_bucket_of_cell(h, cell);
uint32_t p = h->entries[idx].prev, n = h->entries[idx].next;
if (p != SPH_NONE) h->entries[p].next = n; else h->buckets[b] = n;
if (n != SPH_NONE) h->entries[n].prev = p;
}
/* ================================================================
* Mutation ops (callers hold write lock)
* ================================================================ */
static inline uint32_t sph_insert_locked(SpatialHandle *h, double x, double y, double z,
int64_t value, double radius) {
uint32_t idx = sph_alloc_slot(h);
if (idx == SPH_NONE) return SPH_NONE;
h->entries[idx].pos[0] = x; h->entries[idx].pos[1] = y; h->entries[idx].pos[2] = z;
h->entries[idx].value = value;
h->entries[idx].radius = radius;
sph_bucket_link(h, idx);
h->hdr->count++;
return idx;
}
static inline int sph_remove_locked(SpatialHandle *h, uint32_t idx) {
if (!sph_is_live(h, idx)) return 0;
sph_bucket_unlink(h, idx);
sph_free_slot(h, idx);
h->hdr->count--;
return 1;
}
static inline int sph_move_locked(SpatialHandle *h, uint32_t idx, double x, double y, double z) {
if (!sph_is_live(h, idx)) return 0;
int64_t oc[3], nc[3];
sph_cell_of(h, h->entries[idx].pos, oc);
double np[3] = { x, y, z }; sph_cell_of(h, np, nc);
if (sph_cell_eq(oc, nc)) { /* same cell: just rewrite pos */
h->entries[idx].pos[0] = x; h->entries[idx].pos[1] = y; h->entries[idx].pos[2] = z;
return 1;
}
sph_bucket_unlink(h, idx); /* unlink uses OLD pos */
h->entries[idx].pos[0] = x; h->entries[idx].pos[1] = y; h->entries[idx].pos[2] = z;
sph_bucket_link(h, idx); /* link uses NEW pos */
return 1;
}
/* ================================================================
* Region query: collector + walk helpers
* ================================================================ */
typedef struct { int64_t *vals; size_t n, cap; } sph_collect_t;
static int sph_collect_push(sph_collect_t *c, int64_t v) {
if (c->n == c->cap) {
size_t nc = c->cap ? c->cap * 2 : 64;
int64_t *nv = (int64_t *)realloc(c->vals, nc * sizeof(int64_t));
if (!nv) return 0;
c->vals = nv; c->cap = nc;
}
c->vals[c->n++] = v;
return 1;
}
/* squared distance from a to b over `dims` axes; minimum-image when toroidal */
static inline double sph_dist2_w(const double world[3], const double a[3], const double b[3], int dims) {
double dx = sph_axis_delta(a[0], b[0], world[0]);
double dy = sph_axis_delta(a[1], b[1], world[1]);
double d = dx*dx + dy*dy;
if (dims == 3) { double dz = sph_axis_delta(a[2], b[2], world[2]); d += dz*dz; }
return d;
}
static inline double sph_dist2(const SpatialHandle *h, const double a[3], const double b[3], int dims) {
return sph_dist2_w(h->world, a, b, dims);
}
/* Walk one cell C's bucket chain; for each entry whose recomputed cell == C
and which passes `accept`, push its value. Returns 1 ok, 0 OOM. */
static int sph_walk_cell(SpatialHandle *h, const int64_t C[3], sph_collect_t *out,
int (*accept)(const double pos[3], void *ctx), void *ctx) {
uint32_t b = sph_bucket_of_cell(h, C);
for (uint32_t idx = h->buckets[b]; idx != SPH_NONE; idx = h->entries[idx].next) {
int64_t ec[3]; sph_cell_of(h, h->entries[idx].pos, ec);
if (!sph_cell_eq(ec, C)) continue; /* collision / dedup guard */
if (accept && !accept(h->entries[idx].pos, ctx)) continue;
if (!sph_collect_push(out, h->entries[idx].value)) return 0;
}
return 1;
}
static int sph_query_cell(SpatialHandle *h, const double p[3], int dims, sph_collect_t *out) {
double pp[3] = { p[0], p[1], dims == 3 ? p[2] : 0.0 };
int64_t C[3]; sph_cell_of(h, pp, C);
int collide = (fixed_r < 0.0);
/* one pass: largest radius (collision reach) + whether any entry is 3D, so the
enumeration covers the real z-cell range rather than only z-cell 0 */
double maxr = 0.0; int has3d = 0;
for (uint32_t i = 0; i < me; i++) {
if (!sph_is_live(h, i)) continue;
if (collide && h->entries[i].radius > maxr) maxr = h->entries[i].radius;
if (h->entries[i].pos[2] != 0.0) has3d = 1;
}
if (collide) { if (maxr <= 0.0) return SPH_Q_OK; } /* no radii -> points never collide */
else if (!(fixed_r > 0.0)) return SPH_Q_OK; /* non-positive radius -> nothing */
int dims = (h->world[2] > 0.0 || has3d) ? 3 : 2;
for (uint32_t a = 0; a < me; a++) {
if (!sph_is_live(h, a)) continue;
const double *pa = h->entries[a].pos;
double ra = h->entries[a].radius;
double reach_d = ceil((collide ? (ra + maxr) : fixed_r) / cs);
/* defensive: the API validates radii, but a corrupt stored radius could make this
NaN, which would slip past the cap check below (NaN >= X is false) into the cast */
if (!(reach_d >= 0)) reach_d = 0;
if (reach_d >= (double)SPH_MAX_QUERY_CELLS) return SPH_Q_TOOBIG; /* avoid int64 overflow */
int64_t reach = (int64_t)reach_d;
int64_t ac[3]; sph_cell_raw(h, pa, ac);
int64_t cnt[3] = { 1, 1, 1 };
uint64_t cells = 1;
for (int i = 0; i < dims; i++) {
uint64_t span = (uint64_t)(2 * reach + 1);
if (h->wrap_cells[i] > 0 && span > (uint64_t)h->wrap_cells[i]) span = (uint64_t)h->wrap_cells[i];
if (span >= (uint64_t)SPH_MAX_QUERY_CELLS) return SPH_Q_TOOBIG;
if (cells > (uint64_t)SPH_MAX_QUERY_CELLS / span) return SPH_Q_TOOBIG;
cells *= span; cnt[i] = (int64_t)span;
}
int64_t b0 = ac[0]-reach, b1 = ac[1]-reach, b2 = ac[2]-reach;
for (int64_t i0 = 0; i0 < cnt[0]; i0++) {
int64_t C0 = h->wrap ? sph_wrap_cell(b0+i0, h->wrap_cells[0]) : b0+i0;
for (int64_t i1 = 0; i1 < cnt[1]; i1++) {
int64_t C1 = h->wrap ? sph_wrap_cell(b1+i1, h->wrap_cells[1]) : b1+i1;
for (int64_t i2 = 0; i2 < cnt[2]; i2++) { /* cnt[2]==1 when dims==2 */
int64_t C2 = (dims == 3) ? (h->wrap ? sph_wrap_cell(b2+i2, h->wrap_cells[2]) : b2+i2) : 0;
int64_t C[3] = { C0, C1, C2 };
uint32_t bkt = sph_bucket_of_cell(h, C);
for (uint32_t idx = h->buckets[bkt]; idx != SPH_NONE; idx = h->entries[idx].next) {
if (idx <= a) continue; /* unordered: emit each pair once */
int64_t ec[3]; sph_cell_of(h, h->entries[idx].pos, ec);
if (!sph_cell_eq(ec, C)) continue; /* hash-collision guard */
double thr = collide ? (ra + h->entries[idx].radius) : fixed_r;
if (sph_dist2(h, pa, h->entries[idx].pos, dims) < thr*thr)
if (!emit(h->entries[a].value, h->entries[idx].value, ctx)) return SPH_Q_OOM;
}
}
}
}
}
return SPH_Q_OK;
}
/* ================================================================
* Lifecycle helpers: clear, chain stats
* ================================================================ */
static void sph_clear_locked(SpatialHandle *h) {
uint32_t me = h->hdr->max_entries, nb = h->hdr->num_buckets;
for (uint32_t b = 0; b < nb; b++) h->buckets[b] = SPH_NONE;
memset(h->bitmap, 0, (size_t)h->bitmap_words * sizeof(uint64_t));
for (uint32_t i = 0; i < me; i++) h->entries[i].next = (i+1 < me) ? i+1 : SPH_NONE;
h->hdr->free_head = 0; /* max_entries >= 1 (validated at create) */
h->hdr->count = 0;
}
static void sph_chain_stats(SpatialHandle *h, uint32_t *occupied, uint32_t *max_chain,
uint32_t *max_cell) {
uint32_t occ = 0, mx = 0, mxc = 0, nb = h->hdr->num_buckets;
for (uint32_t b = 0; b < nb; b++) {
uint32_t len = 0;
for (uint32_t idx = h->buckets[b]; idx != SPH_NONE; idx = h->entries[idx].next) {
len++;
/* per-cell occupancy: count chain entries sharing idx's cell (entries of
one cell always hash to one bucket, so a cell is a subset of a chain) */
int64_t ci[3]; sph_cell_of(h, h->entries[idx].pos, ci);
uint32_t cc = 0;
for (uint32_t j = h->buckets[b]; j != SPH_NONE; j = h->entries[j].next) {
int64_t cj[3]; sph_cell_of(h, h->entries[j].pos, cj);
if (sph_cell_eq(ci, cj)) cc++;
}
if (cc > mxc) mxc = cc;
}
if (len) { occ++; if (len > mx) mx = len; }
}
*occupied = occ; *max_chain = mx; *max_cell = mxc;
}
#endif /* SPHASH_H */
( run in 0.321 second using v1.01-cache-2.11-cpan-bbe5e583499 )