Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions core/redis_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self, bot, mineflayer_bot):
self._response_waiters: dict[str, asyncio.Future] = {}
self.redis: redis.Redis = None
self._restart: bool = True
self._reconnect_delay: int = 30
self._max_reconnect_delay: int = 600

@property
def running(self):
Expand Down Expand Up @@ -132,6 +134,7 @@ async def reader(self):
channel = self.recieve_channel + ":" + self.client_name
await pubsub.subscribe(channel)
print(f"{Color.MAGENTA}Redis{Color.RESET} > Subscribed to {channel}")
self._reconnect_delay = 5
while (not self.bot.is_closed()) and self.running:
try:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=10)
Expand Down Expand Up @@ -181,9 +184,10 @@ async def reader(self):
)
except asyncio.CancelledError:
print(f"{Color.MAGENTA}Redis{Color.RESET} > Task Cancelled")
except redis.ConnectionError:
print(f"{Color.MAGENTA}Redis{Color.RESET} > Redis connection closed")
traceback.print_exc()
self._restart = False
raise
except redis.ConnectionError as e:
print(f"{Color.MAGENTA}Redis{Color.RESET} > Redis connection error: {e}")
except Exception as e: # pylint: disable=broad-exception-caught
print(f"{Color.MAGENTA}Redis{Color.RESET} > Critical error occurred\n" + str(e))
traceback.print_exc()
Expand All @@ -201,12 +205,29 @@ async def reader(self):
await self.bot.send_debug_message(content)
finally:
if self.redis is not None:
await self.close(restart=self._restart)
try:
await self.redis.close()
except Exception:
pass
self.redis = None
print(f"{Color.MAGENTA}Redis{Color.RESET} > Connection closed")
if self._restart:
await asyncio.sleep(5)
print(f"{Color.MAGENTA}Redis{Color.RESET} > Restarting...")
delay = self._reconnect_delay
print(f"{Color.MAGENTA}Redis{Color.RESET} > Reconnecting in {delay}s...")
loop = asyncio.get_event_loop()
loop.call_later(delay, lambda: asyncio.create_task(self._restart_reader()))
self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)

async def _restart_reader(self):
if not self._restart:
return
if self.running:
return
try:
await self.start()
except Exception as e:
print(f"{Color.MAGENTA}Redis{Color.RESET} > Failed to restart reader: {e}")
traceback.print_exc()

async def send_message(self, **data) -> str:
if self.redis is None:
Expand Down Expand Up @@ -241,8 +262,15 @@ async def request(self, endpoint: str, **data):

async def close(self, *, restart: bool = False):
self._restart = restart
await self.redis.close()
self.read_task.cancel()
if self.redis is not None:
try:
await self.redis.close()
except Exception:
pass
if self.read_task is not None and not self.read_task.done():
current = asyncio.current_task()
if current is not self.read_task:
self.read_task.cancel()

@classmethod
async def create(cls, discord_bot, mineflayer_bot):
Expand Down
Loading