EV-Redis
view release on metacpan or search on metacpan
src/EV__Redis.xs view on Meta::CPAN
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include "EVAPI.h"
#include "hiredis.h"
#include "async.h"
#include "libev_adapter.h"
#include "ngx-queue.h"
#ifdef EV_REDIS_SSL
#include "hiredis_ssl.h"
#endif
/* Forward declarations for the three record types used in this file:
 * the per-object client state, an in-flight command callback record and
 * a waiting-queue command record. */
typedef struct ev_redis_s ev_redis_t;
typedef struct ev_redis_cb_s ev_redis_cb_t;
typedef struct ev_redis_wait_s ev_redis_wait_t;
/* Pointer aliases; EV__Redis is the type of `self` throughout this file.
 * Presumably these are the C names the typemap uses for EV::Redis and
 * EV::Loop -- TODO confirm against the typemap. */
typedef ev_redis_t* EV__Redis;
typedef struct ev_loop* EV__Loop;
/* Value of ev_redis_s.magic while the object is alive. */
#define EV_REDIS_MAGIC 0xDEADBEEF
/* Value of ev_redis_s.magic once the object has been marked as freed
 * (during DESTROY); callbacks test for it before touching other fields. */
#define EV_REDIS_FREED 0xFEEDFACE
/* Release the SV held in `field`, if any, and reset the field to NULL.
 * Note: `field` is evaluated more than once, so pass a plain lvalue. */
#define CLEAR_HANDLER(field) \
do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
/* Per-object state behind an EV::Redis handle: the hiredis async context,
 * the Perl-side handlers, the command queues, and the connection and
 * reconnect settings. */
struct ev_redis_s {
unsigned int magic; /* Set to EV_REDIS_MAGIC when alive */
/* libev loop handed to redisLibevAttach when a connection is set up. */
struct ev_loop* loop;
/* Current hiredis async context; NULL while no connection exists. */
redisAsyncContext* ac;
/* Perl callbacks, NULL when unset. push_handler is called with each decoded
 * push reply; the other three are presumably invoked for the events their
 * names describe -- confirm at their call sites. */
SV* error_handler;
SV* connect_handler;
SV* disconnect_handler;
SV* push_handler;
/* Optional timeouts; when non-NULL they are handed to redisOptions before
 * connecting. */
struct timeval* connect_timeout;
struct timeval* command_timeout;
/* Queue heads. ev_redis_cb_t and ev_redis_wait_t each carry a `queue` link
 * member; presumably cb_queue holds commands already given to hiredis and
 * wait_queue holds commands still held back -- TODO confirm. */
ngx_queue_t cb_queue;
ngx_queue_t wait_queue;
/* Counters for the two queues (presumably their current sizes). Persistent
 * commands are not counted in pending_count. */
int pending_count;
int waiting_count;
int max_pending; /* 0 = unlimited */
ev_redis_cb_t* current_cb; /* callback currently executing */
int resume_waiting_on_reconnect; /* keep waiting queue on disconnect */
int waiting_timeout_ms; /* max ms in waiting queue, 0 = unlimited */
/* Timer and its "is running" flag; presumably used to enforce
 * waiting_timeout_ms -- TODO confirm. */
ev_timer waiting_timer;
int waiting_timer_active;
/* Reconnect settings */
/* host/port are stored by connect(), path by connect_unix(). */
char* host;
int port;
char* path;
int reconnect; /* 0 = disabled, 1 = enabled */
int reconnect_delay_ms; /* delay between reconnect attempts */
int max_reconnect_attempts; /* 0 = unlimited */
int reconnect_attempts; /* current attempt count */
/* Timer and its "is running" flag for delayed reconnect attempts. */
ev_timer reconnect_timer;
int reconnect_timer_active;
int intentional_disconnect; /* set before explicit disconnect() */
int priority; /* libev watcher priority, default 0 */
int in_cb_cleanup; /* prevent re-entrant cb_queue modification */
int in_wait_cleanup; /* prevent re-entrant wait_queue modification */
int callback_depth; /* nesting depth of C-level callbacks invoking Perl code */
int keepalive; /* TCP keepalive interval in seconds, 0 = disabled */
int prefer_ipv4; /* prefer IPv4 DNS resolution */
int prefer_ipv6; /* prefer IPv6 DNS resolution */
char* source_addr; /* local address to bind to */
unsigned int tcp_user_timeout; /* TCP_USER_TIMEOUT in ms, 0 = OS default */
int cloexec; /* set SOCK_CLOEXEC on socket */
int reuseaddr; /* set SO_REUSEADDR on socket */
redisAsyncContext* ac_saved; /* saved ac pointer for deferred disconnect cleanup */
#ifdef EV_REDIS_SSL
/* When non-NULL, SSL is initiated on every new connection with this context. */
redisSSLContext* ssl_ctx;
#endif
};
/* Record passed to hiredis as the privdata of one issued command; the reply
 * callback receives it back and uses it to find the Perl callback. */
struct ev_redis_cb_s {
/* Perl callback for the command; may be NULL once it has been released. */
SV* cb;
/* Link member; the reply callback unlinks records via ngx_queue_remove. */
ngx_queue_t queue;
/* Non-zero for persistent (subscribe-family) commands, whose record
 * outlives a single reply. */
int persist;
/* Set to 1 by skip_pending; the reply callback then does not call cb. */
int skipped;
int sub_count; /* subscription channels remaining (for persistent commands) */
};
/* One entry of the waiting queue. NOTE(review): the code that fills and
 * consumes these records is outside this view; the field notes below are
 * inferred from names and types and should be confirmed there. */
struct ev_redis_wait_s {
/* Presumably the command in argv form: argc arguments with per-argument
 * byte lengths in argvlen. */
char** argv;
size_t* argvlen;
int argc;
/* Perl callback and persistence flag, presumably carried over to the
 * ev_redis_cb_t record once the command is sent. */
SV* cb;
int persist;
/* Link member for the queue this record sits on. */
ngx_queue_t queue;
/* Presumably the time the entry was queued, for waiting_timeout_ms. */
ev_tstamp queued_at;
};
/* Shared error strings (initialized in BOOT) */
/* Each starts out NULL and is meant to be reused instead of allocating a
 * fresh message per failure. err_disconnected is the error handed to
 * clear_wait_queue_sv by disconnect(); the other two are presumably used for
 * skipped commands and waiting-queue timeouts -- confirm at their use sites. */
static SV* err_skipped = NULL;
static SV* err_waiting_timeout = NULL;
static SV* err_disconnected = NULL;
/* Report whether cmd names an unsubscribe-family command (UNSUBSCRIBE,
 * PUNSUBSCRIBE or SUNSUBSCRIBE), compared case-insensitively.
 * These commands are persistent (they stay in cb_queue), but hiredis ignores
 * their callbacks - replies go through the subscribe callback instead.
 * cmd must be a non-NULL, NUL-terminated string. Returns 1 on match, else 0. */
static int is_unsubscribe_command(const char* cmd) {
    const char* candidate;

    /* The first letter selects the only name that could possibly match. */
    switch (cmd[0]) {
        case 'u':
        case 'U':
            candidate = "unsubscribe";
            break;
        case 'p':
        case 'P':
            candidate = "punsubscribe";
            break;
        case 's':
        case 'S':
            candidate = "sunsubscribe";
            break;
        default:
            return 0;
    }
    return strcasecmp(cmd, candidate) == 0;
}
static int is_persistent_command(const char* cmd) {
char c = cmd[0];
if (c == 's' || c == 'S') {
if (0 == strcasecmp(cmd, "subscribe")) return 1;
if (0 == strcasecmp(cmd, "ssubscribe")) return 1;
if (0 == strcasecmp(cmd, "sunsubscribe")) return 1;
return 0;
}
if (c == 'u' || c == 'U') {
return (0 == strcasecmp(cmd, "unsubscribe"));
}
if (c == 'p' || c == 'P') {
if (0 == strcasecmp(cmd, "psubscribe")) return 1;
if (0 == strcasecmp(cmd, "punsubscribe")) return 1;
src/EV__Redis.xs view on Meta::CPAN
}
/* hiredis push callback: decode the push reply and pass it as the single
 * argument to the Perl push_handler.
 * Does nothing when the context has no owner, the owner is not alive
 * (magic mismatch), no handler is registered, or the reply is NULL.
 * Exceptions thrown by the handler are caught and reported with warn(). */
static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
    EV__Redis self = (EV__Redis)ac->data;
    redisReply* push = (redisReply*)reply_ptr;

    if (NULL == self
        || EV_REDIS_MAGIC != self->magic
        || NULL == self->push_handler
        || NULL == push) {
        return;
    }

    /* Mark that Perl code is running on behalf of self; the depth is
     * dropped again before check_destroyed() is consulted. */
    self->callback_depth += 1;
    {
        dSP;
        SV* payload;

        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        /* Mortal, so FREETMPS below releases the decoded reply. */
        payload = sv_2mortal(EV__redis_decode_reply(push));
        XPUSHs(payload);
        PUTBACK;

        call_sv(self->push_handler, G_DISCARD | G_EVAL);
        if (SvTRUE(ERRSV)) {
            warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
        }

        FREETMPS;
        LEAVE;
    }
    self->callback_depth -= 1;

    check_destroyed(self);
}
/* Copy the connection settings stored on self into opts before a connect.
 * Sets the connect/command timeouts when configured, ORs in the option
 * flags for address-family preference (IPv4 wins when both are set),
 * close-on-exec and address reuse, and sets the TCP source address when one
 * is configured and no unix-socket path is stored. */
static void pre_connect_common(EV__Redis self, redisOptions* opts) {
    int extra_flags = 0;

    if (self->connect_timeout != NULL) {
        opts->connect_timeout = self->connect_timeout;
    }
    if (self->command_timeout != NULL) {
        opts->command_timeout = self->command_timeout;
    }

    if (self->prefer_ipv4) {
        extra_flags |= REDIS_OPT_PREFER_IPV4;
    }
    else if (self->prefer_ipv6) {
        extra_flags |= REDIS_OPT_PREFER_IPV6;
    }
    if (self->cloexec) {
        extra_flags |= REDIS_OPT_SET_SOCK_CLOEXEC;
    }
    if (self->reuseaddr) {
        extra_flags |= REDIS_OPT_REUSEADDR;
    }
    opts->options |= extra_flags;

    /* A source address is only applied when no unix-socket path is stored. */
    if (self->path == NULL && self->source_addr != NULL) {
        opts->endpoint.tcp.source_addr = self->source_addr;
    }
}
/* Set up a newly allocated redisAsyncContext: SSL, keepalive, libev, callbacks.
* On failure: frees ac, nulls self->ac, emits error with err_prefix. */
static int post_connect_setup(EV__Redis self, const char* err_prefix) {
self->ac_saved = NULL;
self->ac->data = (void*)self;
#ifdef EV_REDIS_SSL
if (NULL != self->ssl_ctx) {
if (REDIS_OK != redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx)) {
SV* err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
err_prefix, self->ac->errstr[0] ? self->ac->errstr : "unknown error"));
redisAsyncFree(self->ac);
self->ac = NULL;
emit_error(self, err);
return REDIS_ERR;
}
}
#endif
if (self->keepalive > 0) {
redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
}
if (self->tcp_user_timeout > 0) {
redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
}
if (REDIS_OK != redisLibevAttach(self->loop, self->ac)) {
SV* err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
redisAsyncFree(self->ac);
self->ac = NULL;
emit_error(self, err);
return REDIS_ERR;
}
if (self->priority != 0) {
redisLibevSetPriority(self->ac, self->priority);
}
redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
redisAsyncSetDisconnectCallback(self->ac, (redisDisconnectCallback*)EV__redis_disconnect_cb);
if (NULL != self->push_handler) {
redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
}
if (self->ac->err) {
SV* err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
redisAsyncFree(self->ac);
self->ac = NULL;
emit_error(self, err);
return REDIS_ERR;
}
return REDIS_OK;
}
static SV* EV__redis_decode_reply(redisReply* reply) {
SV* res;
switch (reply->type) {
case REDIS_REPLY_STRING:
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_BIGNUM:
case REDIS_REPLY_VERB:
res = newSVpvn(reply->str, reply->len);
break;
case REDIS_REPLY_INTEGER:
res = newSViv(reply->integer);
break;
case REDIS_REPLY_DOUBLE:
res = newSVnv(reply->dval);
break;
case REDIS_REPLY_BOOL:
res = newSViv(reply->integer ? 1 : 0);
break;
case REDIS_REPLY_NIL:
res = newSV(0);
src/EV__Redis.xs view on Meta::CPAN
case REDIS_REPLY_MAP:
case REDIS_REPLY_SET:
case REDIS_REPLY_ATTR:
case REDIS_REPLY_PUSH: {
AV* av = newAV();
size_t i;
if (reply->elements > 0) {
av_extend(av, (SSize_t)(reply->elements - 1));
for (i = 0; i < reply->elements; i++) {
if (NULL != reply->element[i]) {
av_push(av, EV__redis_decode_reply(reply->element[i]));
}
else {
av_push(av, newSV(0));
}
}
}
res = newRV_noinc((SV*)av);
break;
}
default:
/* Unknown type, return undef */
res = newSV(0);
break;
}
return res;
}
static void EV__redis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
EV__Redis self = (EV__Redis)c->data;
ev_redis_cb_t* cbt;
SV* sv_reply;
SV* sv_err;
cbt = (ev_redis_cb_t*)privdata;
if (cbt->skipped) {
if (!cbt->persist || NULL == reply) {
/* Multi-channel persistent: hiredis fires once per channel with
* same cbt. Decrement sub_count, free only on last call. */
if (cbt->persist && NULL == reply && cbt->sub_count > 1) {
cbt->sub_count--;
return;
}
Safefree(cbt);
}
else if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
cbt->sub_count--;
if (cbt->sub_count <= 0) {
Safefree(cbt);
}
}
return;
}
/* self is NULL when DESTROY nulled ac->data (deferred free inside
* REDIS_IN_CALLBACK) or during PL_dirty. Still invoke the callback
* with a disconnect error so users can clean up resources. The hiredis
* context (c) is still alive here â safe to read c->errstr.
* cb may be NULL during PL_dirty where we pre-null it.
* For persistent commands (multi-channel subscribe), hiredis fires
* reply_cb once per channel with the same cbt. Invoke the callback
* only once (null cb after), use sub_count to track when to free. */
if (self == NULL) {
if (NULL != cbt->cb) {
invoke_callback_error(cbt->cb,
sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
SvREFCNT_dec(cbt->cb);
cbt->cb = NULL;
}
if (cbt->persist && reply == NULL && cbt->sub_count > 1) {
cbt->sub_count--;
return;
}
Safefree(cbt);
return;
}
/* If self is marked as freed (during DESTROY), we still invoke the
* callback with an error, but skip any self->field access afterward.
* For persistent commands, don't free cbt here â leave it in the queue
* for remove_cb_queue_sv to clean up after redisAsyncFree returns. */
if (self->magic == EV_REDIS_FREED) {
if (NULL != cbt->cb) {
self->callback_depth++;
invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
self->callback_depth--;
SvREFCNT_dec(cbt->cb);
cbt->cb = NULL;
}
if (!cbt->persist) {
ngx_queue_remove(&cbt->queue);
Safefree(cbt);
}
check_destroyed(self);
return;
}
/* Unknown magic - memory corruption, skip.
* Don't touch queue pointers (self's memory may be garbage).
* Always decrement refcount since callback will never be invoked again. */
if (self->magic != EV_REDIS_MAGIC) {
if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
Safefree(cbt);
return;
}
self->current_cb = cbt;
self->callback_depth++;
if (NULL != cbt->cb) {
if (NULL == reply) {
sv_err = sv_2mortal(newSVpv(
c->errstr[0] ? c->errstr : "disconnected", 0));
invoke_callback_error(cbt->cb, sv_err);
}
else {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, 2);
sv_reply = sv_2mortal(EV__redis_decode_reply((redisReply*)reply));
if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
PUSHs(&PL_sv_undef);
PUSHs(sv_reply);
}
else {
PUSHs(sv_reply);
}
PUTBACK;
call_sv(cbt->cb, G_DISCARD | G_EVAL);
if (SvTRUE(ERRSV)) {
warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
}
FREETMPS;
LEAVE;
}
}
self->callback_depth--;
self->current_cb = NULL;
/* If DESTROY was called during our callback (e.g., user undef'd $redis),
* self->magic is EV_REDIS_FREED but self is still valid (DESTROY defers
* Safefree when callback_depth > 0). Complete cleanup here.
* For persistent commands (multi-channel subscribe), hiredis will fire
* reply_cb again for remaining channels via __redisAsyncFree. Null the
* callback to prevent double invocation, but leave cbt alive so those
* later calls see it and can track sub_count for proper cleanup. */
if (self->magic == EV_REDIS_FREED) {
if (NULL != cbt->cb) {
SvREFCNT_dec(cbt->cb);
cbt->cb = NULL;
}
if (!cbt->persist) {
Safefree(cbt);
}
check_destroyed(self);
return;
}
if (cbt->skipped) {
/* Defensive check: handles edge case where callback is marked skipped
* during its own execution (e.g., via reentrant event loop where a
* nested callback overwrites current_cb, allowing skip_pending to
* process this callback). ngx_queue_remove is safe here due to
* ngx_queue_init in skip_pending. Don't decrement pending_count since
* skip_pending already did when it set skipped=1. */
ngx_queue_remove(&cbt->queue);
/* For persistent commands (e.g., SUBSCRIBE), hiredis fires reply_cb
* once per subscribed channel during disconnect. Only free cbt on the
* last channel to prevent use-after-free. */
if (cbt->persist && cbt->sub_count > 1) {
cbt->sub_count--;
return;
}
Safefree(cbt);
self->callback_depth++;
send_next_waiting(self);
self->callback_depth--;
check_destroyed(self);
return;
}
/* Detect end of persistent subscription: when all channels from a
* SUBSCRIBE command have been unsubscribed, hiredis removes its internal
* callback entry. Clean up our cbt to prevent orphaned queue entries. */
if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
cbt->sub_count--;
if (cbt->sub_count <= 0) {
/* All channels unsubscribed â persistent commands are not counted
* in pending_count, so don't decrement it. */
ngx_queue_remove(&cbt->queue);
self->callback_depth++;
if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
Safefree(cbt);
self->callback_depth--;
check_destroyed(self);
return;
}
}
/* Connection teardown with active subscription: hiredis fires reply_cb
* once per subscribed channel (from dict iteration in __redisAsyncFree).
* Track sub_count and remove from queue on last channel to prevent
* disconnect_cb's remove_cb_queue_sv from invoking the callback again. */
if (cbt->persist && NULL == reply) {
if (cbt->sub_count > 1) {
src/EV__Redis.xs view on Meta::CPAN
redisOptions opts;
if (NULL != self->ac) {
croak("already connected");
}
self->intentional_disconnect = 0;
self->reconnect_attempts = 0;
clear_connection_params(self);
self->host = savepv(hostname);
self->port = port;
memset(&opts, 0, sizeof(opts));
pre_connect_common(self, &opts);
REDIS_OPTIONS_SET_TCP(&opts, hostname, port);
self->ac = redisAsyncConnectWithOptions(&opts);
if (NULL == self->ac) {
croak("cannot allocate memory");
}
(void)post_connect_setup(self, "connect error");
}
void
connect_unix(EV::Redis self, const char* path);
CODE:
{
    /* Start an asynchronous connection to the unix socket at `path`.
     * Croaks when a context already exists or cannot be allocated; a
     * failure in post_connect_setup is reported by that helper, which is
     * why its status is deliberately ignored here. */
    redisOptions options;
    if (self->ac != NULL) {
        croak("already connected");
    }
    /* An explicit connect resets the reconnect bookkeeping. */
    self->reconnect_attempts = 0;
    self->intentional_disconnect = 0;
    /* Replace the stored connection target with this path. */
    clear_connection_params(self);
    self->path = savepv(path);
    memset(&options, 0, sizeof options);
    pre_connect_common(self, &options);
    REDIS_OPTIONS_SET_UNIX(&options, path);
    self->ac = redisAsyncConnectWithOptions(&options);
    if (self->ac == NULL) {
        croak("cannot allocate memory");
    }
    (void)post_connect_setup(self, "connect error");
}
void
disconnect(EV::Redis self);
CODE:
{
    /* Explicitly disconnect. Marks the disconnect as intentional, stops any
     * reconnect timer and, when no connection exists, fails the commands
     * still in the waiting queue with err_disconnected.
     * Early exits use XSRETURN_EMPTY rather than a bare `return`: a bare
     * return skips the generated stack reset, which leaves the argument on
     * the Perl stack so the call would appear to return $self. */
    /* Stop any pending reconnect timer on explicit disconnect */
    self->intentional_disconnect = 1;
    stop_reconnect_timer(self);
    self->reconnect_attempts = 0;
    if (NULL == self->ac) {
        /* Already disconnected - still stop waiting timer and clear
         * wait queue (e.g., resume_waiting_on_reconnect kept them alive
         * after a connection drop, but user now explicitly disconnects). */
        stop_waiting_timer(self);
        if (!ngx_queue_empty(&self->wait_queue)) {
            self->callback_depth++;
            clear_wait_queue_sv(self, err_disconnected);
            self->callback_depth--;
            check_destroyed(self);
        }
        XSRETURN_EMPTY;
    }
    /* Save ac pointer for deferred disconnect: when inside a hiredis
     * callback, redisAsyncDisconnect only sets REDIS_DISCONNECTING and
     * returns. DESTROY needs ac_saved to NULL ac->data if the Perl object
     * is freed before the deferred disconnect completes.
     * Only set when REDIS_IN_CALLBACK: in the synchronous path,
     * disconnect_cb fires during redisAsyncDisconnect and clears ac_saved;
     * but if DESTROY fires nested during that processing (SvREFCNT_dec
     * dropping last ref), it would NULL ac->data, causing disconnect_cb to
     * skip cleanup, leaving ac_saved dangling after ac is freed. */
    if (self->ac->c.flags & REDIS_IN_CALLBACK) {
        self->ac_saved = self->ac;
    }
    /* Protect against Safefree(self) if disconnect_cb fires synchronously
     * and user's on_disconnect handler drops the last Perl reference. */
    self->callback_depth++;
    redisAsyncDisconnect(self->ac);
    self->ac = NULL;
    self->callback_depth--;
    /* self must not be touched after this point if it was destroyed. */
    if (check_destroyed(self)) XSRETURN_EMPTY;
}
int
is_connected(EV::Redis self);
CODE:
{
    /* Returns 1 while an async context exists on self and 0 otherwise.
     * The check is only "is there a context", not a probe of the socket. */
    RETVAL = (self->ac != NULL);
}
OUTPUT:
    RETVAL
SV*
connect_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Accessor for the connect timeout. All of the work is delegated to
     * timeout_accessor, which is given the storage slot, the optional new
     * value and the option name (presumably for its error messages). */
    struct timeval** slot = &self->connect_timeout;
    RETVAL = timeout_accessor(slot, timeout_ms, "connect_timeout");
}
OUTPUT:
    RETVAL
SV*
command_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
RETVAL = timeout_accessor(&self->command_timeout, timeout_ms, "command_timeout");
/* Apply to active connection immediately */
if (NULL != timeout_ms && SvOK(timeout_ms) && NULL != self->ac && NULL != self->command_timeout) {
redisAsyncSetTimeout(self->ac, *self->command_timeout);
}
}
OUTPUT:
src/EV__Redis.xs view on Meta::CPAN
* callback_depth protects against DESTROY if a failed command's
* error callback drops the last Perl reference to self. */
self->callback_depth++;
send_next_waiting(self);
self->callback_depth--;
if (check_destroyed(self)) XSRETURN_IV(0);
}
RETVAL = self->max_pending;
}
OUTPUT:
RETVAL
SV*
waiting_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Getter/setter for the waiting-queue timeout in milliseconds.
     * A defined argument is validated, stored, and the waiting timer is
     * rescheduled; the current setting is returned either way. */
    const int have_value = (timeout_ms != NULL && SvOK(timeout_ms));
    if (have_value) {
        IV requested = SvIV(timeout_ms);
        /* Validate while still an IV, before narrowing to int. */
        validate_timeout_ms(requested, "waiting_timeout");
        self->waiting_timeout_ms = (int)requested;
        schedule_waiting_timer(self);
    }
    RETVAL = newSViv((IV)self->waiting_timeout_ms);
}
OUTPUT:
    RETVAL
int
resume_waiting_on_reconnect(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the flag that keeps the waiting queue across a
     * disconnect. A defined argument is reduced to 0 or 1 by Perl
     * truthiness; an omitted or undef argument leaves the flag unchanged. */
    if (value != NULL && SvOK(value)) {
        if (SvTRUE(value)) {
            self->resume_waiting_on_reconnect = 1;
        }
        else {
            self->resume_waiting_on_reconnect = 0;
        }
    }
    RETVAL = self->resume_waiting_on_reconnect;
}
OUTPUT:
    RETVAL
int
priority(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the libev watcher priority.
     * A defined argument is clamped to -2..+2, stored, and applied at once
     * to the active connection if there is one. Returns the stored value. */
    if (NULL != value && SvOK(value)) {
        /* Clamp while the value is still an IV. Narrowing to int first
         * would let out-of-range values wrap instead of saturating. */
        IV prio = SvIV(value);
        /* Clamp priority to libev valid range: EV_MINPRI (-2) to EV_MAXPRI (+2) */
        if (prio < -2) prio = -2;
        if (prio > 2) prio = 2;
        self->priority = (int)prio;
        if (NULL != self->ac) {
            redisLibevSetPriority(self->ac, (int)prio);
        }
    }
    RETVAL = self->priority;
}
OUTPUT:
    RETVAL
int
keepalive(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the TCP keepalive interval in seconds (0 = disabled).
     * A defined argument must lie in 0..MAX_TIMEOUT_MS/1000, otherwise this
     * croaks. A positive interval is applied at once to an active
     * connection. Returns the stored interval. */
    if (NULL != value && SvOK(value)) {
        /* Range-check the full IV. Narrowing to int before the checks
         * would let wrapped values pass validation. */
        IV interval = SvIV(value);
        if (interval < 0) croak("keepalive interval must be non-negative");
        if (interval > MAX_TIMEOUT_MS / 1000) croak("keepalive interval too large");
        self->keepalive = (int)interval;
        /* Setting 0 only affects future connections; nothing is changed
         * on the current socket in that case. */
        if (NULL != self->ac && interval > 0) {
            redisEnableKeepAliveWithInterval(&self->ac->c, (int)interval);
        }
    }
    RETVAL = self->keepalive;
}
OUTPUT:
    RETVAL
int
prefer_ipv4(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the IPv4 preference. Turning it on also turns the
     * IPv6 preference off; turning it off leaves the IPv6 flag alone. */
    if (value != NULL && SvOK(value)) {
        if (SvTRUE(value)) {
            self->prefer_ipv4 = 1;
            self->prefer_ipv6 = 0;
        }
        else {
            self->prefer_ipv4 = 0;
        }
    }
    RETVAL = self->prefer_ipv4;
}
OUTPUT:
    RETVAL
int
prefer_ipv6(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the IPv6 preference. Turning it on also turns the
     * IPv4 preference off; turning it off leaves the IPv4 flag alone. */
    if (value != NULL && SvOK(value)) {
        if (SvTRUE(value)) {
            self->prefer_ipv6 = 1;
            self->prefer_ipv4 = 0;
        }
        else {
            self->prefer_ipv6 = 0;
        }
    }
    RETVAL = self->prefer_ipv6;
}
OUTPUT:
    RETVAL
SV*
source_addr(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the local address to bind to.
     * Any explicit argument replaces the stored address; passing undef
     * clears it. Returns a copy of the stored address, or undef if none. */
    const int is_setter = (items > 1);
    if (is_setter) {
        /* Drop the old string first, then store the new one if defined. */
        if (self->source_addr != NULL) {
            Safefree(self->source_addr);
            self->source_addr = NULL;
        }
        if (value != NULL && SvOK(value)) {
            const char* text = SvPV_nolen(value);
            self->source_addr = savepv(text);
        }
    }
    RETVAL = (self->source_addr != NULL)
        ? newSVpv(self->source_addr, 0)
        : &PL_sv_undef;
}
OUTPUT:
    RETVAL
unsigned int
tcp_user_timeout(EV::Redis self, SV* value = NULL);
CODE:
{
if (NULL != value && SvOK(value)) {
IV ms = SvIV(value);
validate_timeout_ms(ms, "tcp_user_timeout");
self->tcp_user_timeout = (unsigned int)ms;
( run in 0.609 second using v1.01-cache-2.11-cpan-39bf76dae61 )