-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgame_locks.py
More file actions
42 lines (33 loc) · 1.35 KB
/
game_locks.py
File metadata and controls
42 lines (33 loc) · 1.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""Game locks for preventing race conditions."""
import asyncio
from typing import Dict
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class GameLockManager:
    """Hand out one ``asyncio.Lock`` per game id.

    Locks are created lazily the first time a game id is requested and are
    stored in an internal dict keyed by that id. Nothing in this class removes
    entries: ``cleanup_unused_locks`` is still a placeholder, so the dict keeps
    one lock for every distinct game id it has ever been asked about.
    """

    def __init__(self) -> None:
        # game_id -> lock guarding that game.
        self._locks: Dict[int, asyncio.Lock] = {}
        # Held while a new entry is inserted into ``_locks``.
        self._lock_creation_lock = asyncio.Lock()

    async def get_lock(self, game_id: int) -> asyncio.Lock:
        """Return the lock for ``game_id``, creating it on first use.

        Args:
            game_id: Identifier of the game whose lock is wanted.

        Returns:
            The ``asyncio.Lock`` stored for ``game_id``; repeated calls with
            the same id return the same object.
        """
        existing = self._locks.get(game_id)
        if existing is not None:
            # Fast path: the lock already exists, no need to take the
            # creation lock.
            return existing

        async with self._lock_creation_lock:
            # Re-check under the creation lock so only one lock object is ever
            # stored per game id.
            # NOTE(review): nothing is awaited inside this section, so on a
            # single event loop the creation lock looks redundant - confirm
            # before removing it.
            if game_id not in self._locks:
                self._locks[game_id] = asyncio.Lock()
        return self._locks[game_id]

    async def acquire(self, game_id: int) -> None:
        """Wait until the lock for ``game_id`` has been acquired.

        The lock is created first if it does not exist yet. Pair every
        successful call with ``release(game_id)``.
        """
        lock = await self.get_lock(game_id)
        await lock.acquire()
        # Lazy %-style arguments: the message is only formatted when DEBUG
        # logging is actually enabled.
        logger.debug("Lock acquired for game %s", game_id)

    def release(self, game_id: int) -> None:
        """Release the lock for ``game_id``.

        A game id that has no lock is ignored silently.

        Raises:
            RuntimeError: Propagated from ``asyncio.Lock.release`` when the
                lock exists but is not currently held.
        """
        lock = self._locks.get(game_id)
        if lock is None:
            return
        lock.release()
        logger.debug("Lock released for game %s", game_id)

    async def cleanup_unused_locks(self) -> None:
        """Placeholder for removing locks of finished games.

        Currently a no-op: no entry is ever removed from the lock table.
        """
        # In production, periodically clean up locks for finished games
# Global lock manager: a single GameLockManager instance created when this
# module is imported.
lock_manager = GameLockManager()