diff --git a/core/redis_handler.py b/core/redis_handler.py index e2c1726..f534cec 100644 --- a/core/redis_handler.py +++ b/core/redis_handler.py @@ -19,6 +19,8 @@ def __init__(self, bot, mineflayer_bot): self._response_waiters: dict[str, asyncio.Future] = {} self.redis: redis.Redis = None self._restart: bool = True + self._reconnect_delay: int = 30 + self._max_reconnect_delay: int = 600 @property def running(self): @@ -132,6 +134,7 @@ async def reader(self): channel = self.recieve_channel + ":" + self.client_name await pubsub.subscribe(channel) print(f"{Color.MAGENTA}Redis{Color.RESET} > Subscribed to {channel}") + self._reconnect_delay = 5 while (not self.bot.is_closed()) and self.running: try: message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=10) @@ -181,9 +184,10 @@ async def reader(self): ) except asyncio.CancelledError: print(f"{Color.MAGENTA}Redis{Color.RESET} > Task Cancelled") - except redis.ConnectionError: - print(f"{Color.MAGENTA}Redis{Color.RESET} > Redis connection closed") - traceback.print_exc() + self._restart = False + raise + except redis.ConnectionError as e: + print(f"{Color.MAGENTA}Redis{Color.RESET} > Redis connection error: {e}") except Exception as e: # pylint: disable=broad-exception-caught print(f"{Color.MAGENTA}Redis{Color.RESET} > Critical error occurred\n" + str(e)) traceback.print_exc() @@ -201,12 +205,29 @@ async def reader(self): await self.bot.send_debug_message(content) finally: if self.redis is not None: - await self.close(restart=self._restart) + try: + await self.redis.close() + except Exception: + pass + self.redis = None print(f"{Color.MAGENTA}Redis{Color.RESET} > Connection closed") if self._restart: - await asyncio.sleep(5) - print(f"{Color.MAGENTA}Redis{Color.RESET} > Restarting...") + delay = self._reconnect_delay + print(f"{Color.MAGENTA}Redis{Color.RESET} > Reconnecting in {delay}s...") + loop = asyncio.get_event_loop() + loop.call_later(delay, lambda: asyncio.create_task(self._restart_reader())) + self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay) + + async def _restart_reader(self): + if not self._restart: + return + if self.running: + return + try: await self.start() + except Exception as e: + print(f"{Color.MAGENTA}Redis{Color.RESET} > Failed to restart reader: {e}") + traceback.print_exc() async def send_message(self, **data) -> str: if self.redis is None: @@ -241,8 +262,15 @@ async def request(self, endpoint: str, **data): async def close(self, *, restart: bool = False): self._restart = restart - await self.redis.close() - self.read_task.cancel() + if self.redis is not None: + try: + await self.redis.close() + except Exception: + pass + if self.read_task is not None and not self.read_task.done(): + current = asyncio.current_task() + if current is not self.read_task: + self.read_task.cancel() @classmethod async def create(cls, discord_bot, mineflayer_bot):