EV-Hiredis

 view release on metacpan or  search on metacpan

deps/hiredis/README.md  view on Meta::CPAN

Version 1.0.0 marks the first stable release of Hiredis.
It includes some minor breaking changes, mostly to make the exposed API more uniform and self-explanatory.
It also bundles the updated `sds` library, to sync up with upstream and Redis.
For code changes see the [Changelog](CHANGELOG.md).

_Note:  As described below, a few member names have been changed but most applications should be able to upgrade with minor code changes and recompiling._

## IMPORTANT:  Breaking changes from `0.14.1` -> `1.0.0`

* `redisContext` has two additional members (`free_privdata`, and `privctx`).
* `redisOptions.timeout` has been renamed to `redisOptions.connect_timeout`, and we've added `redisOptions.command_timeout`.
* `redisReplyObjectFunctions.createArray` now takes `size_t` instead of `int` for its length parameter.

## IMPORTANT:  Breaking changes when upgrading from 0.13.x -> 0.14.x

Bulk and multi-bulk lengths less than -1 or greater than `LLONG_MAX` are now
protocol errors. This is consistent with the RESP specification. On 32-bit
platforms, the upper bound is lowered to `SIZE_MAX`.

Change `redisReply.len` to `size_t`, as it denotes the the size of a string

deps/hiredis/async.c  view on Meta::CPAN

    redisCallback cb;
    /* must not be called from a callback */
    assert(!(c->flags & REDIS_IN_CALLBACK));

    if ((c->flags & REDIS_CONNECTED)) {
        if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
            /* Nothing to do - just an idle timeout */
            return;
        }

        if (!ac->c.command_timeout ||
            (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) {
            /* A belated connect timeout arriving, ignore */
            return;
        }
    }

    if (!c->err) {
        __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
        __redisAsyncCopyError(ac);
    }

deps/hiredis/async.c  view on Meta::CPAN

    return status;
}

redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
    redisAsyncPushFn *old = ac->push_cb;
    ac->push_cb = fn;
    return old;
}

int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
    if (!ac->c.command_timeout) {
        ac->c.command_timeout = hi_calloc(1, sizeof(tv));
        if (ac->c.command_timeout == NULL) {
            __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
            __redisAsyncCopyError(ac);
            return REDIS_ERR;
        }
    }

    if (tv.tv_sec != ac->c.command_timeout->tv_sec ||
        tv.tv_usec != ac->c.command_timeout->tv_usec)
    {
        *ac->c.command_timeout = tv;
    }

    return REDIS_OK;
}

deps/hiredis/async_private.h  view on Meta::CPAN

static inline void refreshTimeout(redisAsyncContext *ctx) {
    #define REDIS_TIMER_ISSET(tvp) \
        (tvp && ((tvp)->tv_sec || (tvp)->tv_usec))

    #define REDIS_EL_TIMER(ac, tvp) \
        if ((ac)->ev.scheduleTimer && REDIS_TIMER_ISSET(tvp)) { \
            (ac)->ev.scheduleTimer((ac)->ev.data, *(tvp)); \
        }

    if (ctx->c.flags & REDIS_CONNECTED) {
        REDIS_EL_TIMER(ctx, ctx->c.command_timeout);
    } else {
        REDIS_EL_TIMER(ctx, ctx->c.connect_timeout);
    }
}

void __redisAsyncDisconnect(redisAsyncContext *ac);
void redisProcessCallbacks(redisAsyncContext *ac);

#endif  /* __HIREDIS_ASYNC_PRIVATE_H */

deps/hiredis/hiredis.c  view on Meta::CPAN

    if (c->funcs && c->funcs->close) {
        c->funcs->close(c);
    }

    sdsfree(c->obuf);
    redisReaderFree(c->reader);
    hi_free(c->tcp.host);
    hi_free(c->tcp.source_addr);
    hi_free(c->unix_sock.path);
    hi_free(c->connect_timeout);
    hi_free(c->command_timeout);
    hi_free(c->saddr);

    if (c->privdata && c->free_privdata)
        c->free_privdata(c->privdata);

    if (c->funcs && c->funcs->free_privctx)
        c->funcs->free_privctx(c->privctx);

    memset(c, 0xff, sizeof(*c));
    hi_free(c);

deps/hiredis/hiredis.c  view on Meta::CPAN

               c->connect_timeout, c->tcp.source_addr);
    } else if (c->connection_type == REDIS_CONN_UNIX) {
        ret = redisContextConnectUnix(c, c->unix_sock.path, c->connect_timeout);
    } else {
        /* Something bad happened here and shouldn't have. There isn't
           enough information in the context to reconnect. */
        __redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect");
        ret = REDIS_ERR;
    }

    if (c->command_timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) {
        redisContextSetTimeout(c, *c->command_timeout);
    }

    return ret;
}

redisContext *redisConnectWithOptions(const redisOptions *options) {
    redisContext *c = redisContextInit();
    if (c == NULL) {
        return NULL;
    }

deps/hiredis/hiredis.c  view on Meta::CPAN

     * as a default unless specifically flagged that we don't want one. */
    if (options->push_cb != NULL)
        redisSetPushCallback(c, options->push_cb);
    else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE))
        redisSetPushCallback(c, redisPushAutoFree);

    c->privdata = options->privdata;
    c->free_privdata = options->free_privdata;

    if (redisContextUpdateConnectTimeout(c, options->connect_timeout) != REDIS_OK ||
        redisContextUpdateCommandTimeout(c, options->command_timeout) != REDIS_OK) {
        __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
        return c;
    }

    if (options->type == REDIS_CONN_TCP) {
        redisContextConnectBindTcp(c, options->endpoint.tcp.ip,
                                   options->endpoint.tcp.port, options->connect_timeout,
                                   options->endpoint.tcp.source_addr);
    } else if (options->type == REDIS_CONN_UNIX) {
        redisContextConnectUnix(c, options->endpoint.unix_socket,
                                options->connect_timeout);
    } else if (options->type == REDIS_CONN_USERFD) {
        c->fd = options->endpoint.fd;
        c->flags |= REDIS_CONNECTED;
    } else {
        redisFree(c);
        return NULL;
    }

    if (options->command_timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) {
        redisContextSetTimeout(c, *options->command_timeout);
    }

    return c;
}

/* Connect to a Redis instance. On error the field error in the returned
 * context will be set to the return value of the error function.
 * When no set of reply functions is given, the default set will be used. */
redisContext *redisConnect(const char *ip, int port) {
    redisOptions options = {0};

deps/hiredis/hiredis.h  view on Meta::CPAN

     * the type of connection to use. This also indicates which
     * `endpoint` member field to use
     */
    int type;
    /* bit field of REDIS_OPT_xxx */
    int options;
    /* timeout value for connect operation. If NULL, no timeout is used */
    const struct timeval *connect_timeout;
    /* timeout value for commands. If NULL, no timeout is used.  This can be
     * updated at runtime with redisSetTimeout/redisAsyncSetTimeout. */
    const struct timeval *command_timeout;
    union {
        /** use this field for tcp/ip connections */
        struct {
            const char *source_addr;
            const char *ip;
            int port;
        } tcp;
        /** use this field for unix domain sockets */
        const char *unix_socket;
        /**

deps/hiredis/hiredis.h  view on Meta::CPAN


    int err; /* Error flags, 0 when there is no error */
    char errstr[128]; /* String representation of error when applicable */
    redisFD fd;
    int flags;
    char *obuf; /* Write buffer */
    redisReader *reader; /* Protocol reader */

    enum redisConnectionType connection_type;
    struct timeval *connect_timeout;
    struct timeval *command_timeout;

    struct {
        char *host;
        char *source_addr;
        int port;
    } tcp;

    struct {
        char *path;
    } unix_sock;

deps/hiredis/net.c  view on Meta::CPAN

        if (c->connect_timeout == NULL)
            return REDIS_ERR;
    }

    memcpy(c->connect_timeout, timeout, sizeof(*c->connect_timeout));
    return REDIS_OK;
}

int redisContextUpdateCommandTimeout(redisContext *c, const struct timeval *timeout) {
    /* Same timeval struct, short circuit */
    if (c->command_timeout == timeout)
        return REDIS_OK;

    /* Allocate context timeval if we need to */
    if (c->command_timeout == NULL) {
        c->command_timeout = hi_malloc(sizeof(*c->command_timeout));
        if (c->command_timeout == NULL)
            return REDIS_ERR;
    }

    memcpy(c->command_timeout, timeout, sizeof(*c->command_timeout));
    return REDIS_OK;
}

static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
                                   const struct timeval *timeout,
                                   const char *source_addr) {
    redisFD s;
    int rv, n;
    char _port[6];  /* strlen("65535"); */
    struct addrinfo hints, *servinfo, *bservinfo, *p, *b;

deps/hiredis/test.c  view on Meta::CPAN


    /* Start event dispatching loop */
    test_cond(event_base_dispatch(base) == 0);
    event_free(timeout);
    event_base_free(base);

    /* Verify test checkpoints */
    assert(state.checkpoint == 6);
}

/* Subscribe callback for test_command_timeout_during_pubsub:
 * - a subscribe response triggers a published message
 * - the published message triggers a command that times out
 * - the command timeout triggers a disconnect */
void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) {
    redisReply *reply = r;
    TestState *state = privdata;

    /* The non-clean disconnect should trigger the
     * subscription callback with a NULL reply. */
    if (reply == NULL) {

deps/hiredis/test.c  view on Meta::CPAN


        /* Send a command that will trigger a timeout */
        redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3");
        redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo");
    } else {
        printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
        exit(1);
    }
}

static void test_command_timeout_during_pubsub(struct config config) {
    test("Command timeout during Pub/Sub: ");
    /* Setup event dispatcher with a testcase timeout */
    base = event_base_new();
    struct event *timeout = evtimer_new(base,timeout_cb,NULL);
    assert(timeout != NULL);

    evtimer_assign(timeout,base,timeout_cb,NULL);
    struct timeval timeout_tv = {.tv_sec = 10};
    evtimer_add(timeout,&timeout_tv);

    /* Connect */
    redisOptions options = get_redis_tcp_options(config);
    redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
    assert(ac != NULL && ac->err == 0);
    redisLibeventAttach(ac,base);

    /* Configure a command timout */
    struct timeval command_timeout = {.tv_sec = 2};
    redisAsyncSetTimeout(ac,command_timeout);

    /* Not expecting any push messages in this test */
    redisAsyncSetPushCallback(ac,unexpected_push_cb);

    /* Switch protocol */
    redisAsyncCommand(ac,NULL,NULL,"HELLO 3");

    /* Start subscribe */
    TestState state = {.options = &options, .resp3 = 1};
    redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel");

deps/hiredis/test.c  view on Meta::CPAN

    int major;
    redisContext *c = do_connect(cfg);
    get_redis_version(c, &major, NULL);
    disconnect(c, 0);

    test_pubsub_handling(cfg);
    test_pubsub_multiple_channels(cfg);
    test_monitor(cfg);
    if (major >= 6) {
        test_pubsub_handling_resp3(cfg);
        test_command_timeout_during_pubsub(cfg);
    }
#endif /* HIREDIS_TEST_ASYNC */

    cfg.type = CONN_TCP;
    printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
    test_async_polling(cfg);
    if (test_unix_socket) {
        cfg.type = CONN_UNIX;
        printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path);
        test_async_polling(cfg);

lib/EV/Hiredis.pm  view on Meta::CPAN


sub new {
    my ($class, %args) = @_;

    my $loop = $args{loop} || EV::default_loop;
    my $self = $class->_new($loop);

    $self->on_error($args{on_error} || sub { die @_ });
    $self->on_connect($args{on_connect}) if $args{on_connect};
    $self->connect_timeout($args{connect_timeout}) if $args{connect_timeout};
    $self->command_timeout($args{command_timeout}) if $args{command_timeout};

    if (exists $args{host}) {
        $self->connect($args{host}, defined $args{port} ? $args{port} : 6379);
    }
    elsif (exists $args{path}) {
        $self->connect_unix($args{path});
    }

    $self;
}

lib/EV/Hiredis.pm  view on Meta::CPAN

=item * on_connect => $cb->()

Connection callback will be called when connection successful and completed to redis server.

This callback can be set by C<< $obj->on_connect($cb) >> method any time.

=item * connect_timeout => $num_of_milliseconds

Connection timeout.

=item * command_timeout => $num_of_milliseconds

Command timeout.

=item * loop => 'EV::loop',

EV loop for running this instance. Default is C<EV::default_loop>.

=back

All parameters are optional.

src/EV__Hiredis.xs  view on Meta::CPAN


typedef ev_hiredis_t* EV__Hiredis;
typedef struct ev_loop* EV__Loop;

struct ev_hiredis_s {
    struct ev_loop* loop;
    redisAsyncContext* ac;
    SV* error_handler;
    SV* connect_handler;
    struct timeval* connect_timeout;
    struct timeval* command_timeout;
    ngx_queue_t cb_queue; /* for long term callbacks such as subscribe */
};

struct ev_hiredis_cb_s {
    SV* cb;
    ngx_queue_t queue;
    int persist;
};

static void emit_error(EV__Hiredis self, SV* error) {

src/EV__Hiredis.xs  view on Meta::CPAN

        emit_error(self, sv_error);
    }

    remove_cb_queue(self);
}

static void pre_connect_common(EV__Hiredis self, redisOptions* opts) {
    if (NULL != self->connect_timeout) {
        opts->connect_timeout = self->connect_timeout;
    }
    if (NULL != self->command_timeout) {
        opts->command_timeout = self->command_timeout;
    }
}

static void connect_common(EV__Hiredis self) {
    int r;
    SV* sv_error = NULL;

    self->ac->data = (void*)self;

    r = redisLibevAttach(self->loop, self->ac);

src/EV__Hiredis.xs  view on Meta::CPAN

        self->error_handler = NULL;
    }
    if (NULL != self->connect_handler) {
        SvREFCNT_dec(self->connect_handler);
        self->connect_handler = NULL;
    }
    if (NULL != self->connect_timeout) {
        Safefree(self->connect_timeout);
        self->connect_timeout = NULL;
    }
    if (NULL != self->command_timeout) {
        Safefree(self->command_timeout);
        self->command_timeout = NULL;
    }

    remove_cb_queue(self);

    Safefree(self);
}

void
connect(EV::Hiredis self, char* hostname, int port = 6379);
CODE:

src/EV__Hiredis.xs  view on Meta::CPAN

CODE:
{
    if (NULL == self->connect_timeout) {
        Newx(self->connect_timeout, 1, struct timeval);
    }
    self->connect_timeout->tv_sec = timeout_ms / 1000;
    self->connect_timeout->tv_usec = (timeout_ms % 1000) * 1000;
}

void
command_timeout(EV::Hiredis self, int timeout_ms);
CODE:
{
    if (NULL == self->command_timeout) {
        Newx(self->command_timeout, 1, struct timeval);
    }
    self->command_timeout->tv_sec = timeout_ms / 1000;
    self->command_timeout->tv_usec = (timeout_ms % 1000) * 1000;
}

CV*
on_error(EV::Hiredis self, CV* handler = NULL);
CODE:
{
    if (NULL != self->error_handler) {
        SvREFCNT_dec(self->error_handler);
        self->error_handler = NULL;
    }

t/connect.t  view on Meta::CPAN

});

$r->connect('127.0.0.1', $port);

EV::run;

is $connected, 1;
is $error, 0;


$r = EV::Hiredis->new(connect_timeout => 1000, command_timeout => 1000);

$connected = 0;
$error = 0;

$r->on_error(sub { $error++ });
$r->on_connect(sub {
    $connected++;

    my $t; $t = EV::timer .1, 0, sub {
        $r->disconnect;



( run in 0.369 second using v1.01-cache-2.11-cpan-4d50c553e7e )