EV-Redis

 view release on metacpan or  search on metacpan

src/EV__Redis.xs  view on Meta::CPAN

    self->callback_depth--;
    check_destroyed(self);
}

static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
    EV__Redis self = (EV__Redis)ac->data;
    redisReply* reply = (redisReply*)reply_ptr;

    if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    SV* handler = self->push_handler;
    if (NULL == handler || NULL == reply) return;
    /* pin: the callback may clear its own handler ($r->on_push(undef)) */
    SvREFCNT_inc_simple_void_NN(handler);

    self->callback_depth++;

    {
        dSP;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        XPUSHs(sv_2mortal(EV__redis_decode_reply(reply)));
        PUTBACK;

        call_sv(handler, G_DISCARD | G_EVAL);
        if (SvTRUE(ERRSV)) {
            warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
        }

        FREETMPS;
        LEAVE;
    }

    SvREFCNT_dec(handler);
    self->callback_depth--;
    check_destroyed(self);
}

static void pre_connect_common(EV__Redis self, redisOptions* opts) {
    if (NULL != self->connect_timeout) {
        opts->connect_timeout = self->connect_timeout;
    }
    if (NULL != self->command_timeout) {
        opts->command_timeout = self->command_timeout;
    }
    if (self->prefer_ipv4) {
        opts->options |= REDIS_OPT_PREFER_IPV4;
    }
    else if (self->prefer_ipv6) {
        opts->options |= REDIS_OPT_PREFER_IPV6;
    }
    if (self->cloexec) {
        opts->options |= REDIS_OPT_SET_SOCK_CLOEXEC;
    }
    if (self->reuseaddr) {
        opts->options |= REDIS_OPT_REUSEADDR;
    }
    if (NULL != self->source_addr && NULL == self->path) {
        opts->endpoint.tcp.source_addr = self->source_addr;
    }
}

/* Set up a newly allocated redisAsyncContext: SSL, keepalive, libev, callbacks.
 * On failure: frees ac, nulls self->ac, emits error with err_prefix. */
static int post_connect_setup(EV__Redis self, const char* err_prefix) {
    self->ac_saved = NULL;
    self->ac->data = (void*)self;

#ifdef EV_REDIS_SSL
    if (NULL != self->ssl_ctx) {
        if (REDIS_OK != redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx)) {
            SV* err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
                err_prefix, self->ac->errstr[0] ? self->ac->errstr : "unknown error"));
            redisAsyncFree(self->ac);
            self->ac = NULL;
            emit_error(self, err);
            return REDIS_ERR;
        }
    }
#endif

    if (self->keepalive > 0) {
        redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
    }
    if (self->tcp_user_timeout > 0) {
        redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
    }

    if (REDIS_OK != redisLibevAttach(self->loop, self->ac)) {
        SV* err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
        redisAsyncFree(self->ac);
        self->ac = NULL;
        emit_error(self, err);
        return REDIS_ERR;
    }

    if (self->priority != 0) {
        redisLibevSetPriority(self->ac, self->priority);
    }

    redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
    redisAsyncSetDisconnectCallback(self->ac, EV__redis_disconnect_cb);
    if (NULL != self->push_handler) {
        redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
    }

    if (self->ac->err) {
        SV* err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
        redisAsyncFree(self->ac);
        self->ac = NULL;
        emit_error(self, err);
        return REDIS_ERR;
    }

    return REDIS_OK;
}

static SV* decode_reply_depth(redisReply* reply, int depth) {
    SV* res;



( run in 0.647 second using v1.01-cache-2.11-cpan-524268b4103 )