Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 67 additions & 32 deletions modules/cachedb_redis/cachedb_redis_dbase.c
Original file line number Diff line number Diff line change
Expand Up @@ -564,48 +564,83 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke
reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE",
node->context->errstr);

if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) {
// It's a MOVED response
if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN) ||
match_prefix(reply->str, reply->len, ASK_PREFIX, ASK_PREFIX_LEN)) {
int is_ask = match_prefix(reply->str, reply->len,
ASK_PREFIX, ASK_PREFIX_LEN);
redis_moved *moved_info = pkg_malloc(sizeof(redis_moved));
if (!moved_info) {
LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n");
freeReplyObject(reply);
reply = NULL;
goto try_next_con;
} else {
if (parse_moved_reply(reply, moved_info) < 0) {
LM_ERR("cachedb_redis: Unable to parse MOVED reply\n");
pkg_free(moved_info);
moved_info = NULL;
freeReplyObject(reply);
goto try_next_con;
}

LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port);
node = get_redis_connection_by_endpoint(con, moved_info);
if (!moved_info) {
LM_ERR("Unable to allocate redis_moved struct,"
" no more pkg memory\n");
freeReplyObject(reply);
reply = NULL;
goto try_next_con;
}

if ((is_ask ?
parse_ask_reply(reply, moved_info) :
parse_moved_reply(reply, moved_info)) < 0) {
LM_ERR("Unable to parse %s reply\n",
is_ask ? "ASK" : "MOVED");
pkg_free(moved_info);
moved_info = NULL;
freeReplyObject(reply);
reply = NULL;
goto try_next_con;
}

if (node == NULL) {
LM_ERR("Unable to locate connection by endpoint\n");
last_err = -10;
LM_DBG("%s slot: [%d] endpoint: [%.*s]"
" port: [%d]\n",
is_ask ? "ASK" : "MOVED",
moved_info->slot,
moved_info->endpoint.len,
moved_info->endpoint.s,
moved_info->port);
node = get_redis_connection_by_endpoint(
con, moved_info);

pkg_free(moved_info);
freeReplyObject(reply);
reply = NULL;

if (node == NULL) {
LM_ERR("Unable to locate connection"
" by endpoint\n");
last_err = -10;
goto try_next_con;
}

if (node->context == NULL) {
if (redis_reconnect_node(con,node) < 0) {
LM_ERR("Unable to reconnect to"
" node %s:%d\n",
node->ip, node->port);
last_err = -1;
goto try_next_con;
}
}

if (node->context == NULL) {
if (redis_reconnect_node(con,node) < 0) {
LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port);
last_err = -1;
goto try_next_con;
}
if (is_ask) {
/* ASK requires sending ASKING before
* retrying the command on the new node */
redisReply *asking_reply;
asking_reply = redisCommand(
node->context, "ASKING");
if (!asking_reply ||
asking_reply->type ==
REDIS_REPLY_ERROR) {
LM_ERR("ASKING command failed"
" on %s:%d\n",
node->ip, node->port);
if (asking_reply)
freeReplyObject(
asking_reply);
last_err = -1;
goto try_next_con;
}

i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset
continue;
freeReplyObject(asking_reply);
}

i = QUERY_ATTEMPTS;
continue;
}

freeReplyObject(reply);
Expand Down
33 changes: 17 additions & 16 deletions modules/cachedb_redis/cachedb_redis_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,54 +348,55 @@ int build_cluster_nodes(redis_con *con,char *info,int size)
}

/*
When Redis is operating as a cluster, it is possible (very likely)
that a MOVED redirection will be returned by the Redis nodes that
received the request. The general format of the reply from Redis is:
MOVED slot [IP|FQDN]:port
When Redis is operating as a cluster, MOVED or ASK redirections may
be returned by the Redis nodes that received the request. The
general format of both redirect replies is:
MOVED|ASK slot [IP|FQDN]:port

This routine will parse the Redis MOVED reply into its components.
This routine parses a redirect reply into its components given
the expected prefix (e.g. "MOVED " or "ASK ").
Note that the redisReply struct MUST be released outside of this routine
to avoid a memory leak. The out->endpoint pointer must not be used after
the redisReply has been released.

The parsed data is stored into the following redis_moved struct:

typedef struct {
int slot;
const_str endpoint;
int port;
} redis_moved;

*/
int parse_moved_reply(redisReply *reply, redis_moved *out) {
int i;
int parse_redirect_reply(redisReply *reply, redis_moved *out,
const char *prefix, size_t prefix_len) {
size_t i;
int slot = 0;
const char *p;
const char *end;
const char *host_start;
const char *colon = NULL;
const char *port_start;
int port = REDIS_DF_PORT; // Default to Redis standard port
int port = REDIS_DF_PORT;

if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out)
if (!reply || !reply->str || (size_t)reply->len < prefix_len || !out)
return ERR_INVALID_REPLY;

p = reply->str;
end = reply->str + reply->len;

for (i = 0; i < MOVED_PREFIX_LEN; ++i) {
if (p[i] != MOVED_PREFIX[i]) {
return ERR_INVALID_REPLY;
}
for (i = 0; i < prefix_len; ++i) {
if (p[i] != prefix[i])
return ERR_INVALID_REPLY;
}
p += MOVED_PREFIX_LEN;
p += prefix_len;

// Parse slot number
while (p < end && *p >= '0' && *p <= '9') {
slot = slot * 10 + (*p - '0');
p++;
}
if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9'))
if (slot == 0 && (p == reply->str + prefix_len || *(p - 1) < '0' || *(p - 1) > '9'))
return ERR_INVALID_SLOT;

// Skip spaces
Expand Down
14 changes: 13 additions & 1 deletion modules/cachedb_redis/cachedb_redis_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#define MOVED_PREFIX "MOVED "
#define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1)

#define ASK_PREFIX "ASK "
#define ASK_PREFIX_LEN (sizeof(ASK_PREFIX) - 1)

#define ERR_INVALID_REPLY -1
#define ERR_INVALID_SLOT -2
#define ERR_INVALID_PORT -3
Expand All @@ -41,7 +44,16 @@ int build_cluster_nodes(redis_con *con,char *info,int size);
cluster_node *get_redis_connection(redis_con *con,str *key);
cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info);
void destroy_cluster_nodes(redis_con *con);
int parse_moved_reply(redisReply *reply, redis_moved *out);
int parse_redirect_reply(redisReply *reply, redis_moved *out,
const char *prefix, size_t prefix_len);

static inline int parse_moved_reply(redisReply *reply, redis_moved *out) {
return parse_redirect_reply(reply, out, MOVED_PREFIX, MOVED_PREFIX_LEN);
}

static inline int parse_ask_reply(redisReply *reply, redis_moved *out) {
return parse_redirect_reply(reply, out, ASK_PREFIX, ASK_PREFIX_LEN);
}

static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) {
size_t i;
Expand Down