EV-Redis

 view release on metacpan or  search on metacpan

src/EV__Redis.xs  view on Meta::CPAN

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"

#include "EVAPI.h"

#include "hiredis.h"
#include "async.h"
#include "libev_adapter.h"
#include "ngx-queue.h"

#ifdef EV_REDIS_SSL
#include "hiredis_ssl.h"
#endif

/* Forward declarations for the three record types defined below:
 * the client object, an in-flight command callback record, and a
 * queued (not yet sent) command record. */
typedef struct ev_redis_s ev_redis_t;
typedef struct ev_redis_cb_s ev_redis_cb_t;
typedef struct ev_redis_wait_s ev_redis_wait_t;

/* Pointer aliases whose names mirror the Perl-side EV::Redis / EV::Loop types
 * (presumably matched up through the module's typemap — confirm there). */
typedef ev_redis_t* EV__Redis;
typedef struct ev_loop* EV__Loop;

/* Values stored in ev_redis_s.magic: MAGIC marks a live object, FREED marks
 * an object whose DESTROY has run but whose memory is still valid. Any other
 * value is treated by the callbacks as corruption. */
#define EV_REDIS_MAGIC 0xDEADBEEF
#define EV_REDIS_FREED 0xFEEDFACE

/* Drop the reference held in an SV* handler slot and reset the slot to NULL.
 * No-op when the slot is already NULL. `field` is evaluated more than once,
 * so pass a plain lvalue without side effects. */
#define CLEAR_HANDLER(field) \
    do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)

/* Per-connection client object behind the Perl EV::Redis handle.
 * Holds the hiredis async context, the Perl-level handlers, the command
 * queues, and all connection/reconnect settings. */
struct ev_redis_s {
    unsigned int magic;  /* Set to EV_REDIS_MAGIC when alive */
    struct ev_loop* loop; /* libev loop handed to redisLibevAttach */
    redisAsyncContext* ac; /* hiredis async context; NULL when not connected */
    /* Perl handler SVs; NULL when not installed. push_handler receives each
     * decoded push reply; the other three are presumably invoked on the
     * events their names suggest — confirm against the callback code. */
    SV* error_handler;
    SV* connect_handler;
    SV* disconnect_handler;
    SV* push_handler;
    /* Optional timeouts; NULL means "not configured". When set they are
     * passed to hiredis through redisOptions before connecting. */
    struct timeval* connect_timeout;
    struct timeval* command_timeout;
    ngx_queue_t cb_queue; /* queue head; presumably links ev_redis_cb_t records of sent commands */
    ngx_queue_t wait_queue; /* queue head; presumably links ev_redis_wait_t records not yet sent */
    int pending_count; /* NOTE(review): looks like the number of non-persistent commands in cb_queue — confirm */
    int waiting_count; /* NOTE(review): looks like the number of entries in wait_queue — confirm */
    int max_pending; /* 0 = unlimited */
    ev_redis_cb_t* current_cb; /* callback currently executing */
    int resume_waiting_on_reconnect; /* keep waiting queue on disconnect */
    int waiting_timeout_ms; /* max ms in waiting queue, 0 = unlimited */
    ev_timer waiting_timer;
    int waiting_timer_active;

    /* Reconnect settings */
    char* host; /* TCP host saved by connect() */
    int port; /* TCP port saved by connect() */
    char* path; /* UNIX socket path saved by connect_unix(); NULL for TCP */
    int reconnect;              /* 0 = disabled, 1 = enabled */
    int reconnect_delay_ms;     /* delay between reconnect attempts */
    int max_reconnect_attempts; /* 0 = unlimited */
    int reconnect_attempts;     /* current attempt count */
    ev_timer reconnect_timer;
    int reconnect_timer_active;
    int intentional_disconnect; /* set before explicit disconnect() */
    int priority; /* libev watcher priority, default 0 */
    int in_cb_cleanup; /* prevent re-entrant cb_queue modification */
    int in_wait_cleanup; /* prevent re-entrant wait_queue modification */
    int callback_depth; /* nesting depth of C-level callbacks invoking Perl code */
    int keepalive; /* TCP keepalive interval in seconds, 0 = disabled */
    int prefer_ipv4; /* prefer IPv4 DNS resolution */
    int prefer_ipv6; /* prefer IPv6 DNS resolution */
    char* source_addr; /* local address to bind to */
    unsigned int tcp_user_timeout; /* TCP_USER_TIMEOUT in ms, 0 = OS default */
    int cloexec; /* set SOCK_CLOEXEC on socket */
    int reuseaddr; /* set SO_REUSEADDR on socket */
    redisAsyncContext* ac_saved; /* saved ac pointer for deferred disconnect cleanup */
#ifdef EV_REDIS_SSL
    redisSSLContext* ssl_ctx; /* when non-NULL, SSL is initiated on every new context */
#endif
};

/* Bookkeeping record for one command handed to hiredis; passed back to
 * EV__redis_reply_cb as its privdata argument. */
struct ev_redis_cb_s {
    SV* cb; /* Perl callback; may be NULL (it is nulled once invoked during teardown) */
    ngx_queue_t queue; /* link used to insert/remove this record in a queue */
    int persist; /* non-zero for persistent (subscribe-family) commands */
    int skipped; /* non-zero once the command was skipped; the reply handler then only frees the record */
    int sub_count; /* subscription channels remaining (for persistent commands) */
};

/* Record for a command held in the waiting queue.
 * NOTE(review): field roles below are inferred from names and types only;
 * the code that fills and consumes this record is not shown here — confirm. */
struct ev_redis_wait_s {
    char** argv; /* presumably the command arguments */
    size_t* argvlen; /* presumably the byte length of each argv entry */
    int argc; /* presumably the number of entries in argv/argvlen */
    SV* cb; /* Perl callback associated with the queued command */
    int persist; /* presumably mirrors ev_redis_cb_s.persist */
    ngx_queue_t queue; /* link used to insert/remove this record in a queue */
    ev_tstamp queued_at; /* libev timestamp; presumably when the entry was queued */
};

/* Shared error strings (initialized in BOOT).
 * All three are NULL until then. err_disconnected is the error that
 * disconnect() passes when it clears the waiting queue; the other two are
 * presumably reported for skipped commands and waiting-queue timeouts. */
static SV* err_skipped = NULL;
static SV* err_waiting_timeout = NULL;
static SV* err_disconnected = NULL;

/* Return 1 when cmd names an unsubscribe-family command (UNSUBSCRIBE,
 * PUNSUBSCRIBE or SUNSUBSCRIBE, compared case-insensitively), otherwise 0.
 * These commands are persistent (they stay in cb_queue), but hiredis ignores
 * their callbacks — replies are delivered through the subscribe callback. */
static int is_unsubscribe_command(const char* cmd) {
    const char* candidate;

    /* The first byte selects the only name that could match, so unrelated
     * commands are rejected without a string comparison. */
    switch (cmd[0]) {
        case 'u': case 'U':
            candidate = "unsubscribe";
            break;
        case 'p': case 'P':
            candidate = "punsubscribe";
            break;
        case 's': case 'S':
            candidate = "sunsubscribe";
            break;
        default:
            return 0;
    }

    return strcasecmp(cmd, candidate) == 0;
}

static int is_persistent_command(const char* cmd) {
    char c = cmd[0];

    if (c == 's' || c == 'S') {
        if (0 == strcasecmp(cmd, "subscribe")) return 1;
        if (0 == strcasecmp(cmd, "ssubscribe")) return 1;
        if (0 == strcasecmp(cmd, "sunsubscribe")) return 1;
        return 0;
    }
    if (c == 'u' || c == 'U') {
        return (0 == strcasecmp(cmd, "unsubscribe"));
    }
    if (c == 'p' || c == 'P') {
        if (0 == strcasecmp(cmd, "psubscribe")) return 1;
        if (0 == strcasecmp(cmd, "punsubscribe")) return 1;

src/EV__Redis.xs  view on Meta::CPAN

}

/* hiredis push-reply hook: decode the reply and pass it to the Perl-level
 * push handler as its single argument.
 * Returns without doing anything when ac->data is NULL, the object is not
 * marked alive, no push handler is installed, or the reply is NULL.
 * An exception thrown by the handler is trapped (G_EVAL) and reported
 * through warn(). */
static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
    EV__Redis self = (EV__Redis)ac->data;
    redisReply* push = (redisReply*)reply_ptr;

    if (NULL == self || EV_REDIS_MAGIC != self->magic) return;
    if (NULL == self->push_handler || NULL == push) return;

    /* Record that Perl code is running on behalf of this object until the
     * handler has returned. */
    self->callback_depth++;

    {
        dSP;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        EXTEND(SP, 1);
        PUSHs(sv_2mortal(EV__redis_decode_reply(push)));
        PUTBACK;

        call_sv(self->push_handler, G_DISCARD | G_EVAL);
        if (SvTRUE(ERRSV)) {
            warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
        }

        FREETMPS;
        LEAVE;
    }

    self->callback_depth--;
    check_destroyed(self);
}

/* Transfer the object's connection settings into a hiredis redisOptions.
 * Only configured settings are written: timeouts when non-NULL, option flags
 * when the matching field is non-zero. Existing bits in opts->options are
 * kept (flags are OR-ed in). */
static void pre_connect_common(EV__Redis self, redisOptions* opts) {
    int extra_flags = 0;

    if (self->connect_timeout != NULL) {
        opts->connect_timeout = self->connect_timeout;
    }
    if (self->command_timeout != NULL) {
        opts->command_timeout = self->command_timeout;
    }

    /* The address-family preferences are mutually exclusive; IPv4 wins when
     * both fields happen to be set. */
    if (self->prefer_ipv4) {
        extra_flags |= REDIS_OPT_PREFER_IPV4;
    }
    else if (self->prefer_ipv6) {
        extra_flags |= REDIS_OPT_PREFER_IPV6;
    }
    if (self->cloexec) {
        extra_flags |= REDIS_OPT_SET_SOCK_CLOEXEC;
    }
    if (self->reuseaddr) {
        extra_flags |= REDIS_OPT_REUSEADDR;
    }
    opts->options |= extra_flags;

    /* The local bind address lives in the TCP endpoint, so it is only set
     * when no UNIX socket path is configured. */
    if (self->path == NULL && self->source_addr != NULL) {
        opts->endpoint.tcp.source_addr = self->source_addr;
    }
}

/* Set up a newly allocated redisAsyncContext: SSL, keepalive, libev, callbacks.
 * On failure: frees ac, nulls self->ac, emits error with err_prefix.
 *
 * Expects self->ac to be non-NULL on entry (it is dereferenced immediately).
 * err_prefix is prepended to every error message reported via emit_error.
 * Returns REDIS_OK on success, REDIS_ERR on any failure. */
static int post_connect_setup(EV__Redis self, const char* err_prefix) {
    /* A fresh context supersedes any pointer saved for a deferred disconnect. */
    self->ac_saved = NULL;
    /* Back-pointer that the hiredis callbacks read from ac->data. */
    self->ac->data = (void*)self;

#ifdef EV_REDIS_SSL
    if (NULL != self->ssl_ctx) {
        if (REDIS_OK != redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx)) {
            /* Build the message first: it reads ac->errstr, which is gone
             * once the context has been freed. */
            SV* err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
                err_prefix, self->ac->errstr[0] ? self->ac->errstr : "unknown error"));
            redisAsyncFree(self->ac);
            self->ac = NULL;
            emit_error(self, err);
            return REDIS_ERR;
        }
    }
#endif

    /* Socket tuning is best-effort: the return values are not checked. */
    if (self->keepalive > 0) {
        redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
    }
    if (self->tcp_user_timeout > 0) {
        redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
    }

    if (REDIS_OK != redisLibevAttach(self->loop, self->ac)) {
        SV* err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
        redisAsyncFree(self->ac);
        self->ac = NULL;
        emit_error(self, err);
        return REDIS_ERR;
    }

    /* 0 is the default priority, so only a non-default value is applied. */
    if (self->priority != 0) {
        redisLibevSetPriority(self->ac, self->priority);
    }

    redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
    redisAsyncSetDisconnectCallback(self->ac, (redisDisconnectCallback*)EV__redis_disconnect_cb);
    /* The push callback is only registered when a Perl push handler exists. */
    if (NULL != self->push_handler) {
        redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
    }

    /* Report an error already recorded on the context. This check runs after
     * the libev attach and callback registration, not before them. */
    if (self->ac->err) {
        /* As above: format the message before the context is freed. */
        SV* err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
        redisAsyncFree(self->ac);
        self->ac = NULL;
        emit_error(self, err);
        return REDIS_ERR;
    }

    return REDIS_OK;
}

static SV* EV__redis_decode_reply(redisReply* reply) {
    SV* res;

    switch (reply->type) {
        case REDIS_REPLY_STRING:
        case REDIS_REPLY_ERROR:
        case REDIS_REPLY_STATUS:
        case REDIS_REPLY_BIGNUM:
        case REDIS_REPLY_VERB:
            res = newSVpvn(reply->str, reply->len);
            break;

        case REDIS_REPLY_INTEGER:
            res = newSViv(reply->integer);
            break;

        case REDIS_REPLY_DOUBLE:
            res = newSVnv(reply->dval);
            break;

        case REDIS_REPLY_BOOL:
            res = newSViv(reply->integer ? 1 : 0);
            break;

        case REDIS_REPLY_NIL:
            res = newSV(0);

src/EV__Redis.xs  view on Meta::CPAN

        case REDIS_REPLY_MAP:
        case REDIS_REPLY_SET:
        case REDIS_REPLY_ATTR:
        case REDIS_REPLY_PUSH: {
            AV* av = newAV();
            size_t i;
            if (reply->elements > 0) {
                av_extend(av, (SSize_t)(reply->elements - 1));
                for (i = 0; i < reply->elements; i++) {
                    if (NULL != reply->element[i]) {
                        av_push(av, EV__redis_decode_reply(reply->element[i]));
                    }
                    else {
                        av_push(av, newSV(0));
                    }
                }
            }
            res = newRV_noinc((SV*)av);
            break;
        }

        default:
            /* Unknown type, return undef */
            res = newSV(0);
            break;
    }

    return res;
}

static void EV__redis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
    EV__Redis self = (EV__Redis)c->data;
    ev_redis_cb_t* cbt;
    SV* sv_reply;
    SV* sv_err;

    cbt = (ev_redis_cb_t*)privdata;

    if (cbt->skipped) {
        if (!cbt->persist || NULL == reply) {
            /* Multi-channel persistent: hiredis fires once per channel with
             * same cbt. Decrement sub_count, free only on last call. */
            if (cbt->persist && NULL == reply && cbt->sub_count > 1) {
                cbt->sub_count--;
                return;
            }
            Safefree(cbt);
        }
        else if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
            cbt->sub_count--;
            if (cbt->sub_count <= 0) {
                Safefree(cbt);
            }
        }
        return;
    }

    /* self is NULL when DESTROY nulled ac->data (deferred free inside
     * REDIS_IN_CALLBACK) or during PL_dirty. Still invoke the callback
     * with a disconnect error so users can clean up resources. The hiredis
     * context (c) is still alive here — safe to read c->errstr.
     * cb may be NULL during PL_dirty where we pre-null it.
     * For persistent commands (multi-channel subscribe), hiredis fires
     * reply_cb once per channel with the same cbt. Invoke the callback
     * only once (null cb after), use sub_count to track when to free. */
    if (self == NULL) {
        if (NULL != cbt->cb) {
            invoke_callback_error(cbt->cb,
                sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
            SvREFCNT_dec(cbt->cb);
            cbt->cb = NULL;
        }
        if (cbt->persist && reply == NULL && cbt->sub_count > 1) {
            cbt->sub_count--;
            return;
        }
        Safefree(cbt);
        return;
    }

    /* If self is marked as freed (during DESTROY), we still invoke the
     * callback with an error, but skip any self->field access afterward.
     * For persistent commands, don't free cbt here — leave it in the queue
     * for remove_cb_queue_sv to clean up after redisAsyncFree returns. */
    if (self->magic == EV_REDIS_FREED) {
        if (NULL != cbt->cb) {
            self->callback_depth++;
            invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
            self->callback_depth--;
            SvREFCNT_dec(cbt->cb);
            cbt->cb = NULL;
        }
        if (!cbt->persist) {
            ngx_queue_remove(&cbt->queue);
            Safefree(cbt);
        }
        check_destroyed(self);
        return;
    }

    /* Unknown magic - memory corruption, skip.
     * Don't touch queue pointers (self's memory may be garbage).
     * Always decrement refcount since callback will never be invoked again. */
    if (self->magic != EV_REDIS_MAGIC) {
        if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
        Safefree(cbt);
        return;
    }

    self->current_cb = cbt;
    self->callback_depth++;

    if (NULL != cbt->cb) {
        if (NULL == reply) {
            sv_err = sv_2mortal(newSVpv(
                c->errstr[0] ? c->errstr : "disconnected", 0));
            invoke_callback_error(cbt->cb, sv_err);
        }
        else {
            dSP;

            ENTER;
            SAVETMPS;

            PUSHMARK(SP);
            EXTEND(SP, 2);
            sv_reply = sv_2mortal(EV__redis_decode_reply((redisReply*)reply));
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                PUSHs(&PL_sv_undef);
                PUSHs(sv_reply);
            }
            else {
                PUSHs(sv_reply);
            }
            PUTBACK;

            call_sv(cbt->cb, G_DISCARD | G_EVAL);
            if (SvTRUE(ERRSV)) {
                warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
            }

            FREETMPS;
            LEAVE;
        }
    }

    self->callback_depth--;
    self->current_cb = NULL;

    /* If DESTROY was called during our callback (e.g., user undef'd $redis),
     * self->magic is EV_REDIS_FREED but self is still valid (DESTROY defers
     * Safefree when callback_depth > 0). Complete cleanup here.
     * For persistent commands (multi-channel subscribe), hiredis will fire
     * reply_cb again for remaining channels via __redisAsyncFree. Null the
     * callback to prevent double invocation, but leave cbt alive so those
     * later calls see it and can track sub_count for proper cleanup. */
    if (self->magic == EV_REDIS_FREED) {
        if (NULL != cbt->cb) {
            SvREFCNT_dec(cbt->cb);
            cbt->cb = NULL;
        }
        if (!cbt->persist) {
            Safefree(cbt);
        }
        check_destroyed(self);
        return;
    }

    if (cbt->skipped) {
        /* Defensive check: handles edge case where callback is marked skipped
         * during its own execution (e.g., via reentrant event loop where a
         * nested callback overwrites current_cb, allowing skip_pending to
         * process this callback). ngx_queue_remove is safe here due to
         * ngx_queue_init in skip_pending. Don't decrement pending_count since
         * skip_pending already did when it set skipped=1. */
        ngx_queue_remove(&cbt->queue);
        /* For persistent commands (e.g., SUBSCRIBE), hiredis fires reply_cb
         * once per subscribed channel during disconnect. Only free cbt on the
         * last channel to prevent use-after-free. */
        if (cbt->persist && cbt->sub_count > 1) {
            cbt->sub_count--;
            return;
        }
        Safefree(cbt);
        self->callback_depth++;
        send_next_waiting(self);
        self->callback_depth--;
        check_destroyed(self);
        return;
    }

    /* Detect end of persistent subscription: when all channels from a
     * SUBSCRIBE command have been unsubscribed, hiredis removes its internal
     * callback entry. Clean up our cbt to prevent orphaned queue entries. */
    if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
        cbt->sub_count--;
        if (cbt->sub_count <= 0) {
            /* All channels unsubscribed — persistent commands are not counted
             * in pending_count, so don't decrement it. */
            ngx_queue_remove(&cbt->queue);
            self->callback_depth++;
            if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
            Safefree(cbt);
            self->callback_depth--;
            check_destroyed(self);
            return;
        }
    }

    /* Connection teardown with active subscription: hiredis fires reply_cb
     * once per subscribed channel (from dict iteration in __redisAsyncFree).
     * Track sub_count and remove from queue on last channel to prevent
     * disconnect_cb's remove_cb_queue_sv from invoking the callback again. */
    if (cbt->persist && NULL == reply) {
        if (cbt->sub_count > 1) {

src/EV__Redis.xs  view on Meta::CPAN

    redisOptions opts;

    if (NULL != self->ac) {
        croak("already connected");
    }

    self->intentional_disconnect = 0;
    self->reconnect_attempts = 0;
    clear_connection_params(self);
    self->host = savepv(hostname);
    self->port = port;

    memset(&opts, 0, sizeof(opts));
    pre_connect_common(self, &opts);
    REDIS_OPTIONS_SET_TCP(&opts, hostname, port);
    self->ac = redisAsyncConnectWithOptions(&opts);
    if (NULL == self->ac) {
        croak("cannot allocate memory");
    }

    (void)post_connect_setup(self, "connect error");
}

void
connect_unix(EV::Redis self, const char* path);
CODE:
{
    /* Start an asynchronous connection to a UNIX domain socket at `path`.
     * Croaks when a connection already exists or when hiredis cannot
     * allocate a context. Later setup failures do not croak: they are
     * reported through emit_error by post_connect_setup. */
    redisOptions opts;

    if (NULL != self->ac) {
        croak("already connected");
    }

    /* A user-initiated connect resets the disconnect/reconnect state. */
    self->intentional_disconnect = 0;
    self->reconnect_attempts = 0;
    /* presumably releases the previously stored host/path — confirm in
     * clear_connection_params */
    clear_connection_params(self);
    /* Keep a private copy of the path. It must be stored before
     * pre_connect_common runs, which checks self->path to decide whether
     * the TCP-only source address applies. */
    self->path = savepv(path);

    memset(&opts, 0, sizeof(opts));
    pre_connect_common(self, &opts);
    REDIS_OPTIONS_SET_UNIX(&opts, path);
    self->ac = redisAsyncConnectWithOptions(&opts);
    if (NULL == self->ac) {
        croak("cannot allocate memory");
    }

    /* Return value deliberately ignored: on failure post_connect_setup has
     * already freed the context, nulled self->ac and emitted the error. */
    (void)post_connect_setup(self, "connect error");
}

void
disconnect(EV::Redis self);
CODE:
{
    /* Explicit, user-requested disconnect. Marks the disconnect as
     * intentional, cancels any scheduled reconnect, and either clears the
     * waiting queue (no connection) or asks hiredis to disconnect. */

    /* Stop any pending reconnect timer on explicit disconnect */
    self->intentional_disconnect = 1;
    stop_reconnect_timer(self);
    self->reconnect_attempts = 0;

    if (NULL == self->ac) {
        /* Already disconnected — still stop waiting timer and clear
         * wait queue (e.g., resume_waiting_on_reconnect kept them alive
         * after a connection drop, but user now explicitly disconnects). */
        stop_waiting_timer(self);
        if (!ngx_queue_empty(&self->wait_queue)) {
            /* callback_depth is raised around the call and check_destroyed
             * runs afterwards, the same bracketing used wherever Perl
             * callbacks may run. */
            self->callback_depth++;
            clear_wait_queue_sv(self, err_disconnected);
            self->callback_depth--;
            check_destroyed(self);
        }
        return;
    }
    /* Save ac pointer for deferred disconnect: when inside a hiredis
     * callback, redisAsyncDisconnect only sets REDIS_DISCONNECTING and
     * returns. DESTROY needs ac_saved to NULL ac->data if the Perl object
     * is freed before the deferred disconnect completes.
     * Only set when REDIS_IN_CALLBACK: in the synchronous path,
     * disconnect_cb fires during redisAsyncDisconnect and clears ac_saved;
     * but if DESTROY fires nested during that processing (SvREFCNT_dec
     * dropping last ref), it would NULL ac->data, causing disconnect_cb to
     * skip cleanup, leaving ac_saved dangling after ac is freed. */
    if (self->ac->c.flags & REDIS_IN_CALLBACK) {
        self->ac_saved = self->ac;
    }
    /* Protect against Safefree(self) if disconnect_cb fires synchronously
     * and user's on_disconnect handler drops the last Perl reference. */
    self->callback_depth++;
    redisAsyncDisconnect(self->ac);
    /* NOTE(review): this assignment runs after any synchronous
     * disconnect_cb. If the on_disconnect handler opened a new connection
     * in the meantime, the new self->ac would be overwritten here — verify
     * against EV__redis_disconnect_cb whether that can happen. */
    self->ac = NULL;
    self->callback_depth--;
    /* Last statement of the body; the early return changes nothing today
     * but keeps any code added below from touching a destroyed object. */
    if (check_destroyed(self)) return;
}

int
is_connected(EV::Redis self);
CODE:
{
    /* 1 while a hiredis context is attached to the object, 0 otherwise. */
    if (NULL == self->ac) {
        RETVAL = 0;
    }
    else {
        RETVAL = 1;
    }
}
OUTPUT:
    RETVAL

SV*
connect_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Combined getter/setter for the connect timeout; all of the work is
     * delegated to timeout_accessor, operating on the object's
     * connect_timeout slot. */
    struct timeval** slot = &self->connect_timeout;

    RETVAL = timeout_accessor(slot, timeout_ms, "connect_timeout");
}
OUTPUT:
    RETVAL

SV*
command_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    RETVAL = timeout_accessor(&self->command_timeout, timeout_ms, "command_timeout");
    /* Apply to active connection immediately */
    if (NULL != timeout_ms && SvOK(timeout_ms) && NULL != self->ac && NULL != self->command_timeout) {
        redisAsyncSetTimeout(self->ac, *self->command_timeout);
    }
}
OUTPUT:

src/EV__Redis.xs  view on Meta::CPAN

         * callback_depth protects against DESTROY if a failed command's
         * error callback drops the last Perl reference to self. */
        self->callback_depth++;
        send_next_waiting(self);
        self->callback_depth--;
        if (check_destroyed(self)) XSRETURN_IV(0);
    }
    RETVAL = self->max_pending;
}
OUTPUT:
    RETVAL

SV*
waiting_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Getter/setter for the waiting-queue timeout in milliseconds.
     * A defined argument is validated, stored, and the waiting timer is
     * rescheduled. The current value is returned in every case. */
    const int have_new_value = (timeout_ms != NULL) && SvOK(timeout_ms);

    if (have_new_value) {
        IV requested_ms = SvIV(timeout_ms);

        validate_timeout_ms(requested_ms, "waiting_timeout");
        self->waiting_timeout_ms = (int)requested_ms;
        schedule_waiting_timer(self);
    }

    RETVAL = newSViv((IV)self->waiting_timeout_ms);
}
OUTPUT:
    RETVAL

int
resume_waiting_on_reconnect(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the boolean resume_waiting_on_reconnect flag.
     * A defined argument is normalised to 0 or 1 before being stored;
     * an omitted or undef argument leaves the flag untouched. */
    if (value != NULL && SvOK(value)) {
        if (SvTRUE(value)) {
            self->resume_waiting_on_reconnect = 1;
        }
        else {
            self->resume_waiting_on_reconnect = 0;
        }
    }
    RETVAL = self->resume_waiting_on_reconnect;
}
OUTPUT:
    RETVAL

int
priority(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the libev watcher priority.
     * A defined argument is clamped to [-2, 2], stored, and applied to the
     * live connection if there is one. Returns the stored priority. */
    if (NULL != value && SvOK(value)) {
        /* Clamp on the full-width IV before narrowing to int: converting
         * first would let values outside the int range wrap around to an
         * arbitrary priority instead of saturating at the nearest bound. */
        IV requested = SvIV(value);
        /* Clamp priority to libev valid range: EV_MINPRI (-2) to EV_MAXPRI (+2) */
        if (requested < -2) requested = -2;
        if (requested > 2) requested = 2;
        self->priority = (int)requested;
        if (NULL != self->ac) {
            redisLibevSetPriority(self->ac, self->priority);
        }
    }
    RETVAL = self->priority;
}
OUTPUT:
    RETVAL

int
keepalive(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the TCP keepalive interval in seconds (0 = disabled).
     * Croaks when the new value is negative or larger than
     * MAX_TIMEOUT_MS / 1000. A positive value is also applied to the live
     * connection if there is one. Returns the stored interval. */
    if (NULL != value && SvOK(value)) {
        /* Validate on the full-width IV: narrowing to int first would let an
         * out-of-range value wrap to a small or zero interval and slip past
         * both checks below. */
        IV interval = SvIV(value);
        if (interval < 0) croak("keepalive interval must be non-negative");
        if (interval > MAX_TIMEOUT_MS / 1000) croak("keepalive interval too large");
        self->keepalive = (int)interval;
        /* Setting 0 only affects future connections; nothing is done here to
         * switch keepalive off on an already-open socket. */
        if (NULL != self->ac && interval > 0) {
            redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
        }
    }
    RETVAL = self->keepalive;
}
OUTPUT:
    RETVAL

int
prefer_ipv4(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the IPv4 preference flag. Turning it on turns the
     * IPv6 preference off; turning it off leaves prefer_ipv6 as it was. */
    if (value != NULL && SvOK(value)) {
        if (SvTRUE(value)) {
            self->prefer_ipv4 = 1;
            self->prefer_ipv6 = 0;
        }
        else {
            self->prefer_ipv4 = 0;
        }
    }
    RETVAL = self->prefer_ipv4;
}
OUTPUT:
    RETVAL

int
prefer_ipv6(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the IPv6 preference flag. Turning it on turns the
     * IPv4 preference off; turning it off leaves prefer_ipv4 as it was. */
    if (value != NULL && SvOK(value)) {
        if (SvTRUE(value)) {
            self->prefer_ipv6 = 1;
            self->prefer_ipv4 = 0;
        }
        else {
            self->prefer_ipv6 = 0;
        }
    }
    RETVAL = self->prefer_ipv6;
}
OUTPUT:
    RETVAL

SV*
source_addr(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the local bind address.
     * Any explicit argument first discards the stored address; a defined
     * argument then stores a private copy of its string value, so passing
     * undef simply clears the setting. Returns the stored address as a new
     * string SV, or undef when none is set. */
    const int called_as_setter = (items > 1);

    if (called_as_setter) {
        if (self->source_addr != NULL) {
            Safefree(self->source_addr);
            self->source_addr = NULL;
        }
        if (value != NULL && SvOK(value)) {
            self->source_addr = savepv(SvPV_nolen(value));
        }
    }

    RETVAL = (self->source_addr != NULL)
        ? newSVpv(self->source_addr, 0)
        : &PL_sv_undef;
}
OUTPUT:
    RETVAL

unsigned int
tcp_user_timeout(EV::Redis self, SV* value = NULL);
CODE:
{
    if (NULL != value && SvOK(value)) {
        IV ms = SvIV(value);
        validate_timeout_ms(ms, "tcp_user_timeout");
        self->tcp_user_timeout = (unsigned int)ms;



( run in 0.609 second using v1.01-cache-2.11-cpan-39bf76dae61 )