Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,14 @@ def parse_oss_maintenance_start_msg(response):
@staticmethod
def parse_oss_maintenance_completed_msg(response):
# Expected message format is:
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
# SMIGRATED <seq_number> [<host:port> <slot, range1-range2,...>, ...]
id = response[1]
node_address = safe_str(response[2])
slots = response[3]
nodes_to_slots_mapping_data = response[2]
nodes_to_slots_mapping = {}
for node, slots in nodes_to_slots_mapping_data:
nodes_to_slots_mapping[safe_str(node)] = safe_str(slots)

return OSSNodeMigratedNotification(id, node_address, slots)
return OSSNodeMigratedNotification(id, nodes_to_slots_mapping)

@staticmethod
def parse_maintenance_start_msg(response, notification_type):
Expand Down
24 changes: 12 additions & 12 deletions redis/maint_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import threading
import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union

from redis.typing import Number

Expand Down Expand Up @@ -463,31 +463,26 @@ class OSSNodeMigratedNotification(MaintenanceNotification):

Args:
id (int): Unique identifier for this notification
node_address (Optional[str]): Address of the node that has completed migration
in the format "host:port"
slots (Optional[List[int]]): List of slots that have been migrated
nodes_to_slots_mapping (Dict[str, str]): Mapping of node addresses to slots
"""

DEFAULT_TTL = 30

def __init__(
self,
id: int,
node_address: str,
slots: Optional[List[int]] = None,
nodes_to_slots_mapping: Dict[str, str],
):
super().__init__(id, OSSNodeMigratedNotification.DEFAULT_TTL)
self.node_address = node_address
self.slots = slots
self.nodes_to_slots_mapping = nodes_to_slots_mapping

def __repr__(self) -> str:
expiry_time = self.creation_time + self.ttl
remaining = max(0, expiry_time - time.monotonic())
return (
f"{self.__class__.__name__}("
f"id={self.id}, "
f"node_address={self.node_address}, "
f"slots={self.slots}, "
f"nodes_to_slots_mapping={self.nodes_to_slots_mapping}, "
f"ttl={self.ttl}, "
f"creation_time={self.creation_time}, "
f"expires_at={expiry_time}, "
Expand Down Expand Up @@ -999,10 +994,15 @@ def handle_oss_maintenance_completed_notification(

# Updates the cluster slots cache with the new slots mapping
# This will also update the nodes cache with the new nodes mapping
new_node_host, new_node_port = notification.node_address.split(":")
additional_startup_nodes_info = []
for node_address, _ in notification.nodes_to_slots_mapping.items():
new_node_host, new_node_port = node_address.split(":")
additional_startup_nodes_info.append(
(new_node_host, int(new_node_port))
)
self.cluster_client.nodes_manager.initialize(
disconnect_startup_nodes_pools=False,
additional_startup_nodes_info=[(new_node_host, int(new_node_port))],
additional_startup_nodes_info=additional_startup_nodes_info,
)
# mark for reconnect all in use connections to the node - this will force them to
# disconnect after they complete their current commands
Expand Down
70 changes: 42 additions & 28 deletions tests/maint_notifications/proxy_server_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,51 @@ class RespTranslator:
"""Helper class to translate between RESP and other encodings."""

@staticmethod
def str_or_list_to_resp(txt: str) -> str:
"""
Convert specific string or list to RESP format.
"""
if re.match(r"^<.*>$", txt):
items = txt[1:-1].split(",")
return f"*{len(items)}\r\n" + "\r\n".join(
f"${len(x)}\r\n{x}" for x in items
def oss_maint_notification_to_resp(txt: str) -> str:
"""Convert query to RESP format."""
if txt.startswith("SMIGRATED"):
# Format: SMIGRATED SeqID host:port slot1,range1-range2 host1:port1 slot2,range3-range4
# SMIGRATED 93923 abc.com:6789 123,789-1000 abc.com:4545 1000-2000 abc.com:4323 900,910,920
# SMIGRATED - simple string
# SeqID - integer
# host and slots info are provided as array of arrays
# host:port - simple string
# slots - simple string

parts = txt.split()
notification = parts[0]
seq_id = parts[1]
hosts_and_slots = parts[2:]
resp = (
">3\r\n" # Push message with 3 elements
f"+{notification}\r\n" # Element 1: Command
f":{seq_id}\r\n" # Element 2: SeqID
f"*{len(hosts_and_slots) // 2}\r\n" # Element 3: Array of host:port, slots pairs
)
for i in range(0, len(hosts_and_slots), 2):
resp += "*2\r\n"
resp += f"+{hosts_and_slots[i]}\r\n"
resp += f"+{hosts_and_slots[i + 1]}\r\n"
else:
return f"${len(txt)}\r\n{txt}"

@staticmethod
def cluster_slots_to_resp(resp: str) -> str:
"""Convert query to RESP format."""
return (
f"*{len(resp.split())}\r\n"
+ "\r\n".join(f"${len(x)}\r\n{x}" for x in resp.split())
+ "\r\n"
)

@staticmethod
def oss_maint_notification_to_resp(resp: str) -> str:
"""Convert query to RESP format."""
return (
f">{len(resp.split())}\r\n"
+ "\r\n".join(
f"{RespTranslator.str_or_list_to_resp(x)}" for x in resp.split()
# SMIGRATING
# Format: SMIGRATING SeqID slot,range1-range2
# SMIGRATING 93923 123,789-1000
# SMIGRATING - simple string
# SeqID - integer
# slots - simple string

parts = txt.split()
notification = parts[0]
seq_id = parts[1]
slots = parts[2]

resp = (
">3\r\n" # Push message with 3 elements
f"+{notification}\r\n" # Element 1: Command
f":{seq_id}\r\n" # Element 2: SeqID
f"+{slots}\r\n" # Element 3: Array of [host:port, slots] pairs
)
+ "\r\n"
)
return resp


@dataclass
Expand Down
Loading