Redis-Cluster-Fast
view release on metacpan or search on metacpan
deps/hiredis-cluster/hircluster.c view on Meta::CPAN
redisReply *reply = NULL;
int result = redisGetReply(c, (void **)&reply);
if (result != REDIS_OK) {
if (c->err == REDIS_ERR_TIMEOUT) {
__redisClusterSetError(cc, c->err,
"Command (cluster nodes) reply error "
"(socket timeout)");
} else {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Command (cluster nodes) reply error "
"(NULL).");
}
return REDIS_ERR;
} else if (reply->type != REDIS_REPLY_STRING) {
if (reply->type == REDIS_REPLY_ERROR) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, reply->str);
} else {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Command(cluster nodes) reply error: "
"type is not string.");
}
freeReplyObject(reply);
return REDIS_ERR;
}
dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}
/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with
* context c. */
static int clusterUpdateRouteHandleReply(redisClusterContext *cc,
redisContext *c) {
if (cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS) {
return handleClusterSlotsReply(cc, c);
} else {
return handleClusterNodesReply(cc, c);
}
}
/**
* Update route with the "cluster nodes" or "cluster slots" command reply.
*/
static int cluster_update_route_by_addr(redisClusterContext *cc, const char *ip,
int port) {
redisContext *c = NULL;
if (cc == NULL) {
return REDIS_ERR;
}
if (ip == NULL || port <= 0) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "Ip or port error!");
goto error;
}
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, ip, port);
options.connect_timeout = cc->connect_timeout;
options.command_timeout = cc->command_timeout;
c = redisConnectWithOptions(&options);
if (c == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
if (cc->on_connect) {
cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
}
if (c->err) {
__redisClusterSetError(cc, c->err, c->errstr);
goto error;
}
if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
goto error;
}
if (authenticate(cc, c) != REDIS_OK) {
goto error;
}
if (clusterUpdateRouteSendCommand(cc, c) != REDIS_OK) {
goto error;
}
if (clusterUpdateRouteHandleReply(cc, c) != REDIS_OK) {
goto error;
}
redisFree(c);
return REDIS_OK;
error:
redisFree(c);
return REDIS_ERR;
}
/* Update known cluster nodes with a new collection of redisClusterNodes.
* Will also update the slot-to-node lookup table for the new nodes. */
static int updateNodesAndSlotmap(redisClusterContext *cc, dict *nodes) {
if (nodes == NULL) {
return REDIS_ERR;
}
/* Create a slot to redisClusterNode lookup table */
redisClusterNode **table;
table = hi_calloc(REDIS_CLUSTER_SLOTS, sizeof(redisClusterNode *));
if (table == NULL) {
goto oom;
}
dictIterator di;
dictInitIterator(&di, nodes);
dictEntry *de;
while ((de = dictNext(&di))) {
deps/hiredis-cluster/hircluster.c view on Meta::CPAN
if (cc->nodes == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "no server address");
return REDIS_ERR;
}
dictIterator di;
dictInitIterator(&di, cc->nodes);
while ((de = dictNext(&di)) != NULL) {
node = dictGetEntryVal(de);
if (node == NULL || node->host == NULL) {
continue;
}
ret = cluster_update_route_by_addr(cc, node->host, node->port);
if (ret == REDIS_OK) {
if (cc->err) {
cc->err = 0;
memset(cc->errstr, '\0', strlen(cc->errstr));
}
return REDIS_OK;
}
flag_err_not_set = 0;
}
if (flag_err_not_set) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "no valid server address");
}
return REDIS_ERR;
}
redisClusterContext *redisClusterContextInit(void) {
redisClusterContext *cc;
cc = hi_calloc(1, sizeof(redisClusterContext));
if (cc == NULL)
return NULL;
cc->max_retry_count = CLUSTER_DEFAULT_MAX_RETRY_COUNT;
return cc;
}
void redisClusterFree(redisClusterContext *cc) {
if (cc == NULL)
return;
if (cc->event_callback) {
cc->event_callback(cc, HIRCLUSTER_EVENT_FREE_CONTEXT,
cc->event_privdata);
}
if (cc->connect_timeout) {
hi_free(cc->connect_timeout);
cc->connect_timeout = NULL;
}
if (cc->command_timeout) {
hi_free(cc->command_timeout);
cc->command_timeout = NULL;
}
if (cc->table != NULL) {
hi_free(cc->table);
cc->table = NULL;
}
if (cc->nodes != NULL) {
/* Clear cc->nodes before releasing the dict since the release procedure
might access cc->nodes. When a node and its hiredis context are freed
all pending callbacks are executed. Clearing cc->nodes prevents a pending
slotmap update command callback to trigger additional slotmap updates. */
dict *nodes = cc->nodes;
cc->nodes = NULL;
dictRelease(nodes);
}
if (cc->requests != NULL) {
listRelease(cc->requests);
}
if (cc->username != NULL) {
hi_free(cc->username);
cc->username = NULL;
}
if (cc->password != NULL) {
hi_free(cc->password);
cc->password = NULL;
}
hi_free(cc);
}
/* Connect to a Redis cluster. On error the field error in the returned
* context will be set to the return value of the error function.
* When no set of reply functions is given, the default set will be used. */
static int _redisClusterConnect2(redisClusterContext *cc) {
if (cc->nodes == NULL || dictSize(cc->nodes) == 0) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"servers address does not set up");
return REDIS_ERR;
}
return redisClusterUpdateSlotmap(cc);
}
/* Connect to a Redis cluster. On error the field error in the returned
* context will be set to the return value of the error function.
* When no set of reply functions is given, the default set will be used. */
static redisClusterContext *_redisClusterConnect(redisClusterContext *cc,
const char *addrs) {
int ret;
ret = redisClusterSetOptionAddNodes(cc, addrs);
if (ret != REDIS_OK) {
return cc;
}
deps/hiredis-cluster/hircluster.c view on Meta::CPAN
int redisClusterSetOptionParseSlaves(redisClusterContext *cc) {
if (cc == NULL) {
return REDIS_ERR;
}
cc->flags |= HIRCLUSTER_FLAG_ADD_SLAVE;
return REDIS_OK;
}
int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc) {
if (cc == NULL) {
return REDIS_ERR;
}
cc->flags |= HIRCLUSTER_FLAG_ADD_OPENSLOT;
return REDIS_OK;
}
int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc) {
if (cc == NULL) {
return REDIS_ERR;
}
cc->flags |= HIRCLUSTER_FLAG_ROUTE_USE_SLOTS;
return REDIS_OK;
}
int redisClusterSetOptionConnectTimeout(redisClusterContext *cc,
const struct timeval tv) {
if (cc == NULL) {
return REDIS_ERR;
}
if (cc->connect_timeout == NULL) {
cc->connect_timeout = hi_malloc(sizeof(struct timeval));
if (cc->connect_timeout == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
}
memcpy(cc->connect_timeout, &tv, sizeof(struct timeval));
return REDIS_OK;
}
int redisClusterSetOptionTimeout(redisClusterContext *cc,
const struct timeval tv) {
if (cc == NULL) {
return REDIS_ERR;
}
if (cc->command_timeout == NULL ||
cc->command_timeout->tv_sec != tv.tv_sec ||
cc->command_timeout->tv_usec != tv.tv_usec) {
if (cc->command_timeout == NULL) {
cc->command_timeout = hi_malloc(sizeof(struct timeval));
if (cc->command_timeout == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
}
memcpy(cc->command_timeout, &tv, sizeof(struct timeval));
/* Set timeout on already connected nodes */
if (cc->nodes && dictSize(cc->nodes) > 0) {
dictEntry *de;
redisClusterNode *node;
dictIterator di;
dictInitIterator(&di, cc->nodes);
while ((de = dictNext(&di)) != NULL) {
node = dictGetEntryVal(de);
if (node->acon) {
redisAsyncSetTimeout(node->acon, tv);
}
if (node->con && node->con->err == 0) {
redisSetTimeout(node->con, tv);
}
if (node->slaves && listLength(node->slaves) > 0) {
redisClusterNode *slave;
listNode *ln;
listIter li;
listRewind(node->slaves, &li);
while ((ln = listNext(&li)) != NULL) {
slave = listNodeValue(ln);
if (slave->acon) {
redisAsyncSetTimeout(slave->acon, tv);
}
if (slave->con && slave->con->err == 0) {
redisSetTimeout(slave->con, tv);
}
}
}
}
}
}
return REDIS_OK;
}
int redisClusterSetOptionMaxRetry(redisClusterContext *cc,
int max_retry_count) {
if (cc == NULL || max_retry_count <= 0) {
return REDIS_ERR;
}
cc->max_retry_count = max_retry_count;
return REDIS_OK;
}
int redisClusterConnect2(redisClusterContext *cc) {
if (cc == NULL) {
return REDIS_ERR;
}
/* Clear a previously set shutdown flag since we allow a
* reconnection of an async context using this API (legacy). */
cc->flags &= ~HIRCLUSTER_FLAG_SHUTDOWN;
return _redisClusterConnect2(cc);
}
redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node) {
redisContext *c = NULL;
if (node == NULL) {
return NULL;
}
c = node->con;
if (c != NULL) {
if (c->err) {
redisReconnect(c);
if (cc->on_connect) {
cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
}
if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
}
authenticate(cc, c); // err and errstr handled in function
}
return c;
}
if (node->host == NULL || node->port <= 0) {
return NULL;
}
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, node->host, node->port);
options.connect_timeout = cc->connect_timeout;
options.command_timeout = cc->command_timeout;
c = redisConnectWithOptions(&options);
if (c == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
return NULL;
}
if (cc->on_connect) {
cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
}
if (c->err) {
__redisClusterSetError(cc, c->err, c->errstr);
redisFree(c);
return NULL;
}
if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
redisFree(c);
return NULL;
}
if (authenticate(cc, c) != REDIS_OK) {
redisFree(c);
return NULL;
}
node->con = c;
return c;
}
static redisClusterNode *node_get_by_table(redisClusterContext *cc,
uint32_t slot_num) {
if (cc == NULL) {
return NULL;
}
if (slot_num >= REDIS_CLUSTER_SLOTS) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "invalid slot");
return NULL;
}
if (cc->table == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "slotmap not available");
return NULL;
}
if (cc->table[slot_num] == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"slot not served by any node");
return NULL;
}
return cc->table[slot_num];
}
/* Helper function for the redisClusterAppendCommand* family of functions.
*
deps/hiredis-cluster/hircluster.c view on Meta::CPAN
static cluster_async_data *cluster_async_data_create(void) {
/* use calloc to guarantee all fields are zeroed */
return hi_calloc(1, sizeof(cluster_async_data));
}
static void cluster_async_data_free(cluster_async_data *cad) {
if (cad == NULL) {
return;
}
command_destroy(cad->command);
hi_free(cad);
}
static void unlinkAsyncContextAndNode(void *data) {
redisClusterNode *node;
if (data) {
node = (redisClusterNode *)(data);
node->acon = NULL;
}
}
redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
redisClusterNode *node) {
redisAsyncContext *ac;
int ret;
if (node == NULL) {
return NULL;
}
ac = node->acon;
if (ac != NULL) {
if (ac->c.err == 0) {
return ac;
} else {
/* The cluster node has a hiredis context with errors. Hiredis
* will asynchronously destruct the context and unlink it from
* the cluster node object. Return an error until done.
* An example scenario is when sending a command from a command
* callback, which has a NULL reply due to a disconnect. */
__redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
return NULL;
}
}
// No async context exists, perform a connect
if (node->host == NULL || node->port <= 0) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
"node host or port is error");
return NULL;
}
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, node->host, node->port);
options.connect_timeout = acc->cc->connect_timeout;
options.command_timeout = acc->cc->command_timeout;
node->lastConnectionAttempt = hi_usec_now();
ac = redisAsyncConnectWithOptions(&options);
if (ac == NULL) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
return NULL;
}
if (ac->err) {
__redisClusterAsyncSetError(acc, ac->err, ac->errstr);
redisAsyncFree(ac);
return NULL;
}
if (acc->cc->ssl &&
acc->cc->ssl_init_fn(&ac->c, acc->cc->ssl) != REDIS_OK) {
__redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
redisAsyncFree(ac);
return NULL;
}
// Authenticate when needed
if (acc->cc->password != NULL) {
if (acc->cc->username != NULL) {
ret = redisAsyncCommand(ac, NULL, NULL, "AUTH %s %s",
acc->cc->username, acc->cc->password);
} else {
ret =
redisAsyncCommand(ac, NULL, NULL, "AUTH %s", acc->cc->password);
}
if (ret != REDIS_OK) {
__redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
redisAsyncFree(ac);
return NULL;
}
}
if (acc->adapter) {
ret = acc->attach_fn(ac, acc->adapter);
if (ret != REDIS_OK) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
"Failed to attach event adapter");
redisAsyncFree(ac);
return NULL;
}
}
if (acc->onConnect) {
redisAsyncSetConnectCallback(ac, acc->onConnect);
}
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
else if (acc->onConnectNC) {
redisAsyncSetConnectCallbackNC(ac, acc->onConnectNC);
}
#endif
if (acc->onDisconnect) {
redisAsyncSetDisconnectCallback(ac, acc->onDisconnect);
( run in 0.883 second using v1.01-cache-2.11-cpan-39bf76dae61 )