Redis-Cluster-Fast
view release on metacpan or search on metacpan
src/Fast.xs view on Meta::CPAN
#ifdef __cplusplus
} /* extern "C" */
#endif
#define NEED_newRV_noinc
#define NEED_my_strlcpy
#include "ppport.h"
#define ONE_SECOND_TO_MICRO 1000000
#define NANO_SECOND_TO_MICRO 1000
#define MIN_ATTEMPT_TO_GET_RESULT 2
/* libevent adapter priority configuration
Uses 2 priority levels to ensure I/O events are processed before timeouts:
- Priority 0: I/O events (Redis responses) - highest priority
- Priority 1: Timer events (timeouts) - lower priority
EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */
#define EVENT_BASE_PRIORITY_NUMBER 2
#define DEBUG_MSG(fmt, ...) \
if (self->debug) { \
fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
fprintf(stderr, fmt, __VA_ARGS__); \
fprintf(stderr, "\n"); \
}
#define DEBUG_EVENT_BASE() \
if (self->debug) { \
event_base_dump_events(self->cluster_event_base, stderr); \
}
typedef struct redis_cluster_fast_reply_s {
SV *result;
SV *error;
} redis_cluster_fast_reply_t;
typedef struct cmd_reply_context_s {
void *self;
SV *result;
SV *error;
int done;
} cmd_reply_context_t;
typedef struct cmd_reply_context_pipeline_s {
void *self;
SV *result;
SV *error;
SV *cb;
} cmd_reply_context_pipeline_t;
typedef struct redis_cluster_fast_s {
redisClusterAsyncContext *acc;
struct event_base *cluster_event_base;
char *hostnames;
int debug;
int max_retry;
int use_cluster_slots;
int event_ready;
struct timeval connect_timeout;
struct timeval command_timeout;
int64_t discovery_timeout_usec;
pid_t pid;
int64_t pipeline_callback_remain;
} redis_cluster_fast_t, *Redis__Cluster__Fast;
int64_t get_usec_timestamp(void) {
struct timespec ts;
int status;
status = clock_gettime(CLOCK_MONOTONIC, &ts);
if (status < 0) {
return -1;
}
return (int64_t) ts.tv_sec * ONE_SECOND_TO_MICRO + (int64_t) (ts.tv_nsec / NANO_SECOND_TO_MICRO);
}
static redis_cluster_fast_reply_t
Redis__Cluster__Fast_decode_reply(pTHX_ Redis__Cluster__Fast self, redisReply *reply) {
redis_cluster_fast_reply_t res = {NULL, NULL};
switch (reply->type) {
case REDIS_REPLY_ERROR:
res.error = newSVpvn(reply->str, reply->len);
break;
case REDIS_REPLY_BIGNUM:
case REDIS_REPLY_DOUBLE:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
case REDIS_REPLY_VERB:
res.result = newSVpvn(reply->str, reply->len);
break;
case REDIS_REPLY_INTEGER:
case REDIS_REPLY_BOOL:
res.result = newSViv(reply->integer);
break;
case REDIS_REPLY_NIL:
res.result = &PL_sv_undef;
break;
case REDIS_REPLY_MAP:
case REDIS_REPLY_SET:
case REDIS_REPLY_ATTR: {
size_t i;
char *key;
HV *hv = newHV();
res.result = newRV_noinc((SV *) hv);
for (i = 0; i < reply->elements; i++) {
if (i % 2 == 0) {
key = reply->element[i]->str;
} else {
redis_cluster_fast_reply_t elem = {NULL, NULL};
elem = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply->element[i]);
if (elem.result) {
hv_store(hv, key, strlen(key), SvREFCNT_inc(elem.result), 0);
} else {
hv_store(hv, key, strlen(key), newSV(0), 0);
}
src/Fast.xs view on Meta::CPAN
reply = (redisReply *) r;
if (reply) {
redis_cluster_fast_reply_t res;
res = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply);
reply_pipeline_t->result = res.result;
reply_pipeline_t->error = res.error;
} else {
DEBUG_MSG("error: err=%d errstr=%s", cc->err, cc->errstr);
reply_pipeline_t->error = newSVpvf("%s", cc->errstr);
}
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, 2);
PUSHs(reply_pipeline_t->result ? sv_2mortal(reply_pipeline_t->result) : &PL_sv_undef);
PUSHs(reply_pipeline_t->error ? sv_2mortal(reply_pipeline_t->error) : &PL_sv_undef);
PUTBACK;
call_sv(reply_pipeline_t->cb, G_DISCARD);
FREETMPS;
LEAVE;
}
SvREFCNT_dec(reply_pipeline_t->cb);
Safefree(reply_pipeline_t);
self->pipeline_callback_remain--;
DEBUG_MSG("pipeline callback remain: %ld", self->pipeline_callback_remain);
}
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
Redis__Cluster__Fast self = (Redis__Cluster__Fast) privdata;
DEBUG_MSG("event: %d", event);
if (event == HIRCLUSTER_EVENT_READY) {
self->event_ready = 1;
}
}
SV *Redis__Cluster__Fast_connect(pTHX_ Redis__Cluster__Fast self) {
DEBUG_MSG("%s", "start connect");
if (self->cluster_event_base && self->acc) {
return newSVpvf("%s", "already connected");
}
self->pipeline_callback_remain = 0;
self->pid = getpid();
self->acc = redisClusterAsyncContextInit();
if (redisClusterSetOptionAddNodes(self->acc->cc, self->hostnames) != REDIS_OK) {
return newSVpvf("failed to add nodes: %s", self->acc->cc->errstr);
}
if (redisClusterSetOptionConnectTimeout(self->acc->cc, self->connect_timeout) != REDIS_OK) {
return newSVpvf("failed to set connect timeout: %s", self->acc->cc->errstr);
}
if (redisClusterSetOptionTimeout(self->acc->cc, self->command_timeout) != REDIS_OK) {
return newSVpvf("failed to set command timeout: %s", self->acc->cc->errstr);
}
if (redisClusterSetOptionMaxRetry(self->acc->cc, self->max_retry) != REDIS_OK) {
return newSVpvf("%s", "failed to set max retry");
}
if (self->use_cluster_slots) {
DEBUG_MSG("%s", "use cluster slots");
if (redisClusterSetOptionRouteUseSlots(self->acc->cc) != REDIS_OK) {
return newSVpvf("%s", "failed to set redisClusterSetOptionRouteUseSlots");
}
}
self->cluster_event_base = event_base_new();
if (event_base_priority_init(self->cluster_event_base, EVENT_BASE_PRIORITY_NUMBER) != 0) {
return newSVpvf("%s", "failed to initialize event base priorities");
}
if (redisClusterLibeventAttach(self->acc, self->cluster_event_base) != REDIS_OK) {
return newSVpvf("%s", "failed to attach event base");
}
self->event_ready = 0;
if (redisClusterSetEventCallback(self->acc->cc, eventCallback, self) != REDIS_OK) {
return newSVpvf("%s", "failed to set event callback");
}
if (redisClusterAsyncConnect2(self->acc) != REDIS_OK) {
return newSVpvf("failed to connect async: %s", self->acc->cc->errstr);
}
DEBUG_MSG("%s", "done connect");
return NULL;
}
SV *Redis__Cluster__Fast_disconnect(pTHX_ Redis__Cluster__Fast self) {
if (self->cluster_event_base == NULL && self->acc == NULL) {
return NULL;
}
if (event_reinit(self->cluster_event_base) != 0) {
return newSVpvf("%s", "event reinit failed");
}
redisClusterAsyncDisconnect(self->acc);
if (event_base_dispatch(self->cluster_event_base) == -1) {
return newSVpvf("%s", "event_base_dispatch failed after forking");
}
event_base_free(self->cluster_event_base);
self->cluster_event_base = NULL;
redisClusterAsyncFree(self->acc);
self->acc = NULL;
return NULL;
}
SV *Redis__Cluster__Fast_wait_until_event_ready(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
int count = 0;
src/Fast.xs view on Meta::CPAN
} else {
run_cmd_impl(aTHX_ self, argc, argv, argvlen, reply_t);
}
}
MODULE = Redis::Cluster::Fast PACKAGE = Redis::Cluster::Fast
PROTOTYPES: DISABLE
Redis::Cluster::Fast
_new(char* cls);
PREINIT:
redis_cluster_fast_t* self;
CODE:
Newxz(self, sizeof(redis_cluster_fast_t), redis_cluster_fast_t);
RETVAL = self;
OUTPUT:
RETVAL
void
__srandom(char *cls, unsigned int seed_value)
CODE:
srandom(seed_value);
int
__set_debug(Redis::Cluster::Fast self, int val)
CODE:
DEBUG_MSG("%s", "DEBUG true");
RETVAL = self->debug = val;
OUTPUT:
RETVAL
void
__set_servers(Redis::Cluster::Fast self, char* hostnames)
CODE:
if (self->hostnames) {
Safefree(self->hostnames);
self->hostnames = NULL;
}
if (hostnames) {
Newx(self->hostnames, strlen(hostnames) + 1, char);
my_strlcpy(self->hostnames, hostnames, strlen(hostnames) + 1);
DEBUG_MSG("%s %s", "set hostnames", self->hostnames);
}
void
__set_connect_timeout(Redis::Cluster::Fast self, double double_sec)
PREINIT:
int second, micro_second;
struct timeval timeout;
CODE:
second = (int) (double_sec);
micro_second = (int) (fmod(double_sec * ONE_SECOND_TO_MICRO, ONE_SECOND_TO_MICRO) + 0.999);
timeout.tv_sec = second;
timeout.tv_usec = micro_second;
self->connect_timeout = timeout;
DEBUG_MSG("connect timeout %d, %d", second, micro_second);
void
__set_command_timeout(Redis::Cluster::Fast self, double double_sec)
PREINIT:
int second, micro_second;
struct timeval timeout;
CODE:
second = (int) (double_sec);
micro_second = (int) (fmod(double_sec * ONE_SECOND_TO_MICRO, ONE_SECOND_TO_MICRO) + 0.999);
timeout.tv_sec = second;
timeout.tv_usec = micro_second;
self->command_timeout = timeout;
DEBUG_MSG("command timeout %d, %d", second, micro_second);
void
__set_max_retry(Redis::Cluster::Fast self, int max_retry)
CODE:
self->max_retry = max_retry;
DEBUG_MSG("max_retry %d", max_retry);
void
__set_route_use_slots(Redis::Cluster::Fast self, int use_slot)
CODE:
self->use_cluster_slots = use_slot;
void
__set_cluster_discovery_retry_timeout(Redis::Cluster::Fast self, double double_sec)
CODE:
self->discovery_timeout_usec = (int64_t) (double_sec * ONE_SECOND_TO_MICRO);
DEBUG_MSG("discovery timeout %ld", self->discovery_timeout_usec);
SV*
__connect(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_connect(aTHX_ self);
if (RETVAL == NULL) {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
SV*
__disconnect(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_disconnect(aTHX_ self);
if (RETVAL == NULL) {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
SV*
__wait_until_event_ready(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_wait_until_event_ready(aTHX_ self);
if (RETVAL == NULL) {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
void
__std_cmd(Redis::Cluster::Fast self, ...)
PREINIT:
cmd_reply_context_t* result_context;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i;
SV* cb;
PPCODE:
if (!self->acc) {
( run in 1.193 second using v1.01-cache-2.11-cpan-677af5a14d3 )