Redis-Cluster-Fast

 view release on metacpan or  search on metacpan

src/Fast.xs  view on Meta::CPAN

#ifdef __cplusplus
} /* extern "C" */
#endif

#define NEED_newRV_noinc
#define NEED_my_strlcpy
#include "ppport.h"

/* Number of microseconds in one second. */
#define ONE_SECOND_TO_MICRO 1000000
/* Number of nanoseconds in one microsecond (divisor for ns -> us conversion). */
#define NANO_SECOND_TO_MICRO 1000

/* NOTE(review): the use site of this constant is not visible in this part of
   the file; presumably a lower bound on result-polling attempts - confirm. */
#define MIN_ATTEMPT_TO_GET_RESULT 2

/* libevent adapter priority configuration
   Uses 2 priority levels to ensure I/O events are processed before timeouts:
   - Priority 0: I/O events (Redis responses) - highest priority
   - Priority 1: Timer events (timeouts) - lower priority
   EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */
#define EVENT_BASE_PRIORITY_NUMBER 2

/* Print one printf-style debug line to stderr when self->debug is non-zero.
   A variable named `self` with a `debug` member must be in scope at the call
   site, and at least one argument must follow the format string.
   Each line is prefixed with pid, parent pid, file, line and function name.
   The body is wrapped in do { } while (0) so the expansion is exactly one
   statement and is safe inside an unbraced if/else. */
#define DEBUG_MSG(fmt, ...) \
    do {                                                                \
        if (self->debug) {                                              \
            fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__);  \
            fprintf(stderr, fmt, __VA_ARGS__);                          \
            fprintf(stderr, "\n");                                      \
        }                                                               \
    } while (0)

/* Dump the events registered on self->cluster_event_base to stderr when
   self->debug is non-zero. A variable named `self` must be in scope.
   Wrapped in do { } while (0) so the expansion is exactly one statement and
   is safe inside an unbraced if/else. */
#define DEBUG_EVENT_BASE() \
    do {                                                                \
        if (self->debug) {                                              \
            event_base_dump_events(self->cluster_event_base, stderr);   \
        }                                                               \
    } while (0)

/* Outcome of decoding one redisReply into Perl values.
   Redis__Cluster__Fast_decode_reply() starts with both members NULL and fills
   `error` for REDIS_REPLY_ERROR replies and `result` for the other reply
   types it handles. Field order matters: the struct is initialised
   positionally as {NULL, NULL}. */
typedef struct redis_cluster_fast_reply_s {
    SV *result;
    SV *error;
} redis_cluster_fast_reply_t;

/* Per-command reply context.
   NOTE(review): no use of this struct is visible in this part of the file.
   Presumably `self` is the owning Redis__Cluster__Fast handle, `result` and
   `error` receive the decoded reply, and `done` marks that the reply
   callback has run - confirm against the command callback. */
typedef struct cmd_reply_context_s {
    void *self;
    SV *result;
    SV *error;
    int done;
} cmd_reply_context_t;

/* Per-command reply context for pipelined commands.
   NOTE(review): presumably this is the type behind the pipeline reply
   callback, which stores the decoded reply in `result` / `error`, calls `cb`
   with those two values, then releases `cb` and frees the context - confirm,
   since the start of that callback is not visible here. */
typedef struct cmd_reply_context_pipeline_s {
    void *self;
    SV *result;
    SV *error;
    SV *cb;
} cmd_reply_context_pipeline_t;

/* State behind one Redis::Cluster::Fast object. */
typedef struct redis_cluster_fast_s {
    /* Async cluster context; created in connect, released in disconnect. */
    redisClusterAsyncContext *acc;
    /* libevent base the async context is attached to. */
    struct event_base *cluster_event_base;
    /* Heap copy of the node list given to __set_servers. */
    char *hostnames;
    /* Non-zero enables DEBUG_MSG / DEBUG_EVENT_BASE output. */
    int debug;
    /* Value handed to redisClusterSetOptionMaxRetry() on connect. */
    int max_retry;
    /* Non-zero makes connect call redisClusterSetOptionRouteUseSlots(). */
    int use_cluster_slots;
    /* Reset to 0 on connect; set to 1 by eventCallback on HIRCLUSTER_EVENT_READY. */
    int event_ready;
    /* Timeouts handed to the cluster context on connect. */
    struct timeval connect_timeout;
    struct timeval command_timeout;
    /* Cluster discovery retry timeout, in microseconds. */
    int64_t discovery_timeout_usec;
    /* Process id recorded when connect was called. */
    pid_t pid;
    /* Reset to 0 on connect; decremented after a pipeline callback has run. */
    int64_t pipeline_callback_remain;
} redis_cluster_fast_t, *Redis__Cluster__Fast;

/* Read CLOCK_MONOTONIC and return the reading as a count of microseconds.
   Returns -1 when clock_gettime() reports failure. */
int64_t get_usec_timestamp(void) {
    struct timespec now;
    int64_t from_seconds;
    int64_t from_nanos;

    if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
        return -1;
    }

    /* Widen before multiplying so the seconds term is computed in 64 bits. */
    from_seconds = (int64_t) now.tv_sec * ONE_SECOND_TO_MICRO;
    from_nanos = (int64_t) (now.tv_nsec / NANO_SECOND_TO_MICRO);
    return from_seconds + from_nanos;
}

static redis_cluster_fast_reply_t
Redis__Cluster__Fast_decode_reply(pTHX_ Redis__Cluster__Fast self, redisReply *reply) {
    redis_cluster_fast_reply_t res = {NULL, NULL};

    switch (reply->type) {
        case REDIS_REPLY_ERROR:
            res.error = newSVpvn(reply->str, reply->len);
            break;

        case REDIS_REPLY_BIGNUM:
        case REDIS_REPLY_DOUBLE:
        case REDIS_REPLY_STATUS:
        case REDIS_REPLY_STRING:
        case REDIS_REPLY_VERB:
            res.result = newSVpvn(reply->str, reply->len);
            break;

        case REDIS_REPLY_INTEGER:
        case REDIS_REPLY_BOOL:
            res.result = newSViv(reply->integer);
            break;
        case REDIS_REPLY_NIL:
            res.result = &PL_sv_undef;
            break;

        case REDIS_REPLY_MAP:
        case REDIS_REPLY_SET:
        case REDIS_REPLY_ATTR: {
            size_t i;
            char *key;
            HV *hv = newHV();

            res.result = newRV_noinc((SV *) hv);

            for (i = 0; i < reply->elements; i++) {
                if (i % 2 == 0) {
                    key = reply->element[i]->str;
                } else {
                    redis_cluster_fast_reply_t elem = {NULL, NULL};
                    elem = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply->element[i]);
                    if (elem.result) {
                        hv_store(hv, key, strlen(key), SvREFCNT_inc(elem.result), 0);
                    } else {
                        hv_store(hv, key, strlen(key), newSV(0), 0);
                    }

src/Fast.xs  view on Meta::CPAN

    reply = (redisReply *) r;
    if (reply) {
        redis_cluster_fast_reply_t res;
        res = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply);
        reply_pipeline_t->result = res.result;
        reply_pipeline_t->error = res.error;
    } else {
        DEBUG_MSG("error: err=%d errstr=%s", cc->err, cc->errstr);
        reply_pipeline_t->error = newSVpvf("%s", cc->errstr);
    }

    {
        dSP;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        EXTEND(SP, 2);
        PUSHs(reply_pipeline_t->result ? sv_2mortal(reply_pipeline_t->result) : &PL_sv_undef);
        PUSHs(reply_pipeline_t->error ? sv_2mortal(reply_pipeline_t->error) : &PL_sv_undef);
        PUTBACK;

        call_sv(reply_pipeline_t->cb, G_DISCARD);

        FREETMPS;
        LEAVE;
    }

    SvREFCNT_dec(reply_pipeline_t->cb);
    Safefree(reply_pipeline_t);

    self->pipeline_callback_remain--;
    DEBUG_MSG("pipeline callback remain: %ld", self->pipeline_callback_remain);
}

/* Cluster event hook installed by Redis__Cluster__Fast_connect().
   `privdata` is the Redis__Cluster__Fast handle passed at registration.
   Logs every event and raises self->event_ready once the cluster reports
   HIRCLUSTER_EVENT_READY; `cc` is not used. */
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
    Redis__Cluster__Fast self = (Redis__Cluster__Fast) privdata;

    DEBUG_MSG("event: %d", event);

    if (event != HIRCLUSTER_EVENT_READY) {
        return;
    }
    self->event_ready = 1;
}

/* Create the async cluster context and its libevent base, apply the options
   stored on `self`, and start an asynchronous connect.

   Returns NULL on success, or a newly created SV holding an error message.
   On an error return, whatever was created so far is left on `self`. */
SV *Redis__Cluster__Fast_connect(pTHX_ Redis__Cluster__Fast self) {
    DEBUG_MSG("%s", "start connect");
    if (self->cluster_event_base && self->acc) {
        return newSVpvf("%s", "already connected");
    }

    self->pipeline_callback_remain = 0;
    self->pid = getpid();

    self->acc = redisClusterAsyncContextInit();
    /* The context is dereferenced below, so an allocation failure must be
       reported here rather than crashing on self->acc->cc. */
    if (self->acc == NULL) {
        return newSVpvf("%s", "failed to initialize cluster async context");
    }
    if (redisClusterSetOptionAddNodes(self->acc->cc, self->hostnames) != REDIS_OK) {
        return newSVpvf("failed to add nodes: %s", self->acc->cc->errstr);
    }
    if (redisClusterSetOptionConnectTimeout(self->acc->cc, self->connect_timeout) != REDIS_OK) {
        return newSVpvf("failed to set connect timeout: %s", self->acc->cc->errstr);
    }
    if (redisClusterSetOptionTimeout(self->acc->cc, self->command_timeout) != REDIS_OK) {
        return newSVpvf("failed to set command timeout: %s", self->acc->cc->errstr);
    }
    if (redisClusterSetOptionMaxRetry(self->acc->cc, self->max_retry) != REDIS_OK) {
        return newSVpvf("%s", "failed to set max retry");
    }

    if (self->use_cluster_slots) {
        DEBUG_MSG("%s", "use cluster slots");
        if (redisClusterSetOptionRouteUseSlots(self->acc->cc) != REDIS_OK) {
            return newSVpvf("%s", "failed to set redisClusterSetOptionRouteUseSlots");
        }
    }

    self->cluster_event_base = event_base_new();
    /* event_base_new() can fail; do not hand a NULL base to libevent. */
    if (self->cluster_event_base == NULL) {
        return newSVpvf("%s", "failed to create event base");
    }

    if (event_base_priority_init(self->cluster_event_base, EVENT_BASE_PRIORITY_NUMBER) != 0) {
        return newSVpvf("%s", "failed to initialize event base priorities");
    }

    if (redisClusterLibeventAttach(self->acc, self->cluster_event_base) != REDIS_OK) {
        return newSVpvf("%s", "failed to attach event base");
    }

    /* Cleared before the callback is installed; eventCallback sets it to 1. */
    self->event_ready = 0;
    if (redisClusterSetEventCallback(self->acc->cc, eventCallback, self) != REDIS_OK) {
        return newSVpvf("%s", "failed to set event callback");
    }

    if (redisClusterAsyncConnect2(self->acc) != REDIS_OK) {
        return newSVpvf("failed to connect async: %s", self->acc->cc->errstr);
    }

    DEBUG_MSG("%s", "done connect");
    return NULL;
}

/* Disconnect from the cluster and release the async context and event base.

   Returns NULL on success (including when nothing was connected), or a newly
   created SV holding an error message. When event_reinit() or
   event_base_dispatch() fails, both handles are left on `self` untouched. */
SV *Redis__Cluster__Fast_disconnect(pTHX_ Redis__Cluster__Fast self) {
    if (self->cluster_event_base == NULL && self->acc == NULL) {
        return NULL;
    }

    /* Only one of the two handles exists, e.g. connect returned an error
       before the event base was created. There is no event loop to run in
       that state, so just release what exists instead of passing NULL to
       libevent / the cluster library. */
    if (self->cluster_event_base == NULL) {
        redisClusterAsyncFree(self->acc);
        self->acc = NULL;
        return NULL;
    }
    if (self->acc == NULL) {
        event_base_free(self->cluster_event_base);
        self->cluster_event_base = NULL;
        return NULL;
    }

    /* NOTE(review): presumably re-initialises the base so it is usable in a
       child process after fork() - confirm against callers. */
    if (event_reinit(self->cluster_event_base) != 0) {
        return newSVpvf("%s", "event reinit failed");
    }
    redisClusterAsyncDisconnect(self->acc);

    /* Run the loop so the disconnect requested above is processed. */
    if (event_base_dispatch(self->cluster_event_base) == -1) {
        return newSVpvf("%s", "event_base_dispatch failed after forking");
    }
    event_base_free(self->cluster_event_base);
    self->cluster_event_base = NULL;

    redisClusterAsyncFree(self->acc);
    self->acc = NULL;
    return NULL;
}

SV *Redis__Cluster__Fast_wait_until_event_ready(pTHX_ Redis__Cluster__Fast self) {
    int event_loop_error;
    int count = 0;

src/Fast.xs  view on Meta::CPAN

    } else {
        run_cmd_impl(aTHX_ self, argc, argv, argvlen, reply_t);
    }
}

MODULE = Redis::Cluster::Fast    PACKAGE = Redis::Cluster::Fast

PROTOTYPES: DISABLE

Redis::Cluster::Fast
_new(char* cls);
PREINIT:
    redis_cluster_fast_t* self;
CODE:
    /* Allocate one zero-filled handle. The second argument of Newxz() is an
       element count, not a byte size, so it must be 1 here. */
    Newxz(self, 1, redis_cluster_fast_t);
    RETVAL = self;
OUTPUT:
    RETVAL

void
__srandom(char *cls, unsigned int seed_value)
CODE:
    /* Seed the C library random() generator; `cls` is not used. */
    srandom(seed_value);

int
__set_debug(Redis::Cluster::Fast self, int val)
CODE:
    /* Store the flag before logging: DEBUG_MSG is gated on self->debug, so
       the message must be evaluated against the new value, not the old one. */
    self->debug = val;
    DEBUG_MSG("%s", "DEBUG true");
    RETVAL = self->debug;
OUTPUT:
    RETVAL

void
__set_servers(Redis::Cluster::Fast self, char* hostnames)
PREINIT:
    size_t copy_size;
CODE:
    /* Release any previously stored node list before taking a fresh copy. */
    if (self->hostnames != NULL) {
        Safefree(self->hostnames);
        self->hostnames = NULL;
    }

    if (hostnames != NULL) {
        /* Room for the string plus its terminating NUL. */
        copy_size = strlen(hostnames) + 1;
        Newx(self->hostnames, copy_size, char);
        my_strlcpy(self->hostnames, hostnames, copy_size);
        DEBUG_MSG("%s %s", "set hostnames", self->hostnames);
    }

void
__set_connect_timeout(Redis::Cluster::Fast self, double double_sec)
PREINIT:
    int second, micro_second;
    struct timeval timeout;
CODE:
    /* Split the timeout in seconds into whole seconds and microseconds.
       The + 0.999 biases the truncation of the fractional microsecond upward. */
    second = (int) (double_sec);
    micro_second = (int) (fmod(double_sec * ONE_SECOND_TO_MICRO, ONE_SECOND_TO_MICRO) + 0.999);
    /* The upward bias can yield exactly one full second of microseconds
       (e.g. 0.9999995 s); carry it so tv_usec stays below 1000000. */
    if (micro_second >= ONE_SECOND_TO_MICRO) {
        second++;
        micro_second -= ONE_SECOND_TO_MICRO;
    }
    timeout.tv_sec = second;
    timeout.tv_usec = micro_second;
    self->connect_timeout = timeout;
    DEBUG_MSG("connect timeout %d, %d", second, micro_second);

void
__set_command_timeout(Redis::Cluster::Fast self, double double_sec)
PREINIT:
    int second, micro_second;
    struct timeval timeout;
CODE:
    /* Split the timeout in seconds into whole seconds and microseconds.
       The + 0.999 biases the truncation of the fractional microsecond upward. */
    second = (int) (double_sec);
    micro_second = (int) (fmod(double_sec * ONE_SECOND_TO_MICRO, ONE_SECOND_TO_MICRO) + 0.999);
    /* The upward bias can yield exactly one full second of microseconds
       (e.g. 0.9999995 s); carry it so tv_usec stays below 1000000. */
    if (micro_second >= ONE_SECOND_TO_MICRO) {
        second++;
        micro_second -= ONE_SECOND_TO_MICRO;
    }
    timeout.tv_sec = second;
    timeout.tv_usec = micro_second;
    self->command_timeout = timeout;
    DEBUG_MSG("command timeout %d, %d", second, micro_second);

void
__set_max_retry(Redis::Cluster::Fast self, int max_retry)
CODE:
    /* Remember the retry limit; it is applied to the cluster context on connect. */
    self->max_retry = max_retry;
    DEBUG_MSG("max_retry %d", self->max_retry);

void
__set_route_use_slots(Redis::Cluster::Fast self, int use_slot)
CODE:
    /* Non-zero makes connect call redisClusterSetOptionRouteUseSlots(). */
    self->use_cluster_slots = use_slot;

void
__set_cluster_discovery_retry_timeout(Redis::Cluster::Fast self, double double_sec)
CODE:
    /* Store the timeout, given in seconds, as microseconds. */
    self->discovery_timeout_usec = (int64_t) (double_sec * ONE_SECOND_TO_MICRO);
    /* int64_t is not `long` on every platform; cast so the argument matches
       the conversion specifier. */
    DEBUG_MSG("discovery timeout %lld", (long long) self->discovery_timeout_usec);

SV*
__connect(Redis::Cluster::Fast self)
PREINIT:
    SV* error;
CODE:
    /* A NULL result from the C helper means success and is reported as undef;
       otherwise the error message SV is returned. */
    error = Redis__Cluster__Fast_connect(aTHX_ self);
    RETVAL = error ? error : &PL_sv_undef;
OUTPUT:
    RETVAL

SV*
__disconnect(Redis::Cluster::Fast self)
PREINIT:
    SV* error;
CODE:
    /* A NULL result from the C helper means success and is reported as undef;
       otherwise the error message SV is returned. */
    error = Redis__Cluster__Fast_disconnect(aTHX_ self);
    RETVAL = error ? error : &PL_sv_undef;
OUTPUT:
    RETVAL

SV*
__wait_until_event_ready(Redis::Cluster::Fast self)
PREINIT:
    SV* error;
CODE:
    /* A NULL result from the C helper is reported as undef; any other SV is
       returned as-is. */
    error = Redis__Cluster__Fast_wait_until_event_ready(aTHX_ self);
    RETVAL = error ? error : &PL_sv_undef;
OUTPUT:
    RETVAL

void
__std_cmd(Redis::Cluster::Fast self, ...)
PREINIT:
    cmd_reply_context_t* result_context;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
    SV* cb;
PPCODE:
    if (!self->acc) {



( run in 1.193 second using v1.01-cache-2.11-cpan-677af5a14d3 )