Redis-Cluster-Fast

 view release on metacpan or  search on metacpan

deps/hiredis-cluster/hircluster.c  view on Meta::CPAN

    redisReply *reply = NULL;
    int result = redisGetReply(c, (void **)&reply);
    if (result != REDIS_OK) {
        if (c->err == REDIS_ERR_TIMEOUT) {
            __redisClusterSetError(cc, c->err,
                                   "Command (cluster nodes) reply error "
                                   "(socket timeout)");
        } else {
            __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                   "Command (cluster nodes) reply error "
                                   "(NULL).");
        }
        return REDIS_ERR;
    } else if (reply->type != REDIS_REPLY_STRING) {
        if (reply->type == REDIS_REPLY_ERROR) {
            __redisClusterSetError(cc, REDIS_ERR_OTHER, reply->str);
        } else {
            __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                   "Command(cluster nodes) reply error: "
                                   "type is not string.");
        }
        freeReplyObject(reply);
        return REDIS_ERR;
    }

    dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
    freeReplyObject(reply);
    return updateNodesAndSlotmap(cc, nodes);
}

/* Reads the pending slotmap reply from context c and applies it to cc.
 * The parser is chosen from cc->flags: the CLUSTER SLOTS handler when
 * HIRCLUSTER_FLAG_ROUTE_USE_SLOTS is set, the CLUSTER NODES handler otherwise.
 * Returns whatever the selected handler returns. */
static int clusterUpdateRouteHandleReply(redisClusterContext *cc,
                                         redisContext *c) {
    const int use_slots = (cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS) != 0;

    return use_slots ? handleClusterSlotsReply(cc, c)
                     : handleClusterNodesReply(cc, c);
}

/**
 * Refresh the routing information of cc by querying the node at ip:port.
 *
 * A temporary blocking connection is opened with the connect/command timeouts
 * stored in cc. The on_connect callback (if any) is invoked with the outcome
 * of the connect attempt, then TLS is set up (if cc->ssl is set), the
 * connection is authenticated, the slotmap command is sent and its reply is
 * handled. The temporary connection is always released before returning.
 *
 * Returns REDIS_OK on success, otherwise REDIS_ERR. When cc is NULL no error
 * is recorded since there is no context to record it in.
 */
static int cluster_update_route_by_addr(redisClusterContext *cc, const char *ip,
                                        int port) {
    redisContext *c = NULL;
    int status = REDIS_ERR;

    if (cc == NULL) {
        return REDIS_ERR;
    }

    if (ip == NULL || port <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "Ip or port error!");
        goto cleanup;
    }

    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    options.connect_timeout = cc->connect_timeout;
    options.command_timeout = cc->command_timeout;

    c = redisConnectWithOptions(&options);
    if (c == NULL) {
        /* No context was allocated, hence nothing to release. */
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    /* The user callback is told about failed connect attempts as well. */
    if (cc->on_connect) {
        cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
    }

    if (c->err) {
        __redisClusterSetError(cc, c->err, c->errstr);
        goto cleanup;
    }

    if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
        __redisClusterSetError(cc, c->err, c->errstr);
        goto cleanup;
    }

    /* Each step only runs when the previous one succeeded. */
    if (authenticate(cc, c) == REDIS_OK &&
        clusterUpdateRouteSendCommand(cc, c) == REDIS_OK &&
        clusterUpdateRouteHandleReply(cc, c) == REDIS_OK) {
        status = REDIS_OK;
    }

cleanup:
    redisFree(c);
    return status;
}

/* Update known cluster nodes with a new collection of redisClusterNodes.
 * Will also update the slot-to-node lookup table for the new nodes. */
static int updateNodesAndSlotmap(redisClusterContext *cc, dict *nodes) {
    if (nodes == NULL) {
        return REDIS_ERR;
    }

    /* Create a slot to redisClusterNode lookup table */
    redisClusterNode **table;
    table = hi_calloc(REDIS_CLUSTER_SLOTS, sizeof(redisClusterNode *));
    if (table == NULL) {
        goto oom;
    }

    dictIterator di;
    dictInitIterator(&di, nodes);

    dictEntry *de;
    while ((de = dictNext(&di))) {

deps/hiredis-cluster/hircluster.c  view on Meta::CPAN


    if (cc->nodes == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "no server address");
        return REDIS_ERR;
    }

    dictIterator di;
    dictInitIterator(&di, cc->nodes);

    while ((de = dictNext(&di)) != NULL) {
        node = dictGetEntryVal(de);
        if (node == NULL || node->host == NULL) {
            continue;
        }

        ret = cluster_update_route_by_addr(cc, node->host, node->port);
        if (ret == REDIS_OK) {
            if (cc->err) {
                cc->err = 0;
                memset(cc->errstr, '\0', strlen(cc->errstr));
            }
            return REDIS_OK;
        }

        flag_err_not_set = 0;
    }

    if (flag_err_not_set) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "no valid server address");
    }

    return REDIS_ERR;
}

/* Allocates a zero-initialized cluster context and applies the default
 * max retry count. Returns NULL when the allocation fails. */
redisClusterContext *redisClusterContextInit(void) {
    redisClusterContext *ctx = hi_calloc(1, sizeof(*ctx));

    if (ctx != NULL) {
        ctx->max_retry_count = CLUSTER_DEFAULT_MAX_RETRY_COUNT;
    }

    return ctx;
}

/* Releases a cluster context together with the resources it owns: the
 * timeouts, the slot lookup table, the node dict, the request list and the
 * credentials. A registered event callback is notified with
 * HIRCLUSTER_EVENT_FREE_CONTEXT before anything is released.
 * Passing NULL is a no-op. */
void redisClusterFree(redisClusterContext *cc) {
    if (!cc) {
        return;
    }

    /* Notify the user first, while the context is still fully intact. */
    if (cc->event_callback != NULL) {
        cc->event_callback(cc, HIRCLUSTER_EVENT_FREE_CONTEXT,
                           cc->event_privdata);
    }

    if (cc->connect_timeout != NULL) {
        hi_free(cc->connect_timeout);
        cc->connect_timeout = NULL;
    }

    if (cc->command_timeout != NULL) {
        hi_free(cc->command_timeout);
        cc->command_timeout = NULL;
    }

    if (cc->table) {
        hi_free(cc->table);
        cc->table = NULL;
    }

    if (cc->nodes) {
        /* Detach the dict from the context before releasing it. Freeing a
         * node and its hiredis context runs all pending callbacks, and those
         * may look at cc->nodes; with the field already cleared, a pending
         * slotmap update callback cannot trigger further slotmap updates. */
        dict *detached = cc->nodes;
        cc->nodes = NULL;
        dictRelease(detached);
    }

    if (cc->requests) {
        listRelease(cc->requests);
    }

    if (cc->username) {
        hi_free(cc->username);
        cc->username = NULL;
    }

    if (cc->password) {
        hi_free(cc->password);
        cc->password = NULL;
    }

    hi_free(cc);
}

/* Performs the initial slotmap update for a context whose node addresses
 * have already been added. Fails with REDIS_ERR, and an error set on cc,
 * when no node address is configured; otherwise returns the result of
 * redisClusterUpdateSlotmap(). The caller must pass a non-NULL cc. */
static int _redisClusterConnect2(redisClusterContext *cc) {
    if (cc->nodes != NULL && dictSize(cc->nodes) != 0) {
        return redisClusterUpdateSlotmap(cc);
    }

    __redisClusterSetError(cc, REDIS_ERR_OTHER,
                           "servers address does not set up");
    return REDIS_ERR;
}

/* Connect to a Redis cluster. On error the field error in the returned
 * context will be set to the return value of the error function.
 * When no set of reply functions is given, the default set will be used. */
static redisClusterContext *_redisClusterConnect(redisClusterContext *cc,
                                                 const char *addrs) {

    int ret;

    ret = redisClusterSetOptionAddNodes(cc, addrs);
    if (ret != REDIS_OK) {
        return cc;
    }

deps/hiredis-cluster/hircluster.c  view on Meta::CPAN


/* Sets HIRCLUSTER_FLAG_ADD_SLAVE in the context flags.
 * Returns REDIS_ERR when cc is NULL, REDIS_OK otherwise. */
int redisClusterSetOptionParseSlaves(redisClusterContext *cc) {
    if (!cc) {
        return REDIS_ERR;
    }

    cc->flags = cc->flags | HIRCLUSTER_FLAG_ADD_SLAVE;
    return REDIS_OK;
}

/* Sets HIRCLUSTER_FLAG_ADD_OPENSLOT in the context flags.
 * Returns REDIS_ERR when cc is NULL, REDIS_OK otherwise. */
int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc) {
    if (!cc) {
        return REDIS_ERR;
    }

    cc->flags = cc->flags | HIRCLUSTER_FLAG_ADD_OPENSLOT;
    return REDIS_OK;
}

/* Sets HIRCLUSTER_FLAG_ROUTE_USE_SLOTS in the context flags.
 * Returns REDIS_ERR when cc is NULL, REDIS_OK otherwise. */
int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc) {
    if (!cc) {
        return REDIS_ERR;
    }

    cc->flags = cc->flags | HIRCLUSTER_FLAG_ROUTE_USE_SLOTS;
    return REDIS_OK;
}

/* Stores a copy of tv as the connect timeout of the cluster context.
 * The storage is allocated on first use and reused on later calls.
 * Returns REDIS_ERR when cc is NULL or when the allocation fails (in the
 * latter case an out-of-memory error is set on cc), REDIS_OK otherwise. */
int redisClusterSetOptionConnectTimeout(redisClusterContext *cc,
                                        const struct timeval tv) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    struct timeval *slot = cc->connect_timeout;
    if (slot == NULL) {
        slot = hi_malloc(sizeof(*slot));
        cc->connect_timeout = slot;
        if (slot == NULL) {
            __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
            return REDIS_ERR;
        }
    }

    *slot = tv;

    return REDIS_OK;
}

/* Stores a copy of tv as the command timeout of the cluster context and
 * pushes the new value to the contexts of all known nodes and their slaves.
 *
 * Nothing is done when the stored timeout already equals tv. The timeout is
 * applied to every async context, and to every blocking context that is not
 * in an error state.
 *
 * Returns REDIS_ERR when cc is NULL or when the allocation fails (in the
 * latter case an out-of-memory error is set on cc), REDIS_OK otherwise. */
int redisClusterSetOptionTimeout(redisClusterContext *cc,
                                 const struct timeval tv) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    /* Nothing to do when the requested timeout is already in effect. */
    if (cc->command_timeout != NULL &&
        cc->command_timeout->tv_sec == tv.tv_sec &&
        cc->command_timeout->tv_usec == tv.tv_usec) {
        return REDIS_OK;
    }

    if (cc->command_timeout == NULL) {
        cc->command_timeout = hi_malloc(sizeof(struct timeval));
        if (cc->command_timeout == NULL) {
            __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
            return REDIS_ERR;
        }
    }

    *cc->command_timeout = tv;

    /* Set timeout on already connected nodes */
    const int have_nodes = cc->nodes && dictSize(cc->nodes) > 0;
    if (!have_nodes) {
        return REDIS_OK;
    }

    dictIterator node_iter;
    dictInitIterator(&node_iter, cc->nodes);

    dictEntry *entry;
    while ((entry = dictNext(&node_iter)) != NULL) {
        redisClusterNode *primary = dictGetEntryVal(entry);

        if (primary->acon) {
            redisAsyncSetTimeout(primary->acon, tv);
        }
        if (primary->con && primary->con->err == 0) {
            redisSetTimeout(primary->con, tv);
        }

        const int have_slaves =
            primary->slaves && listLength(primary->slaves) > 0;
        if (!have_slaves) {
            continue;
        }

        listIter slave_iter;
        listRewind(primary->slaves, &slave_iter);

        listNode *item;
        while ((item = listNext(&slave_iter)) != NULL) {
            redisClusterNode *replica = listNodeValue(item);

            if (replica->acon) {
                redisAsyncSetTimeout(replica->acon, tv);
            }
            if (replica->con && replica->con->err == 0) {
                redisSetTimeout(replica->con, tv);
            }
        }
    }

    return REDIS_OK;
}

/* Sets the max retry count of the cluster context.
 * Returns REDIS_ERR when cc is NULL or max_retry_count is not positive;
 * the stored value is left untouched in that case. */
int redisClusterSetOptionMaxRetry(redisClusterContext *cc,
                                  int max_retry_count) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    if (max_retry_count < 1) {
        return REDIS_ERR;
    }

    cc->max_retry_count = max_retry_count;
    return REDIS_OK;
}

/* Connects using the node addresses already configured in cc by delegating
 * to _redisClusterConnect2(). Returns REDIS_ERR when cc is NULL. */
int redisClusterConnect2(redisClusterContext *cc) {
    if (!cc) {
        return REDIS_ERR;
    }

    /* This (legacy) API may be used to reconnect an async context, so a
     * shutdown flag left over from an earlier session is cleared here. */
    cc->flags = cc->flags & ~HIRCLUSTER_FLAG_SHUTDOWN;

    return _redisClusterConnect2(cc);
}

/* Returns the blocking hiredis context used to talk to node, creating and
 * caching it in node->con when the node has none yet.
 *
 * Existing context: returned as is when it carries no error. Otherwise a
 * reconnect is attempted, followed by the on_connect callback, TLS setup
 * (when cc->ssl is set) and authentication; the context is returned
 * whatever the outcome of those steps.
 *
 * New context: NULL is returned when the node lacks a host or a valid port,
 * or when connecting, TLS setup or authentication fails. */
redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node) {
    if (node == NULL) {
        return NULL;
    }

    redisContext *existing = node->con;
    if (existing != NULL) {
        if (existing->err == 0) {
            return existing;
        }

        redisReconnect(existing);

        if (cc->on_connect) {
            cc->on_connect(existing, existing->err ? REDIS_ERR : REDIS_OK);
        }

        if (cc->ssl && cc->ssl_init_fn(existing, cc->ssl) != REDIS_OK) {
            __redisClusterSetError(cc, existing->err, existing->errstr);
        }

        /* Return value ignored on purpose: authenticate() handles err and
         * errstr itself. */
        authenticate(cc, existing);

        return existing;
    }

    if (node->host == NULL || node->port <= 0) {
        return NULL;
    }

    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, node->host, node->port);
    options.connect_timeout = cc->connect_timeout;
    options.command_timeout = cc->command_timeout;

    redisContext *fresh = redisConnectWithOptions(&options);
    if (fresh == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return NULL;
    }

    /* The user callback is told about failed connect attempts as well. */
    if (cc->on_connect) {
        cc->on_connect(fresh, fresh->err ? REDIS_ERR : REDIS_OK);
    }

    if (fresh->err) {
        __redisClusterSetError(cc, fresh->err, fresh->errstr);
        goto discard;
    }

    if (cc->ssl && cc->ssl_init_fn(fresh, cc->ssl) != REDIS_OK) {
        __redisClusterSetError(cc, fresh->err, fresh->errstr);
        goto discard;
    }

    if (authenticate(cc, fresh) != REDIS_OK) {
        goto discard;
    }

    node->con = fresh;
    return fresh;

discard:
    redisFree(fresh);
    return NULL;
}

/* Looks up the node stored for slot_num in the slot table of cc.
 * Returns NULL when cc is NULL (no error is recorded), and NULL with an
 * error set on cc when the slot number is out of range, when there is no
 * slot table, or when the table has no node for the slot. */
static redisClusterNode *node_get_by_table(redisClusterContext *cc,
                                           uint32_t slot_num) {
    if (cc == NULL) {
        return NULL;
    }

    /* The checks are ordered: the table is only indexed after the slot
     * number and the table itself have been validated. */
    const char *problem = NULL;
    if (slot_num >= REDIS_CLUSTER_SLOTS) {
        problem = "invalid slot";
    } else if (cc->table == NULL) {
        problem = "slotmap not available";
    } else if (cc->table[slot_num] == NULL) {
        problem = "slot not served by any node";
    }

    if (problem != NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, problem);
        return NULL;
    }

    return cc->table[slot_num];
}

/* Helper function for the redisClusterAppendCommand* family of functions.
 *

deps/hiredis-cluster/hircluster.c  view on Meta::CPAN


/* Allocates a cluster_async_data object. hi_calloc is used so that every
 * field starts out zeroed. Returns the hi_calloc result unchecked. */
static cluster_async_data *cluster_async_data_create(void) {
    cluster_async_data *cad = hi_calloc(1, sizeof(*cad));

    return cad;
}

/* Destroys the command held by cad and then frees cad itself.
 * Passing NULL is a no-op. */
static void cluster_async_data_free(cluster_async_data *cad) {
    if (cad != NULL) {
        command_destroy(cad->command);
        hi_free(cad);
    }
}

/* Clears the async context pointer of the redisClusterNode passed as data.
 * Does nothing when data is NULL. */
static void unlinkAsyncContextAndNode(void *data) {
    redisClusterNode *node = data;

    if (node != NULL) {
        node->acon = NULL;
    }
}

redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
                                    redisClusterNode *node) {
    redisAsyncContext *ac;
    int ret;

    if (node == NULL) {
        return NULL;
    }

    ac = node->acon;
    if (ac != NULL) {
        if (ac->c.err == 0) {
            return ac;
        } else {
            /* The cluster node has a hiredis context with errors. Hiredis
             * will asynchronously destruct the context and unlink it from
             * the cluster node object. Return an error until done.
             * An example scenario is when sending a command from a command
             * callback, which has a NULL reply due to a disconnect. */
            __redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
            return NULL;
        }
    }

    // No async context exists, perform a connect

    if (node->host == NULL || node->port <= 0) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
                                    "node host or port is error");
        return NULL;
    }

    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, node->host, node->port);
    options.connect_timeout = acc->cc->connect_timeout;
    options.command_timeout = acc->cc->command_timeout;

    node->lastConnectionAttempt = hi_usec_now();

    ac = redisAsyncConnectWithOptions(&options);
    if (ac == NULL) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
        return NULL;
    }

    if (ac->err) {
        __redisClusterAsyncSetError(acc, ac->err, ac->errstr);
        redisAsyncFree(ac);
        return NULL;
    }

    if (acc->cc->ssl &&
        acc->cc->ssl_init_fn(&ac->c, acc->cc->ssl) != REDIS_OK) {
        __redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
        redisAsyncFree(ac);
        return NULL;
    }

    // Authenticate when needed
    if (acc->cc->password != NULL) {
        if (acc->cc->username != NULL) {
            ret = redisAsyncCommand(ac, NULL, NULL, "AUTH %s %s",
                                    acc->cc->username, acc->cc->password);
        } else {
            ret =
                redisAsyncCommand(ac, NULL, NULL, "AUTH %s", acc->cc->password);
        }

        if (ret != REDIS_OK) {
            __redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
            redisAsyncFree(ac);
            return NULL;
        }
    }

    if (acc->adapter) {
        ret = acc->attach_fn(ac, acc->adapter);
        if (ret != REDIS_OK) {
            __redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
                                        "Failed to attach event adapter");
            redisAsyncFree(ac);
            return NULL;
        }
    }

    if (acc->onConnect) {
        redisAsyncSetConnectCallback(ac, acc->onConnect);
    }
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
    else if (acc->onConnectNC) {
        redisAsyncSetConnectCallbackNC(ac, acc->onConnectNC);
    }
#endif

    if (acc->onDisconnect) {
        redisAsyncSetDisconnectCallback(ac, acc->onDisconnect);



( run in 0.883 second using v1.01-cache-2.11-cpan-39bf76dae61 )