EV-Redis
view release on metacpan or search on metacpan
src/EV__Redis.xs view on Meta::CPAN
self->callback_depth--;
check_destroyed(self);
}
/* Push callback installed on the async context: decodes the push reply and
 * hands it to the Perl-level push handler stored on the owning object.
 *
 * Does nothing when the context carries no object, the object's magic value
 * does not match EV_REDIS_MAGIC, no handler is set, or the reply is NULL.
 * An exception raised by the handler is trapped (G_EVAL) and reported via
 * warn() rather than propagated. */
static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
    EV__Redis self = (EV__Redis)ac->data;
    redisReply* push = (redisReply*)reply_ptr;
    SV* cb;

    if (!self || EV_REDIS_MAGIC != self->magic) {
        return;
    }

    cb = self->push_handler;
    if (!cb || !push) {
        return;
    }

    /* Hold our own reference: the handler is allowed to unset itself
     * (e.g. $r->on_push(undef)) while it is still running. */
    SvREFCNT_inc_simple_void_NN(cb);
    self->callback_depth++;

    {
        dSP;
        SV* decoded;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        /* Mortal, so FREETMPS below releases the decoded reply. */
        decoded = sv_2mortal(EV__redis_decode_reply(push));
        XPUSHs(decoded);
        PUTBACK;

        call_sv(cb, G_EVAL | G_DISCARD);

        if (SvTRUE(ERRSV)) {
            warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
        }

        FREETMPS;
        LEAVE;
    }

    SvREFCNT_dec(cb);
    self->callback_depth--;
    /* Run only after callback_depth has been decremented again. */
    check_destroyed(self);
}
/* Transfer the per-object connection settings held on self into the
 * redisOptions structure that is about to be used for connecting.
 *
 * - connect_timeout / command_timeout are copied only when set (non-NULL).
 * - prefer_ipv4 wins over prefer_ipv6 when both are set.
 * - source_addr is applied to the TCP endpoint only when self->path is NULL
 *   (presumably: not a path-based connection -- confirm against callers). */
static void pre_connect_common(EV__Redis self, redisOptions* opts) {
    int extra_flags = 0;

    if (self->connect_timeout != NULL) {
        opts->connect_timeout = self->connect_timeout;
    }
    if (self->command_timeout != NULL) {
        opts->command_timeout = self->command_timeout;
    }

    /* Collect the option bits first, then apply them in one step. */
    if (self->prefer_ipv4) {
        extra_flags |= REDIS_OPT_PREFER_IPV4;
    } else if (self->prefer_ipv6) {
        extra_flags |= REDIS_OPT_PREFER_IPV6;
    }
    if (self->cloexec) {
        extra_flags |= REDIS_OPT_SET_SOCK_CLOEXEC;
    }
    if (self->reuseaddr) {
        extra_flags |= REDIS_OPT_REUSEADDR;
    }
    opts->options |= extra_flags;

    if (self->path == NULL && self->source_addr != NULL) {
        opts->endpoint.tcp.source_addr = self->source_addr;
    }
}
/* Finish configuring the newly allocated self->ac: optional SSL, keepalive,
 * TCP user timeout, libev attachment, watcher priority and the
 * connect/disconnect/push callbacks.
 *
 * Returns REDIS_OK on success. On any failure the context is freed,
 * self->ac is set to NULL, an error message starting with err_prefix is
 * passed to emit_error(), and REDIS_ERR is returned. */
static int post_connect_setup(EV__Redis self, const char* err_prefix) {
    SV* err = NULL;

    self->ac_saved = NULL;
    self->ac->data = (void*)self;

#ifdef EV_REDIS_SSL
    if (self->ssl_ctx != NULL
        && redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx) != REDIS_OK) {
        const char* reason = self->ac->errstr[0] ? self->ac->errstr : "unknown error";

        err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
            err_prefix, reason));
        goto fail;
    }
#endif

    if (self->keepalive > 0) {
        redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
    }
    if (self->tcp_user_timeout > 0) {
        redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
    }

    if (redisLibevAttach(self->loop, self->ac) != REDIS_OK) {
        err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
        goto fail;
    }
    if (self->priority != 0) {
        redisLibevSetPriority(self->ac, self->priority);
    }

    redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
    redisAsyncSetDisconnectCallback(self->ac, EV__redis_disconnect_cb);
    /* The push callback is installed only when a handler is registered. */
    if (self->push_handler != NULL) {
        redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
    }

    if (self->ac->err) {
        err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
        goto fail;
    }

    return REDIS_OK;

fail:
    /* The message was built before this point because it may read
     * self->ac->errstr, which is gone once the context is freed. */
    redisAsyncFree(self->ac);
    self->ac = NULL;
    emit_error(self, err);
    return REDIS_ERR;
}
static SV* decode_reply_depth(redisReply* reply, int depth) {
SV* res;
( run in 0.647 second using v1.01-cache-2.11-cpan-524268b4103 )