-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_analytics.py
More file actions
338 lines (284 loc) · 11.2 KB
/
redis_analytics.py
File metadata and controls
338 lines (284 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
"""
Redis Analytics Service
Tracks passage attempts, word difficulty, hint usage, and gameplay statistics
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

import redis
from redis.backoff import ExponentialBackoff
from redis.retry import Retry
logger = logging.getLogger(__name__)


class RedisAnalyticsService:
    """
    Service for tracking gameplay analytics using Redis.

    Uses a Stream for the time-series of passage records, Sorted Sets for
    the per-word and per-book counters, and a Set for the session IDs seen.
    Every public method is best-effort: when Redis is unreachable or a
    command fails, the method logs and returns an empty/falsy result
    instead of raising.
    """

    # Redis keys
    STREAM_KEY = "cloze:analytics:stream"
    WORDS_FIRST_TRY = "cloze:analytics:words:first_try"
    WORDS_RETRY = "cloze:analytics:words:needed_retry"
    BOOKS_KEY = "cloze:analytics:books"
    SESSIONS_KEY = "cloze:analytics:sessions"
    MAX_STREAM_LEN = 10000  # Keep last 10k entries

    def __init__(self, redis_url: Optional[str] = None):
        """
        Initialize Redis Analytics Service.

        Args:
            redis_url: Redis connection URL. When omitted, the URL is taken
                from the first set variable among REDIS_URL,
                REDIS_PUBLIC_URL and REDIS_PRIVATE_URL.
        """
        self.redis_url = redis_url or self._resolve_url()
        self.redis_client: Optional[redis.Redis] = None
        # Connects eagerly; a failure leaves redis_client as None and later
        # calls retry through _ensure_connected().
        self._connect()

    @staticmethod
    def _resolve_url() -> Optional[str]:
        """Return the first non-empty Railway Redis URL env var, or None."""
        for var in ("REDIS_URL", "REDIS_PUBLIC_URL", "REDIS_PRIVATE_URL"):
            url = os.getenv(var)
            if url:
                logger.info("Analytics using Redis URL from %s", var)
                return url
        return None

    def _connect(self):
        """
        Establish the Redis connection with retry and health checks.

        On success ``self.redis_client`` holds a client that answered PING;
        on any failure it is left as None and the error is logged.
        """
        if not self.redis_url:
            logger.warning("No REDIS_URL provided for analytics")
            return
        try:
            retry = Retry(ExponentialBackoff(), retries=3)
            self.redis_client = redis.from_url(
                self.redis_url,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5,
                retry=retry,
                retry_on_error=[redis.ConnectionError, redis.TimeoutError],
                health_check_interval=30,
            )
            self.redis_client.ping()
            logger.info("Redis Analytics Service connected")
        except (redis.RedisError, ValueError) as e:
            # ValueError: from_url() rejects a malformed URL / unknown
            # scheme. Treat it like any other connection failure so a bad
            # env var degrades analytics instead of crashing the caller.
            logger.error("Failed to connect Redis for analytics: %s", e)
            self.redis_client = None

    def _ensure_connected(self) -> bool:
        """Lazy reconnection - attempt to reconnect if there is no client."""
        if self.redis_client:
            return True
        if self.redis_url:
            logger.info("Attempting Redis reconnection for analytics...")
            self._connect()
        return self.redis_client is not None

    def _handle_redis_error(self, action: str, exc: Exception) -> None:
        """Log a failed Redis operation and drop the client.

        Dropping the client makes the next call go through
        _ensure_connected() and build a fresh connection.
        """
        logger.error("Failed to %s: %s", action, exc)
        self.redis_client = None

    def is_available(self) -> bool:
        """Check if Redis is available for analytics (with reconnection)."""
        if not self._ensure_connected():
            return False
        try:
            self.redis_client.ping()
            return True
        except redis.RedisError:
            self.redis_client = None
            return False

    @classmethod
    def _word_stat_updates(cls, words) -> List[Tuple[str, str]]:
        """Translate a passage's word list into (sorted-set key, word) bumps.

        A word answered correctly on its single attempt bumps
        WORDS_FIRST_TRY; a word that took more than one attempt bumps
        WORDS_RETRY. Entries that are not dicts, have no non-empty string
        ``word``, or carry a non-numeric ``attemptsToCorrect`` are skipped
        rather than raising, because this payload comes from the client.
        """
        updates: List[Tuple[str, str]] = []
        if not isinstance(words, (list, tuple)):
            return updates
        for word_data in words:
            if not isinstance(word_data, dict):
                continue
            raw_word = word_data.get("word")
            if not isinstance(raw_word, str) or not raw_word:
                continue
            attempts = word_data.get("attemptsToCorrect", 1)
            if not isinstance(attempts, (int, float)):
                continue
            word = raw_word.lower()
            if attempts == 1 and word_data.get("finalCorrect", False):
                # Word was correct on first try
                updates.append((cls.WORDS_FIRST_TRY, word))
            elif attempts > 1:
                # Word needed retry(s)
                updates.append((cls.WORDS_RETRY, word))
        return updates

    def record_passage(self, data: Dict) -> Optional[str]:
        """
        Record a completed passage attempt with summary data.

        Note: when ``data`` has no "timestamp" key, one is added to the
        passed-in dict (naive UTC, ISO 8601).

        Args:
            data: Passage analytics data containing:
                - passageId: Unique ID for this passage attempt
                - sessionId: Browser session ID
                - bookTitle: Title of the book
                - bookAuthor: Author name
                - level: Current game level
                - round: Current round number
                - words: List of word data (word, attemptsToCorrect,
                  hintsUsed, finalCorrect)
                - totalBlanks: Number of blanks in passage
                - correctOnFirstTry: Count of words correct on first attempt
                - totalHintsUsed: Total hints requested
                - passed: Whether passage was passed

        Returns:
            Stream entry ID or None if failed
        """
        if not self._ensure_connected():
            logger.warning("Analytics unavailable - Redis not connected")
            return None

        # Add timestamp if not present. Same naive-UTC string that
        # datetime.utcnow() produced, without the deprecated call.
        if "timestamp" not in data:
            data["timestamp"] = (
                datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            )

        try:
            payload = json.dumps(data)
        except (TypeError, ValueError) as e:
            # Unserializable payload: nothing has been written yet.
            logger.error("Failed to record passage analytics: %s", e)
            return None

        # Falsy values (missing, None, "") fall back to the placeholder;
        # None in particular would be rejected by the Redis client.
        title = data.get("bookTitle") or "Unknown"
        author = data.get("bookAuthor") or "Unknown"
        book_key = f"{title}|{author}"
        session_id = data.get("sessionId") or "unknown"
        word_updates = self._word_stat_updates(data.get("words"))

        try:
            # Queue everything on one pipeline so the stream entry and its
            # counters are sent together in a single MULTI/EXEC round trip.
            with self.redis_client.pipeline() as pipe:
                # Add to stream (time-series)
                pipe.xadd(
                    self.STREAM_KEY,
                    {"data": payload},
                    maxlen=self.MAX_STREAM_LEN,
                    approximate=True,
                )
                # Update word difficulty stats
                for stat_key, word in word_updates:
                    pipe.zincrby(stat_key, 1, word)
                # Update book usage counter
                pipe.zincrby(self.BOOKS_KEY, 1, book_key)
                # Track session
                pipe.sadd(self.SESSIONS_KEY, session_id)
                # Results come back in queue order; XADD was first.
                entry_id = pipe.execute()[0]
        except redis.RedisError as e:
            self._handle_redis_error("record passage analytics", e)
            return None

        logger.debug("Recorded passage analytics: %s", entry_id)
        return entry_id

    @staticmethod
    def _split_book_key(book_key: str) -> Tuple[str, str]:
        """Split a "title|author" member into (title, author).

        Splits on the first "|" only; a member without a separator gets
        the author "Unknown".
        """
        title, sep, author = book_key.partition("|")
        return title, (author if sep else "Unknown")

    def get_summary(self) -> Dict:
        """
        Get aggregate statistics for admin dashboard.

        Returns:
            Dictionary with:
                - totalPassages: Total recorded passages
                - totalSessions: Unique sessions
                - hardestWords: Top 10 words needing retries
                - easiestWords: Top 10 words correct on first try
                - popularBooks: Top 10 most used books
            All counts are zero / lists empty when Redis is unavailable.
        """
        if not self._ensure_connected():
            return self._empty_summary()
        try:
            client = self.redis_client
            # Stream length and unique session count
            total_passages = client.xlen(self.STREAM_KEY)
            total_sessions = client.scard(self.SESSIONS_KEY)
            # Top 10 of each sorted set, highest score first
            hardest_raw = client.zrevrange(
                self.WORDS_RETRY, 0, 9, withscores=True
            )
            easiest_raw = client.zrevrange(
                self.WORDS_FIRST_TRY, 0, 9, withscores=True
            )
            books_raw = client.zrevrange(
                self.BOOKS_KEY, 0, 9, withscores=True
            )
        except redis.RedisError as e:
            self._handle_redis_error("get analytics summary", e)
            return self._empty_summary()

        popular_books = []
        for book_key, count in books_raw:
            title, author = self._split_book_key(book_key)
            popular_books.append({
                "title": title,
                "author": author,
                "usageCount": int(count),
            })
        return {
            "totalPassages": total_passages,
            "totalSessions": total_sessions,
            "hardestWords": [
                {"word": word, "retryCount": int(score)}
                for word, score in hardest_raw
            ],
            "easiestWords": [
                {"word": word, "firstTryCount": int(score)}
                for word, score in easiest_raw
            ],
            "popularBooks": popular_books,
        }

    def _empty_summary(self) -> Dict:
        """Return empty summary structure."""
        return {
            "totalPassages": 0,
            "totalSessions": 0,
            "hardestWords": [],
            "easiestWords": [],
            "popularBooks": [],
        }

    @staticmethod
    def _decode_entries(entries) -> List[Dict]:
        """Decode stream entries into the records stored in their "data" field.

        Args:
            entries: Iterable of (entry_id, fields) pairs as returned by
                XRANGE / XREVRANGE.

        Returns:
            The decoded records, in input order. Entries with no "data"
            field or with invalid JSON are logged and skipped so that one
            bad entry cannot abort a whole read.
        """
        records = []
        for entry_id, fields in entries:
            try:
                records.append(json.loads(fields["data"]))
            except (KeyError, TypeError, ValueError) as e:
                logger.warning(
                    "Skipping malformed analytics entry %s: %s", entry_id, e
                )
        return records

    def get_recent_passages(self, count: int = 50) -> List[Dict]:
        """
        Get recent passage attempts for display/export.

        Args:
            count: Number of recent entries to retrieve. Values below 1
                yield an empty list.

        Returns:
            List of passage analytics records (newest first)
        """
        # The client rejects a non-positive COUNT; answer directly instead
        # of turning a bad argument into a logged error and a reconnect.
        if count < 1:
            return []
        if not self._ensure_connected():
            return []
        try:
            entries = self.redis_client.xrevrange(
                self.STREAM_KEY, count=count
            )
        except redis.RedisError as e:
            self._handle_redis_error("get recent passages", e)
            return []
        return self._decode_entries(entries)

    def export_all(self) -> List[Dict]:
        """
        Export all analytics data for backup/analysis.

        Returns:
            List of all passage analytics records (oldest first)
        """
        if not self._ensure_connected():
            return []
        try:
            entries = self.redis_client.xrange(self.STREAM_KEY)
        except redis.RedisError as e:
            self._handle_redis_error("export analytics", e)
            return []
        return self._decode_entries(entries)

    def get_word_stats(self, word: str) -> Dict:
        """
        Get statistics for a specific word.

        Args:
            word: The word to look up (matched case-insensitively)

        Returns:
            Dictionary with word (lower-cased), firstTryCount and
            retryCount. The counts are 0 when the word is unknown or Redis
            is unavailable; the shape is the same on every path.
        """
        word_lower = word.lower()
        stats = {"word": word_lower, "firstTryCount": 0, "retryCount": 0}
        if not self._ensure_connected():
            return stats
        try:
            first_try = self.redis_client.zscore(
                self.WORDS_FIRST_TRY, word_lower
            )
            retry = self.redis_client.zscore(self.WORDS_RETRY, word_lower)
        except redis.RedisError as e:
            self._handle_redis_error("get word stats", e)
            return stats
        # zscore yields None for an absent member.
        stats["firstTryCount"] = int(first_try) if first_try else 0
        stats["retryCount"] = int(retry) if retry else 0
        return stats

    def clear_analytics(self) -> bool:
        """
        Clear all analytics data (admin function).

        Returns:
            True if successful
        """
        if not self._ensure_connected():
            return False
        try:
            self.redis_client.delete(
                self.STREAM_KEY,
                self.WORDS_FIRST_TRY,
                self.WORDS_RETRY,
                self.BOOKS_KEY,
                self.SESSIONS_KEY,
            )
        except redis.RedisError as e:
            self._handle_redis_error("clear analytics", e)
            return False
        logger.info("Analytics data cleared")
        return True