-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathwebsocket_manager.py
More file actions
175 lines (138 loc) · 5.71 KB
/
websocket_manager.py
File metadata and controls
175 lines (138 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
WebSocket manager for real-time updates in GGUF Forge.
"""
import json
import asyncio
import logging
from datetime import datetime
from typing import Dict, Set, Any
from fastapi import WebSocket, WebSocketDisconnect
logger = logging.getLogger("GGUF_Forge")
def json_serializer(obj):
"""JSON serializer for objects not serializable by default json code."""
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Type {type(obj)} not serializable")
class ConnectionManager:
"""Manages WebSocket connections and broadcasts updates to connected clients."""
def __init__(self):
# Active connections by channel
self.active_connections: Dict[str, Set[WebSocket]] = {
"models": set(), # Model conversion status updates
"requests": set(), # Request updates (admin)
"tickets": set(), # Ticket updates
"my_requests": set(), # User's own requests
}
# Lock for thread-safe operations
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, channels: list = None):
"""Accept connection and add to specified channels."""
await websocket.accept()
async with self._lock:
if channels is None:
channels = ["models"] # Default channel
for channel in channels:
if channel in self.active_connections:
self.active_connections[channel].add(websocket)
logger.debug(f"WebSocket connected to channels: {channels}")
async def disconnect(self, websocket: WebSocket):
"""Remove connection from all channels."""
async with self._lock:
for channel in self.active_connections.values():
channel.discard(websocket)
async def broadcast(self, channel: str, data: Any):
"""Broadcast message to all connections in a channel."""
if channel not in self.active_connections:
return
message = json.dumps({"channel": channel, "data": data}, default=json_serializer)
dead_connections = set()
async with self._lock:
connections = self.active_connections[channel].copy()
for connection in connections:
try:
await connection.send_text(message)
except Exception:
dead_connections.add(connection)
# Clean up dead connections
if dead_connections:
async with self._lock:
for channel_set in self.active_connections.values():
channel_set -= dead_connections
async def broadcast_all(self, data: Any):
"""Broadcast to all channels (for general updates)."""
for channel in self.active_connections.keys():
await self.broadcast(channel, data)
def get_connection_count(self, channel: str = None) -> int:
"""Get number of active connections."""
if channel:
return len(self.active_connections.get(channel, set()))
return sum(len(conns) for conns in self.active_connections.values())
# Global connection manager instance
manager = ConnectionManager()
async def broadcast_model_update(model_data: dict):
"""Broadcast model status update to all connected clients."""
await manager.broadcast("models", {
"type": "model_update",
"model": model_data
})
async def broadcast_models_list(models: list):
"""Broadcast full models list update."""
await manager.broadcast("models", {
"type": "models_list",
"models": models
})
async def broadcast_requests_update():
"""Broadcast pending requests list to admin clients.
Sends actual data instead of just a signal to avoid HTTP refetch.
"""
from database import get_db_connection
conn = await get_db_connection()
await conn.execute("SELECT * FROM requests WHERE status = 'pending' ORDER BY created_at DESC")
requests = await conn.fetchall()
await conn.close()
await manager.broadcast("requests", {
"type": "requests_list",
"requests": [r.to_dict() for r in requests]
})
async def broadcast_tickets_update():
"""Broadcast open tickets list to admin clients.
Sends actual data instead of just a signal to avoid HTTP refetch.
"""
from database import get_db_connection
conn = await get_db_connection()
await conn.execute("""
SELECT t.*, r.hf_repo_id, r.requested_by
FROM tickets t
JOIN requests r ON t.request_id = r.id
WHERE t.status = 'open'
ORDER BY t.created_at DESC
""")
tickets = await conn.fetchall()
await conn.close()
await manager.broadcast("tickets", {
"type": "tickets_list",
"tickets": [t.to_dict() for t in tickets]
})
async def broadcast_my_requests_update():
"""Signal clients to refresh their own requests.
Note: This one still sends a signal because user-specific requests
require knowing the current user context which isn't available here.
"""
await manager.broadcast("my_requests", {
"type": "my_requests_update"
})
async def broadcast_transfer_progress(model_id: str, transfer_type: str, files: list):
"""
Broadcast download/upload progress for a model.
Args:
model_id: The model job ID
transfer_type: 'download' or 'upload'
files: List of dicts with file progress info:
[{"name": "file.safetensors", "progress": 45, "speed": "12.5MB/s", "size": "5.00G"}]
"""
await manager.broadcast("models", {
"type": "transfer_progress",
"model_id": model_id,
"transfer_type": transfer_type,
"files": files
})