EV-Hiredis

deps/hiredis/README.md  view on Meta::CPAN

{
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        printf("Error: %s\n", c->errstr);
        // handle error
        redisAsyncFree(c);
        c = NULL;
    } else {
        appData->context = c;
        appData->connecting = 1;
        c->data = appData; /* store application pointer for the callbacks */
        redisAsyncSetConnectCallback(c, appOnConnect);
        redisAsyncSetDisconnectCallback(c, appOnDisconnect);
    }
}

```


The asynchronous context _should_ hold a *connect* callback function that is called when the connection
attempt completes, either successfully or with an error.
It _can_ also hold a *disconnect* callback function that is called when the
connection is disconnected (either because of an error or per user request). Both callbacks should
have the following prototype:
```c
void(const redisAsyncContext *c, int status);
```

On a *connect*, the `status` argument is set to `REDIS_OK` if the connection attempt succeeded.  In this
case, the context is ready to accept commands.  If it is called with `REDIS_ERR` then the
connection attempt failed. The `err` field in the context can be accessed to find out the cause of the error.
After a failed connection attempt, the context object is automatically freed by the library after calling
the connect callback.  This may be a good point to create a new context and retry the connection.

On a disconnect, the `status` argument is set to `REDIS_OK` when disconnection was initiated by the
user, or `REDIS_ERR` when the disconnection was caused by an error. When it is `REDIS_ERR`, the `err`
field in the context can be accessed to find out the cause of the error.

The context object is always freed after the disconnect callback has fired. When a reconnect is needed,
the disconnect callback is a good place to initiate it.

The connect and disconnect callbacks can each be set only once per context. Subsequent calls
return `REDIS_ERR`. The functions that set the callbacks have the following prototypes:
```c
/* Alternatively you can use redisAsyncSetConnectCallbackNC which will be passed a non-const
   redisAsyncContext* on invocation (e.g. allowing writes to the privdata member). */
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
```
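The set-once rule can be modelled without hiredis. The sketch below is a stand-in, not the library's code: `demo_ctx`, `demo_set_disconnect_cb`, `DEMO_OK` and `DEMO_ERR` are hypothetical names that mirror `redisAsyncContext`, `redisAsyncSetDisconnectCallback`, `REDIS_OK` and `REDIS_ERR`.
```c
#include <stddef.h>

/* Hypothetical stand-ins for REDIS_OK / REDIS_ERR. */
enum { DEMO_OK = 0, DEMO_ERR = -1 };

/* Hypothetical stand-in for the relevant part of redisAsyncContext. */
typedef struct demo_ctx demo_ctx;
typedef void demo_disconnect_cb(const demo_ctx *c, int status);

struct demo_ctx {
    demo_disconnect_cb *on_disconnect; /* NULL until a callback is registered */
    void *data;                        /* application pointer, like ac->data */
};

/* Accept the first registration only; later calls leave the stored
 * callback untouched and report an error. */
int demo_set_disconnect_cb(demo_ctx *c, demo_disconnect_cb *fn) {
    if (c->on_disconnect == NULL) {
        c->on_disconnect = fn;
        return DEMO_OK;
    }
    return DEMO_ERR;
}

/* Two distinct callbacks, used only to show which registration sticks. */
void demo_first_cb(const demo_ctx *c, int status)  { (void)c; (void)status; }
void demo_second_cb(const demo_ctx *c, int status) { (void)c; (void)status; }
```
Because a second registration is rejected, it is simplest to install both callbacks once, right after the context is created.
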
`ac->data` may be used to pass user data to both callbacks.  A typical implementation
might look something like this:
```c
void appOnConnect(const redisAsyncContext *c, int status) /* const matches redisConnectCallback */
{
    myAppData *appData = (myAppData*)c->data; /* get my application specific context*/
    appData->connecting = 0;
    if (status == REDIS_OK) {
        appData->connected = 1;
    } else {
        appData->connected = 0;

deps/hiredis/README.md  view on Meta::CPAN

    if (status == REDIS_OK) {
        appNotifyDisconnectCompleted(mydata);
    } else {
        appNotifyUnexpectedDisconnect(mydata);
        appAttemptReconnect();
    }
}
```


### Sending commands and their callbacks

In an asynchronous context, commands are automatically pipelined due to the nature of an event loop.
Therefore, unlike the synchronous API, there is only a single way to send commands.
Because commands are sent to Redis asynchronously, issuing a command requires a callback function
that is called when the reply is received. Reply callbacks should have the following prototype:
```c
void(redisAsyncContext *c, void *reply, void *privdata);
```
The `privdata` argument can be used to curry arbitrary data to the callback from the point where
the command is initially queued for execution.
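
As a rough illustration of how `privdata` ties a reply back to the request that caused it, here is a self-contained sketch. It does not use hiredis: `demo_reply_fn`, `demo_pending`, `demo_request`, `demo_on_reply` and `demo_deliver` are hypothetical names, and `demo_reply_fn` only imitates the shape of the reply callback prototype.
```c
#include <stddef.h>

/* Hypothetical stand-in for the reply callback type: context, reply, privdata. */
typedef void demo_reply_fn(void *ctx, void *reply, void *privdata);

/* One queued command: the callback and the privdata captured at queue time. */
typedef struct {
    demo_reply_fn *fn;
    void *privdata;
} demo_pending;

/* Per-request state owned by the application. */
typedef struct {
    const char *label;
    int replies_seen;
} demo_request;

/* The callback recovers its own request from privdata, so a single
 * function can serve many in-flight commands. */
void demo_on_reply(void *ctx, void *reply, void *privdata) {
    demo_request *req = (demo_request *)privdata;
    (void)ctx;
    (void)reply;
    req->replies_seen += 1;
}

/* Deliver a reply to a queued command, as an event loop would do later.
 * A NULL callback means nobody is interested in the reply. */
void demo_deliver(void *ctx, const demo_pending *p, void *reply) {
    if (p->fn != NULL)
        p->fn(ctx, reply, p->privdata);
}
```
The point of the design is that the callback needs no global state: whatever pointer was supplied when the command was queued comes back unchanged when the reply arrives.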

The functions that can be used to issue commands in an asynchronous context are:
```c
int redisAsyncCommand(
  redisAsyncContext *ac, redisCallbackFn *fn, void *privdata,

deps/hiredis/README.md  view on Meta::CPAN

```
Both functions work like their blocking counterparts. The return value is `REDIS_OK` when the command
was successfully added to the output buffer and `REDIS_ERR` otherwise. For example, when the connection
is being disconnected per user request, no new commands may be added to the output buffer and `REDIS_ERR` is
returned on calls to the `redisAsyncCommand` family.

If the reply for a command with a `NULL` callback is read, it is immediately freed. When the callback
for a command is non-`NULL`, the memory is freed immediately following the callback: the reply is only
valid for the duration of the callback.

All pending callbacks are called with a `NULL` reply when the context encounters an error.
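
Two consequences follow for callback authors: check for a `NULL` reply first, and copy anything that must outlive the callback. The sketch below shows both under simplifying assumptions. `demo_reply` is a hypothetical stand-in carrying only a `str` and `len` field, and `demo_result` and `demo_on_get` are illustrative names, not part of hiredis.
```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the two reply fields used here. */
typedef struct {
    char *str;
    size_t len;
} demo_reply;

/* Application-owned result that outlives the callback. */
typedef struct {
    char *value; /* heap copy of the reply payload, or NULL */
    int failed;  /* set when no usable reply was delivered */
} demo_result;

/* Reply callback in the shape void(ctx, reply, privdata). */
void demo_on_get(void *ctx, void *reply, void *privdata) {
    demo_result *res = (demo_result *)privdata;
    demo_reply *r = (demo_reply *)reply;
    (void)ctx;

    /* A NULL reply signals a context error or teardown. */
    if (r == NULL || r->str == NULL) {
        res->failed = 1;
        return;
    }

    /* Copy now: the reply is released once this function returns. */
    res->value = (char *)malloc(r->len + 1);
    if (res->value == NULL) {
        res->failed = 1;
        return;
    }
    memcpy(res->value, r->str, r->len);
    res->value[r->len] = '\0';
}
```
The caller owns `res->value` afterwards and is responsible for calling `free` on it.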

For every command issued, with the exception of **SUBSCRIBE** and **PSUBSCRIBE**, the callback is
called exactly once.  Even if the context object is disconnected or deleted, every pending callback
will be called with a `NULL` reply.

For **SUBSCRIBE** and **PSUBSCRIBE**, the callbacks may be called repeatedly until an `unsubscribe`
message arrives.  This will be the last invocation of the callback. In case of error, the callbacks
may receive a final `NULL` reply instead.

### Disconnecting

An asynchronous connection can be terminated using:
```c
void redisAsyncDisconnect(redisAsyncContext *ac);
```
When this function is called, the connection is **not** immediately terminated. Instead, new
commands are no longer accepted and the connection is only terminated when all pending commands
have been written to the socket, their respective replies have been read and their respective
callbacks have been executed. After this, the disconnection callback is executed with the
`REDIS_OK` status and the context object is freed.
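
The ordering described above (refuse new commands, drain pending replies, then report the disconnect) can be pictured with a small state model. This is only a sketch of the behavior, not the library's implementation: `drain_ctx`, `drain_submit`, `drain_disconnect`, `drain_reply`, `DRAIN_OK` and `DRAIN_ERR` are hypothetical names.
```c
#include <stddef.h>

/* Hypothetical stand-ins for REDIS_OK / REDIS_ERR. */
enum { DRAIN_OK = 0, DRAIN_ERR = -1 };

typedef struct {
    int disconnecting;     /* set once a graceful disconnect is requested */
    int pending;           /* commands queued whose callbacks have not run */
    int disconnected;      /* set when the disconnect callback would fire */
    int disconnect_status; /* status that callback would receive */
} drain_ctx;

/* Record that the disconnect callback fires with an OK status. */
static void drain_finish(drain_ctx *c) {
    c->disconnected = 1;
    c->disconnect_status = DRAIN_OK;
}

/* Queue a command; refused once a disconnect has been requested. */
int drain_submit(drain_ctx *c) {
    if (c->disconnecting || c->disconnected)
        return DRAIN_ERR;
    c->pending += 1;
    return DRAIN_OK;
}

/* Request a graceful disconnect; it completes at once only when
 * nothing is pending. */
void drain_disconnect(drain_ctx *c) {
    c->disconnecting = 1;
    if (c->pending == 0)
        drain_finish(c);
}

/* A reply arrived and its callback ran; finish the disconnect once
 * the last pending command has been answered. */
void drain_reply(drain_ctx *c) {
    if (c->pending > 0)
        c->pending -= 1;
    if (c->disconnecting && c->pending == 0 && !c->disconnected)
        drain_finish(c);
}
```
In this model a command submitted after `drain_disconnect` is rejected immediately, while commands submitted before it still get their replies.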

The connection can be forcefully disconnected using:
```c
void redisAsyncFree(redisAsyncContext *ac);
```
In this case, nothing more is written to the socket, all pending callbacks are called with a `NULL`
reply and the disconnection callback is called with `REDIS_OK`, after which the context object
is freed.


### Hooking it up to event library *X*

There are a few hooks that need to be set on the context object after it is created.
See the `adapters/` directory for bindings to *libev* and *libevent*.
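
To give a feel for what such hooks look like, the sketch below wires a toy adapter into a hook table. It is an assumption-laden model, not a real adapter: `hook_table` is a hypothetical stand-in modeled on the `ev` member of `redisAsyncContext` (timer scheduling is left out), and `toy_events` and `toy_attach` are illustrative names. A real adapter would register the socket with its event library inside these hooks.
```c
#include <stddef.h>

/* Hypothetical stand-in for the hook table on the async context. */
typedef struct {
    void *data;                       /* adapter state passed to every hook */
    void (*addRead)(void *privdata);  /* start watching for readability */
    void (*delRead)(void *privdata);  /* stop watching for readability */
    void (*addWrite)(void *privdata); /* start watching for writability */
    void (*delWrite)(void *privdata); /* stop watching for writability */
    void (*cleanup)(void *privdata);  /* release adapter resources */
} hook_table;

/* State a toy adapter keeps; a real one would also hold the fd and
 * the event library's watcher objects. */
typedef struct {
    int reading;
    int writing;
    int cleaned_up;
} toy_events;

static void toy_add_read(void *p)  { ((toy_events *)p)->reading = 1; }
static void toy_del_read(void *p)  { ((toy_events *)p)->reading = 0; }
static void toy_add_write(void *p) { ((toy_events *)p)->writing = 1; }
static void toy_del_write(void *p) { ((toy_events *)p)->writing = 0; }
static void toy_cleanup(void *p)   { ((toy_events *)p)->cleaned_up = 1; }

/* Install the hooks. Returns 0 on success, or -1 if another adapter
 * is already attached. */
int toy_attach(hook_table *ev, toy_events *e) {
    if (ev->data != NULL)
        return -1;
    e->reading = 0;
    e->writing = 0;
    e->cleaned_up = 0;
    ev->data = e;
    ev->addRead = toy_add_read;
    ev->delRead = toy_del_read;
    ev->addWrite = toy_add_write;
    ev->delWrite = toy_del_write;
    ev->cleanup = toy_cleanup;
    return 0;
}
```
The library side then only has to call `ev->addRead(ev->data)` and friends; it never needs to know which event loop sits behind the hooks.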

## Reply parsing API

deps/hiredis/README.md  view on Meta::CPAN

    /* Handle error and abort... */
}

/* Negotiate SSL/TLS */
if (redisInitiateSSLWithContext(c, ssl_context) != REDIS_OK) {
    /* Handle error, in c->err / c->errstr */
}
```

## RESP3 PUSH replies
Redis 6.0 introduced PUSH replies with the reply-type `>`.  These messages are generated spontaneously and can arrive at any time, so they must be handled using callbacks.

### Default behavior
Hiredis installs handlers on `redisContext` and `redisAsyncContext` by default, which will intercept and free any PUSH replies detected.  This means existing code will work as-is after upgrading to Redis 6 and switching to `RESP3`.

### Custom PUSH handler prototypes
The callback prototypes differ between `redisContext` and `redisAsyncContext`.

#### redisContext
```c
void my_push_handler(void *privdata, void *reply) {

deps/hiredis/adapters/poll.h  view on Meta::CPAN

#else
    FILETIME ft;
    ULARGE_INTEGER li;
    GetSystemTimeAsFileTime(&ft);
    li.HighPart = ft.dwHighDateTime;
    li.LowPart = ft.dwLowDateTime;
    return (double)li.QuadPart * 1e-7;
#endif
}

/* Poll for io, handling any pending callbacks.  The timeout argument can be
 * positive to wait for a maximum given time for IO, zero to poll, or negative
 * to wait forever */
static int redisPollTick(redisAsyncContext *ac, double timeout) {
    int reading, writing;
    struct pollfd pfd;
    int handled;
    int ns;
    int itimeout;

    redisPollEvents *e = (redisPollEvents*)ac->ev.data;
    if (!e)
        return 0;

    /* local flags, won't get changed during callbacks */
    reading = e->reading;
    writing = e->writing;
    if (!reading && !writing)
        return 0;

    pfd.fd = e->fd;
    pfd.events = 0;
    if (reading)
        pfd.events = POLLIN;   
    if (writing)

deps/hiredis/async.c  view on Meta::CPAN


#ifdef NDEBUG
#undef assert
#define assert(e) (void)(e)
#endif

/* Forward declarations of hiredis.c functions */
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
void __redisSetError(redisContext *c, int type, const char *str);

/* Functions managing dictionary of callbacks for pub/sub. */
static unsigned int callbackHash(const void *key) {
    return dictGenHashFunction((const unsigned char *)key,
                               sdslen((const sds)key));
}

static void *callbackValDup(void *privdata, const void *src) {
    ((void) privdata);
    redisCallback *dup;

    dup = hi_malloc(sizeof(*dup));

deps/hiredis/async.c  view on Meta::CPAN

}

int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
    if (ac->onDisconnect == NULL) {
        ac->onDisconnect = fn;
        return REDIS_OK;
    }
    return REDIS_ERR;
}

/* Helper functions to push/shift callbacks */
static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
    redisCallback *cb;

    /* Copy callback from stack to heap */
    cb = hi_malloc(sizeof(*cb));
    if (cb == NULL)
        return REDIS_ERR_OOM;

    if (source != NULL) {
        memcpy(cb,source,sizeof(*cb));

deps/hiredis/async.c  view on Meta::CPAN

    }
}

/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisCallback cb;
    dictIterator it;
    dictEntry *de;

    /* Execute pending callbacks with NULL reply. */
    while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
        __redisRunCallback(ac,&cb,NULL);
    while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
        __redisRunCallback(ac,&cb,NULL);

    /* Run subscription callbacks with NULL reply */
    if (ac->sub.channels) {
        dictInitIterator(&it,ac->sub.channels);
        while ((de = dictNext(&it)) != NULL)
            __redisRunCallback(ac,dictGetEntryVal(de),NULL);

        dictRelease(ac->sub.channels);
    }

    if (ac->sub.patterns) {
        dictInitIterator(&it,ac->sub.patterns);

deps/hiredis/async.c  view on Meta::CPAN

}

/* Helper function to make the disconnect happen and clean up. */
void __redisAsyncDisconnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    /* Make sure error is accessible if there is any */
    __redisAsyncCopyError(ac);

    if (ac->err == 0) {
        /* For clean disconnects, there should be no pending callbacks. */
        int ret = __redisShiftCallback(&ac->replies,NULL);
        assert(ret == REDIS_ERR);
    } else {
        /* Disconnection is caused by an error, make sure that pending
         * callbacks cannot call new commands. */
        c->flags |= REDIS_DISCONNECTING;
    }

    /* cleanup event library on disconnect.
     * this is safe to call multiple times */
    _EL_CLEANUP(ac);

    /* For non-clean disconnects, __redisAsyncFree() will execute pending
     * callbacks with a NULL-reply. */
    if (!(c->flags & REDIS_NO_AUTO_FREE)) {
      __redisAsyncFree(ac);
    }
}

/* Tries to do a clean disconnect from Redis, meaning it stops new commands
 * from being issued, but tries to flush the output buffer and execute
 * callbacks for all remaining replies. When this function is called from a
 * callback, there might be more replies and we can safely defer disconnecting
 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
 * when there are no pending callbacks. */
void redisAsyncDisconnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    c->flags |= REDIS_DISCONNECTING;

    /** unset the auto-free flag here, because disconnect undoes this */
    c->flags &= ~REDIS_NO_AUTO_FREE;
    if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
        __redisAsyncDisconnect(ac);
}

static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
    redisContext *c = &(ac->c);
    dict *callbacks;
    redisCallback *cb = NULL;
    dictEntry *de;
    int pvariant;
    char *stype;
    sds sname = NULL;

    /* Match reply with the expected format of a pushed message.
     * The type and number of elements (3 to 4) are specified at:
     * https://redis.io/topics/pubsub#format-of-pushed-messages */
    if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
        reply->type == REDIS_REPLY_PUSH) {
        assert(reply->element[0]->type == REDIS_REPLY_STRING);
        stype = reply->element[0]->str;
        pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;

        if (pvariant)
            callbacks = ac->sub.patterns;
        else
            callbacks = ac->sub.channels;

        /* Locate the right callback */
        if (reply->element[1]->type == REDIS_REPLY_STRING) {
            sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
            if (sname == NULL) goto oom;

            if ((de = dictFind(callbacks,sname)) != NULL) {
                cb = dictGetEntryVal(de);
                memcpy(dstcb,cb,sizeof(*dstcb));
            }
        }

        /* If this is a subscribe reply, decrease the pending counter. */
        if (strcasecmp(stype+pvariant,"subscribe") == 0) {
            assert(cb != NULL);
            cb->pending_subs -= 1;

        } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
            if (cb == NULL)
                ac->sub.pending_unsubs -= 1;
            else if (cb->pending_subs == 0)
                dictDelete(callbacks,sname);

            /* If this was the last unsubscribe message, revert to
             * non-subscribe mode. */
            assert(reply->element[2]->type == REDIS_REPLY_INTEGER);

            /* Unset subscribed flag only when no pipelined pending subscribe
             * or pending unsubscribe replies. */
            if (reply->element[2]->integer == 0
                && dictSize(ac->sub.channels) == 0
                && dictSize(ac->sub.patterns) == 0
                && ac->sub.pending_unsubs == 0) {
                c->flags &= ~REDIS_SUBSCRIBED;

                /* Move ongoing regular command callbacks. */
                redisCallback cb;
                while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
                    __redisPushCallback(&ac->replies,&cb);
                }
            }
        }
        sdsfree(sname);
    } else {
        /* Shift callback for pending command in subscribed context. */
        __redisShiftCallback(&ac->sub.replies,dstcb);

deps/hiredis/async.c  view on Meta::CPAN

         * while allowing subscribe related PUSH messages to pass through.
         * This allows existing code to be backward compatible and work in
         * either RESP2 or RESP3 mode. */
        if (redisIsSpontaneousPushReply(reply)) {
            __redisRunPushCallback(ac, reply);
            c->reader->fn->freeObject(reply);
            continue;
        }

        /* Even if the context is subscribed, pending regular
         * callbacks will get a reply before pub/sub messages arrive. */
        redisCallback cb = {NULL, NULL, 0, 0, NULL};
        if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
            /*
             * A spontaneous reply in a not-subscribed context can be the error
             * reply that is sent when a new connection exceeds the maximum
             * number of allowed connections on the server side.
             *
             * This is seen as an error instead of a regular reply because the
             * server closes the connection after sending it.
             *

deps/hiredis/async.c  view on Meta::CPAN

             * In this case we also want to close the connection, and have the
             * user wait until the server is ready to take our request.
             */
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                c->err = REDIS_ERR_OTHER;
                snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                c->reader->fn->freeObject(reply);
                __redisAsyncDisconnect(ac);
                return;
            }
            /* No more regular callbacks and no errors, the context *must* be subscribed. */
            assert(c->flags & REDIS_SUBSCRIBED);
            if (c->flags & REDIS_SUBSCRIBED)
                __redisGetSubscribeCallback(ac,reply,&cb);
        }

        if (cb.fn != NULL) {
            __redisRunCallback(ac,&cb,reply);
            if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
                c->reader->fn->freeObject(reply);
            }

            /* Proceed with free'ing when redisAsyncFree() was called. */
            if (c->flags & REDIS_FREEING) {
                __redisAsyncFree(ac);
                return;
            }
        } else {
            /* No callback for this reply. This can either be a NULL callback,
             * or there were no callbacks to begin with. Either way, don't
             * abort with an error, but simply ignore it because the client
             * doesn't know what the server will spit out over the wire. */
            c->reader->fn->freeObject(reply);
        }

        /* If in monitor mode, repush the callback */
        if (c->flags & REDIS_MONITORING) {
            __redisPushCallback(&ac->replies,&cb);
        }
    }

deps/hiredis/async.c  view on Meta::CPAN

    if (redisBufferRead(c) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
    } else {
        /* Always re-schedule reads */
        _EL_ADD_READ(ac);
        redisProcessCallbacks(ac);
    }
}

/* This function should be called when the socket is readable.
 * It processes all replies that can be read and executes their callbacks.
 */
void redisAsyncHandleRead(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    /* must not be called from a callback */
    assert(!(c->flags & REDIS_IN_CALLBACK));

    if (!(c->flags & REDIS_CONNECTED)) {
        /* Abort if connect was not successful. */
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
            return;

deps/hiredis/async.c  view on Meta::CPAN

    p = nextArgument(cmd,&cstr,&clen);
    assert(p != NULL);
    hasnext = (p[0] == '$');
    pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
    cstr += pvariant;
    clen -= pvariant;

    if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
        c->flags |= REDIS_SUBSCRIBED;

        /* Add every channel/pattern to the list of subscription callbacks. */
        while ((p = nextArgument(p,&astr,&alen)) != NULL) {
            sname = sdsnewlen(astr,alen);
            if (sname == NULL)
                goto oom;

            if (pvariant)
                cbdict = ac->sub.patterns;
            else
                cbdict = ac->sub.channels;

deps/hiredis/async.h  view on Meta::CPAN

/* Reply callback prototype and container */
typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
typedef struct redisCallback {
    struct redisCallback *next; /* simple singly linked list */
    redisCallbackFn *fn;
    int pending_subs;
    int unsubscribe_sent;
    void *privdata;
} redisCallback;

/* List of callbacks for either regular replies or pub/sub */
typedef struct redisCallbackList {
    redisCallback *head, *tail;
} redisCallbackList;

/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallbackNC)(struct redisAsyncContext *, int status);
typedef void(redisTimerCallback)(void *timer, void *privdata);

deps/hiredis/async.h  view on Meta::CPAN

    } ev;

    /* Called when either the connection is terminated due to an error or per
     * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */
    redisDisconnectCallback *onDisconnect;

    /* Called when the first write event was received. */
    redisConnectCallback *onConnect;
    redisConnectCallbackNC *onConnectNC;

    /* Regular command callbacks */
    redisCallbackList replies;

    /* Address used for connect() */
    struct sockaddr *saddr;
    size_t addrlen;

    /* Subscription callbacks */
    struct {
        redisCallbackList replies;
        struct dict *channels;
        struct dict *patterns;
        int pending_unsubs;
    } sub;

    /* Any configured RESP3 PUSH handler */
    redisAsyncPushFn *push_cb;
} redisAsyncContext;

deps/hiredis/ssl.c  view on Meta::CPAN

     * Whether a write was requested prior to a read. If set, the write()
     * should resume whenever a read takes place, if possible
     */
    int pendingWrite;
} redisSSL;

/* Forward declaration */
redisContextFuncs redisContextSSLFuncs;

/**
 * OpenSSL global initialization and locking handling callbacks.
 * Note that this is only required for OpenSSL < 1.1.0.
 */

#if OPENSSL_VERSION_NUMBER < 0x10100000L
#define HIREDIS_USE_CRYPTO_LOCKS
#endif

#ifdef HIREDIS_USE_CRYPTO_LOCKS
#ifdef _WIN32
typedef CRITICAL_SECTION sslLockType;

deps/hiredis/test.c  view on Meta::CPAN

//     redisFree(c);
//
//     test("redisBufferWrite against closed fd: ");
//     c = __connect_nonblock();
//     redisCommand(c,"PING");
//     redisDisconnect(c);
//     test_cond(redisBufferWrite(c,NULL) == REDIS_ERR &&
//               strncmp(c->error,"write:",6) == 0);
//     redisFree(c);
//
//     test("Process callbacks in the right sequence: ");
//     c = __connect_nonblock();
//     redisCommandWithCallback(c,__test_reply_callback,(void*)1,"PING");
//     redisCommandWithCallback(c,__test_reply_callback,(void*)2,"PING");
//     redisCommandWithCallback(c,__test_reply_callback,(void*)3,"PING");
//
//     /* Write output buffer */
//     wdone = 0;
//     while(!wdone) {
//         usleep(500);
//         redisBufferWrite(c,&wdone);
//     }
//
//     /* Read until at least one callback is executed (the 3 replies will
//      * arrive in a single packet, causing all callbacks to be executed in
//      * a single pass). */
//     while(__test_callback_flags == 0) {
//         assert(redisBufferRead(c) == REDIS_OK);
//         redisProcessCallbacks(c);
//     }
//     test_cond(__test_callback_flags == 0x010203);
//     redisFree(c);
//
//     test("redisDisconnect executes pending callbacks with NULL reply: ");
//     c = __connect_nonblock();
//     redisSetDisconnectCallback(c,__test_callback,(void*)1);
//     redisCommandWithCallback(c,__test_reply_callback,(void*)2,"PING");
//     redisDisconnect(c);
//     test_cond(__test_callback_flags == 0x0201);
//     redisFree(c);
// }

#ifdef HIREDIS_TEST_ASYNC
struct event_base *base;

deps/hiredis/test.c  view on Meta::CPAN

    event_free(timeout);
    event_base_free(base);

    /* Verify test checkpoints */
    assert(state.checkpoint == 3);
}
#endif /* HIREDIS_TEST_ASYNC */

/* tests for async api using polling adapter, requires no extra libraries*/

/* enum for the test cases, the callbacks have different logic based on them */
typedef enum astest_no
{
    ASTEST_CONNECT=0,
    ASTEST_CONN_TIMEOUT,
    ASTEST_PINGPONG,
    ASTEST_PINGPONG_TIMEOUT,
    ASTEST_ISSUE_931,
    ASTEST_ISSUE_931_PING
}astest_no;

deps/hiredis/test.c  view on Meta::CPAN

    int connect_status;
    int disconnects;
    int pongs;
    int disconnect_status;
    int connected;
    int err;
    char errstr[256];
};
static struct _astest astest;

/* async callbacks */
static void asCleanup(void* data)
{
    struct _astest *t = (struct _astest *)data;
    t->ac = NULL;
}

static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata);

static void connectCallback(redisAsyncContext *c, int status) {
    struct _astest *t = (struct _astest *)c->data;

src/EV__Hiredis.xs  view on Meta::CPAN

typedef ev_hiredis_t* EV__Hiredis;
typedef struct ev_loop* EV__Loop;

struct ev_hiredis_s {
    struct ev_loop* loop;
    redisAsyncContext* ac;
    SV* error_handler;
    SV* connect_handler;
    struct timeval* connect_timeout;
    struct timeval* command_timeout;
    ngx_queue_t cb_queue; /* for long term callbacks such as subscribe */
};

struct ev_hiredis_cb_s {
    SV* cb;
    ngx_queue_t queue;
    int persist;
};

static void emit_error(EV__Hiredis self, SV* error) {
    if (NULL == self->error_handler) return;


