Skip to content

Commit fc8a736

Browse files
committed
cachedb_redis: add ASK redirect handling for cluster resharding
Add support for Redis ASK redirects during cluster resharding. When a slot is being migrated between nodes, Redis returns an ASK response instead of MOVED. Unlike MOVED (permanent redirect), ASK is a one-time redirect that requires sending the ASKING command to the target node before retrying the original query. The implementation: - Detects ASK responses alongside existing MOVED handling - Sends ASKING command to the target node before retrying - Reuses the MOVED redirect infrastructure (endpoint lookup, reconnection, retry logic) Also refactor parse_moved_reply() into parse_redirect_reply() that accepts the prefix as a parameter, with inline wrappers parse_moved_reply() and parse_ask_reply() for backward compatibility. Partially addresses #2811
1 parent 4748fcc commit fc8a736

File tree

3 files changed

+97
-49
lines changed

3 files changed

+97
-49
lines changed

modules/cachedb_redis/cachedb_redis_dbase.c

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -564,48 +564,83 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke
564564
reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE",
565565
node->context->errstr);
566566

567-
if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) {
568-
// It's a MOVED response
567+
if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN) ||
568+
match_prefix(reply->str, reply->len, ASK_PREFIX, ASK_PREFIX_LEN)) {
569+
int is_ask = match_prefix(reply->str, reply->len,
570+
ASK_PREFIX, ASK_PREFIX_LEN);
569571
redis_moved *moved_info = pkg_malloc(sizeof(redis_moved));
570-
if (!moved_info) {
571-
LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n");
572-
freeReplyObject(reply);
573-
reply = NULL;
574-
goto try_next_con;
575-
} else {
576-
if (parse_moved_reply(reply, moved_info) < 0) {
577-
LM_ERR("cachedb_redis: Unable to parse MOVED reply\n");
578-
pkg_free(moved_info);
579-
moved_info = NULL;
580-
freeReplyObject(reply);
581-
goto try_next_con;
582-
}
583-
584-
LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port);
585-
node = get_redis_connection_by_endpoint(con, moved_info);
572+
if (!moved_info) {
573+
LM_ERR("Unable to allocate redis_moved struct,"
574+
" no more pkg memory\n");
575+
freeReplyObject(reply);
576+
reply = NULL;
577+
goto try_next_con;
578+
}
586579

580+
if ((is_ask ?
581+
parse_ask_reply(reply, moved_info) :
582+
parse_moved_reply(reply, moved_info)) < 0) {
583+
LM_ERR("Unable to parse %s reply\n",
584+
is_ask ? "ASK" : "MOVED");
587585
pkg_free(moved_info);
588-
moved_info = NULL;
589586
freeReplyObject(reply);
590-
reply = NULL;
587+
goto try_next_con;
588+
}
591589

592-
if (node == NULL) {
593-
LM_ERR("Unable to locate connection by endpoint\n");
594-
last_err = -10;
590+
LM_DBG("%s slot: [%d] endpoint: [%.*s]"
591+
" port: [%d]\n",
592+
is_ask ? "ASK" : "MOVED",
593+
moved_info->slot,
594+
moved_info->endpoint.len,
595+
moved_info->endpoint.s,
596+
moved_info->port);
597+
node = get_redis_connection_by_endpoint(
598+
con, moved_info);
599+
600+
pkg_free(moved_info);
601+
freeReplyObject(reply);
602+
reply = NULL;
603+
604+
if (node == NULL) {
605+
LM_ERR("Unable to locate connection"
606+
" by endpoint\n");
607+
last_err = -10;
608+
goto try_next_con;
609+
}
610+
611+
if (node->context == NULL) {
612+
if (redis_reconnect_node(con,node) < 0) {
613+
LM_ERR("Unable to reconnect to"
614+
" node %s:%d\n",
615+
node->ip, node->port);
616+
last_err = -1;
595617
goto try_next_con;
596618
}
619+
}
597620

598-
if (node->context == NULL) {
599-
if (redis_reconnect_node(con,node) < 0) {
600-
LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port);
601-
last_err = -1;
602-
goto try_next_con;
603-
}
621+
if (is_ask) {
622+
/* ASK requires sending ASKING before
623+
* retrying the command on the new node */
624+
redisReply *asking_reply;
625+
asking_reply = redisCommand(
626+
node->context, "ASKING");
627+
if (!asking_reply ||
628+
asking_reply->type ==
629+
REDIS_REPLY_ERROR) {
630+
LM_ERR("ASKING command failed"
631+
" on %s:%d\n",
632+
node->ip, node->port);
633+
if (asking_reply)
634+
freeReplyObject(
635+
asking_reply);
636+
last_err = -1;
637+
goto try_next_con;
604638
}
605-
606-
i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset
607-
continue;
639+
freeReplyObject(asking_reply);
608640
}
641+
642+
i = QUERY_ATTEMPTS;
643+
continue;
609644
}
610645

611646
freeReplyObject(reply);

modules/cachedb_redis/cachedb_redis_utils.c

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -348,54 +348,55 @@ int build_cluster_nodes(redis_con *con,char *info,int size)
348348
}
349349

350350
/*
351-
When Redis is operating as a cluster, it is possible (very likely)
352-
that a MOVED redirection will be returned by the Redis nodes that
353-
received the request. The general format of the reply from Redis is:
354-
MOVED slot [IP|FQDN]:port
351+
When Redis is operating as a cluster, MOVED or ASK redirections may
352+
be returned by the Redis nodes that received the request. The
353+
general format of both redirect replies is:
354+
MOVED|ASK slot [IP|FQDN]:port
355355
356-
This routine will parse the Redis MOVED reply into its components.
356+
This routine parses a redirect reply into its components given
357+
the expected prefix (e.g. "MOVED " or "ASK ").
357358
Note that the redisReply struct MUST be released outside of this routine
358359
to avoid a memory leak. The out->endpoint pointer must not be used after
359360
the redisReply has been released.
360361
361362
The parsed data is stored into the following redis_moved struct:
362-
363+
363364
typedef struct {
364365
int slot;
365366
const_str endpoint;
366367
int port;
367368
} redis_moved;
368369
369370
*/
370-
int parse_moved_reply(redisReply *reply, redis_moved *out) {
371-
int i;
371+
int parse_redirect_reply(redisReply *reply, redis_moved *out,
372+
const char *prefix, size_t prefix_len) {
373+
size_t i;
372374
int slot = 0;
373375
const char *p;
374376
const char *end;
375377
const char *host_start;
376378
const char *colon = NULL;
377379
const char *port_start;
378-
int port = REDIS_DF_PORT; // Default to Redis standard port
380+
int port = REDIS_DF_PORT;
379381

380-
if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out)
382+
if (!reply || !reply->str || (size_t)reply->len < prefix_len || !out)
381383
return ERR_INVALID_REPLY;
382384

383385
p = reply->str;
384386
end = reply->str + reply->len;
385387

386-
for (i = 0; i < MOVED_PREFIX_LEN; ++i) {
387-
if (p[i] != MOVED_PREFIX[i]) {
388-
return ERR_INVALID_REPLY;
389-
}
388+
for (i = 0; i < prefix_len; ++i) {
389+
if (p[i] != prefix[i])
390+
return ERR_INVALID_REPLY;
390391
}
391-
p += MOVED_PREFIX_LEN;
392+
p += prefix_len;
392393

393394
// Parse slot number
394395
while (p < end && *p >= '0' && *p <= '9') {
395396
slot = slot * 10 + (*p - '0');
396397
p++;
397398
}
398-
if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9'))
399+
if (slot == 0 && (p == reply->str + prefix_len || *(p - 1) < '0' || *(p - 1) > '9'))
399400
return ERR_INVALID_SLOT;
400401

401402
// Skip spaces

modules/cachedb_redis/cachedb_redis_utils.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
#define MOVED_PREFIX "MOVED "
3232
#define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1)
3333

34+
#define ASK_PREFIX "ASK "
35+
#define ASK_PREFIX_LEN (sizeof(ASK_PREFIX) - 1)
36+
3437
#define ERR_INVALID_REPLY -1
3538
#define ERR_INVALID_SLOT -2
3639
#define ERR_INVALID_PORT -3
@@ -41,7 +44,16 @@ int build_cluster_nodes(redis_con *con,char *info,int size);
4144
cluster_node *get_redis_connection(redis_con *con,str *key);
4245
cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info);
4346
void destroy_cluster_nodes(redis_con *con);
44-
int parse_moved_reply(redisReply *reply, redis_moved *out);
47+
int parse_redirect_reply(redisReply *reply, redis_moved *out,
48+
const char *prefix, size_t prefix_len);
49+
50+
static inline int parse_moved_reply(redisReply *reply, redis_moved *out) {
51+
return parse_redirect_reply(reply, out, MOVED_PREFIX, MOVED_PREFIX_LEN);
52+
}
53+
54+
static inline int parse_ask_reply(redisReply *reply, redis_moved *out) {
55+
return parse_redirect_reply(reply, out, ASK_PREFIX, ASK_PREFIX_LEN);
56+
}
4557

4658
static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) {
4759
size_t i;

0 commit comments

Comments
 (0)