EV-Hiredis
view release on metacpan or search on metacpan
src/EV__Hiredis.xs view on Meta::CPAN
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include "EVAPI.h"
#include "hiredis.h"
#include "async.h"
#include "libev_adapter.h"
#include "ngx-queue.h"
typedef struct ev_hiredis_s ev_hiredis_t;
typedef struct ev_hiredis_cb_s ev_hiredis_cb_t;
typedef ev_hiredis_t* EV__Hiredis;
typedef struct ev_loop* EV__Loop;
struct ev_hiredis_s {
struct ev_loop* loop;
redisAsyncContext* ac;
SV* error_handler;
SV* connect_handler;
struct timeval* connect_timeout;
struct timeval* command_timeout;
ngx_queue_t cb_queue; /* for long term callbacks such as subscribe */
};
struct ev_hiredis_cb_s {
SV* cb;
ngx_queue_t queue;
int persist;
};
static void emit_error(EV__Hiredis self, SV* error) {
if (NULL == self->error_handler) return;
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(error);
PUTBACK;
call_sv(self->error_handler, G_DISCARD);
FREETMPS;
LEAVE;
}
static void emit_error_str(EV__Hiredis self, char* error) {
if (NULL == self->error_handler) return;
emit_error(self, sv_2mortal(newSVpv(error, 0)));
}
static void remove_cb_queue(EV__Hiredis self) {
ngx_queue_t* q;
ev_hiredis_cb_t* cbt;
while (!ngx_queue_empty(&self->cb_queue)) {
q = ngx_queue_last(&self->cb_queue);
cbt = ngx_queue_data(q, ev_hiredis_cb_t, queue);
ngx_queue_remove(q);
SvREFCNT_dec(cbt->cb);
Safefree(cbt);
}
}
static void EV__hiredis_connect_cb(redisAsyncContext* c, int status) {
EV__Hiredis self = (EV__Hiredis)c->data;
if (REDIS_OK != status) {
self->ac = NULL;
emit_error_str(self, c->errstr);
}
else {
if (NULL == self->connect_handler) return;
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
PUTBACK;
call_sv(self->connect_handler, G_DISCARD);
FREETMPS;
LEAVE;
}
}
static void EV__hiredis_disconnect_cb(redisAsyncContext* c, int status) {
EV__Hiredis self = (EV__Hiredis)c->data;
SV* sv_error;
if (REDIS_OK == status) {
self->ac = NULL;
}
else {
sv_error = sv_2mortal(newSVpv(c->errstr, 0));
self->ac = NULL;
emit_error(self, sv_error);
}
remove_cb_queue(self);
}
static void pre_connect_common(EV__Hiredis self, redisOptions* opts) {
if (NULL != self->connect_timeout) {
opts->connect_timeout = self->connect_timeout;
}
if (NULL != self->command_timeout) {
opts->command_timeout = self->command_timeout;
}
}
static void connect_common(EV__Hiredis self) {
int r;
SV* sv_error = NULL;
self->ac->data = (void*)self;
r = redisLibevAttach(self->loop, self->ac);
if (REDIS_OK != r) {
redisAsyncFree(self->ac);
self->ac = NULL;
emit_error_str(self, "connect error: cannot attach libev");
return;
}
redisAsyncSetConnectCallback(self->ac, (redisConnectCallback*)EV__hiredis_connect_cb);
redisAsyncSetDisconnectCallback(self->ac, (redisDisconnectCallback*)EV__hiredis_disconnect_cb);
if (self->ac->err) {
sv_error = sv_2mortal(newSVpvf("connect error: %s", self->ac->errstr));
redisAsyncFree(self->ac);
self->ac = NULL;
emit_error(self, sv_error);
return;
}
}
static SV* EV__hiredis_decode_reply(redisReply* reply) {
SV* res = NULL;
switch (reply->type) {
case REDIS_REPLY_STRING:
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
res = newSVpvn(reply->str, reply->len);
break;
case REDIS_REPLY_INTEGER:
res = newSViv(reply->integer);
break;
case REDIS_REPLY_NIL:
res = newSV(0);
break;
case REDIS_REPLY_ARRAY: {
AV* av = newAV();
av_extend(av, (SSize_t)reply->elements);
size_t i;
for (i = 0; i < reply->elements; i++) {
av_push(av, EV__hiredis_decode_reply(reply->element[i]));
}
res = newRV_noinc((SV*)av);
break;
}
}
return res;
}
src/EV__Hiredis.xs view on Meta::CPAN
PUSHs(sv_reply);
}
else {
PUSHs(sv_reply);
}
PUTBACK;
call_sv(cbt->cb, G_DISCARD);
FREETMPS;
LEAVE;
}
if (0 == cbt->persist) {
SvREFCNT_dec(cbt->cb);
ngx_queue_remove(&cbt->queue);
Safefree(cbt);
}
}
MODULE = EV::Hiredis PACKAGE = EV::Hiredis
BOOT:
{
I_EV_API("EV::Hiredis");
}
EV::Hiredis
_new(char* class, EV::Loop loop);
CODE:
{
PERL_UNUSED_VAR(class);
Newxz(RETVAL, 1, ev_hiredis_t);
ngx_queue_init(&RETVAL->cb_queue);
RETVAL->loop = loop;
}
OUTPUT:
RETVAL
void
DESTROY(EV::Hiredis self);
CODE:
{
self->loop = NULL;
if (NULL != self->ac) {
redisAsyncFree(self->ac);
self->ac = NULL;
}
if (NULL != self->error_handler) {
SvREFCNT_dec(self->error_handler);
self->error_handler = NULL;
}
if (NULL != self->connect_handler) {
SvREFCNT_dec(self->connect_handler);
self->connect_handler = NULL;
}
if (NULL != self->connect_timeout) {
Safefree(self->connect_timeout);
self->connect_timeout = NULL;
}
if (NULL != self->command_timeout) {
Safefree(self->command_timeout);
self->command_timeout = NULL;
}
remove_cb_queue(self);
Safefree(self);
}
void
connect(EV::Hiredis self, char* hostname, int port = 6379);
CODE:
{
if (NULL != self->ac) {
croak("already connected");
return;
}
redisOptions opts = {0};
pre_connect_common(self, &opts);
REDIS_OPTIONS_SET_TCP(&opts, hostname, port);
self->ac = redisAsyncConnectWithOptions(&opts);
if (NULL == self->ac) {
croak("cannot allocate memory");
return;
}
connect_common(self);
}
void
connect_unix(EV::Hiredis self, const char* path);
CODE:
{
if (NULL != self->ac) {
croak("already connected");
return;
}
redisOptions opts = {0};
pre_connect_common(self, &opts);
REDIS_OPTIONS_SET_UNIX(&opts, path);
self->ac = redisAsyncConnectWithOptions(&opts);
if (NULL == self->ac) {
croak("cannot allocate memory");
return;
}
connect_common(self);
}
void
disconnect(EV::Hiredis self);
CODE:
{
if (NULL == self->ac) {
emit_error_str(self, "not connected");
return;
}
redisAsyncDisconnect(self->ac);
}
void
connect_timeout(EV::Hiredis self, int timeout_ms);
CODE:
{
if (NULL == self->connect_timeout) {
Newx(self->connect_timeout, 1, struct timeval);
}
self->connect_timeout->tv_sec = timeout_ms / 1000;
self->connect_timeout->tv_usec = (timeout_ms % 1000) * 1000;
}
void
command_timeout(EV::Hiredis self, int timeout_ms);
CODE:
{
if (NULL == self->command_timeout) {
Newx(self->command_timeout, 1, struct timeval);
}
self->command_timeout->tv_sec = timeout_ms / 1000;
self->command_timeout->tv_usec = (timeout_ms % 1000) * 1000;
}
CV*
on_error(EV::Hiredis self, CV* handler = NULL);
CODE:
{
if (NULL != self->error_handler) {
SvREFCNT_dec(self->error_handler);
self->error_handler = NULL;
}
if (NULL != handler) {
self->error_handler = SvREFCNT_inc(handler);
}
RETVAL = (CV*)self->error_handler;
}
OUTPUT:
RETVAL
void
on_connect(EV::Hiredis self, CV* handler = NULL);
CODE:
{
if (NULL != handler) {
if (NULL != self->connect_handler) {
SvREFCNT_dec(self->connect_handler);
self->connect_handler = NULL;
}
self->connect_handler = SvREFCNT_inc(handler);
}
if (self->connect_handler) {
ST(0) = self->connect_handler;
XSRETURN(1);
}
else {
XSRETURN(0);
}
}
int
command(EV::Hiredis self, ...);
PREINIT:
SV* cb;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i;
ev_hiredis_cb_t* cbt;
CODE:
{
if (items <= 2) {
croak("Usage: command(\"command\", ..., $callback)");
}
cb = ST(items - 1);
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) {
croak("last arguments should be CODE reference");
( run in 1.709 second using v1.01-cache-2.11-cpan-39bf76dae61 )