-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_worker.py
More file actions
125 lines (111 loc) · 5.57 KB
/
async_worker.py
File metadata and controls
125 lines (111 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import threading
import queue
import time
import sqlite3
import logging
from datetime import datetime
# Module-level logger, named after this module per the logging best practice.
logger = logging.getLogger(__name__)
# Absolute SQLite paths on the deployment host.
# RATE_LIMIT_DB holds per-key (daily_usage) and per-IP (ip_limits) daily counters;
# MONITORING_DB holds the usage_stats and error_logs tables.
RATE_LIMIT_DB = '/var/www/wikipedia-api/rate_limits.db'
MONITORING_DB = '/var/www/wikipedia-api/monitoring.db'
class AsyncLoggingWorker:
    """Fire-and-forget logging/accounting via a single background thread.

    Request handlers enqueue small task dicts with the ``queue_*`` methods;
    a daemon thread drains the queue and performs the SQLite writes, so the
    request path never blocks on disk I/O. Enqueuing a ``None`` sentinel
    (see :meth:`shutdown`) stops the worker loop.
    """

    def __init__(self):
        self.task_queue = queue.Queue()
        # Daemon thread: the interpreter may exit without joining it, so any
        # tasks still queued at process exit are best-effort and may be lost.
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def _worker(self):
        """Background loop: pop tasks and dispatch them until a None sentinel."""
        while True:
            try:
                # Short timeout keeps the loop responsive to the sentinel
                # without busy-waiting on an empty queue.
                task = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            if task is None:  # shutdown sentinel
                self.task_queue.task_done()
                break
            try:
                task_type = task['type']
                if task_type == 'increment_usage':
                    self._do_increment_usage(task['client_id'], task['client_type'])
                elif task_type == 'log_usage':
                    self._do_log_usage(**task['data'])
                elif task_type == 'log_error':
                    self._do_log_error(**task['data'])
            except Exception as e:
                # Logging must never take the worker down; record and move on.
                logger.error(f"Async worker error: {e}")
            finally:
                # BUGFIX: always mark the task done (the original skipped
                # task_done() when dispatch raised, which would deadlock any
                # future task_queue.join()).
                self.task_queue.task_done()

    def _do_increment_usage(self, client_id, client_type):
        """Increment today's search counter for an API key or an IP address.

        For ``client_type == 'api_key'`` the demo/public keys are pooled
        under the single 'demo_keys' counter row; any other key gets its own
        row. All other client types are counted per IP address. Errors are
        logged, never raised.
        """
        conn = sqlite3.connect(RATE_LIMIT_DB, timeout=30)
        cursor = conn.cursor()
        today = datetime.now().strftime('%Y-%m-%d')
        try:
            if client_type == 'api_key':
                # The two original branches ran identical SQL with only the
                # key differing — collapse the demo-key aliasing into the
                # parameter instead of duplicating the statement.
                key = 'demo_keys' if client_id in ('demo_key_123', 'test_key_456', 'public') else client_id
                cursor.execute("INSERT OR REPLACE INTO daily_usage (key_id, date, search_count) VALUES (?, ?, COALESCE((SELECT search_count FROM daily_usage WHERE key_id = ? AND date = ?), 0) + 1)", (key, today, key, today))
            else:
                cursor.execute("INSERT OR REPLACE INTO ip_limits (ip_address, date, search_count) VALUES (?, ?, COALESCE((SELECT search_count FROM ip_limits WHERE ip_address = ? AND date = ?), 0) + 1)", (client_id, today, client_id, today))
            conn.commit()
        except Exception as e:
            logger.error(f"Async increment_usage error: {e}")
        finally:
            conn.close()

    def _do_log_usage(self, **kwargs):
        """Insert one row into usage_stats; missing fields default to NULL
        (status_code defaults to 200). Errors are logged, never raised."""
        conn = None
        try:
            conn = sqlite3.connect(MONITORING_DB, timeout=30)
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO usage_stats (endpoint, query, response_time, result_count, client_ip, tier, status_code, session_id, user_agent, referer, page_url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (kwargs.get('endpoint'), kwargs.get('query'), kwargs.get('response_time'), kwargs.get('result_count'),
                  kwargs.get('client_ip'), kwargs.get('tier'), kwargs.get('status_code', 200), kwargs.get('session_id'),
                  kwargs.get('user_agent'), kwargs.get('referer'), kwargs.get('page_url')))
            conn.commit()
        except Exception as e:
            logger.error(f"Async log_usage error: {e}")
        finally:
            # BUGFIX: close in finally — the original closed inside the try,
            # leaking the connection whenever execute/commit raised.
            if conn is not None:
                conn.close()

    def _do_log_error(self, **kwargs):
        """Insert one row into error_logs; missing fields default to NULL.
        Errors are logged, never raised."""
        conn = None
        try:
            conn = sqlite3.connect(MONITORING_DB, timeout=30)
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO error_logs (endpoint, error_type, error_message, client_ip, api_key)
                VALUES (?, ?, ?, ?, ?)
            """, (kwargs.get('endpoint'), kwargs.get('error_type'), kwargs.get('error_message'),
                  kwargs.get('client_ip'), kwargs.get('api_key')))
            conn.commit()
        except Exception as e:
            logger.error(f"Async log_error error: {e}")
        finally:
            # BUGFIX: close in finally (see _do_log_usage).
            if conn is not None:
                conn.close()

    def queue_increment_usage(self, client_id, client_type):
        """Queue increment_usage for background processing."""
        self.task_queue.put({
            'type': 'increment_usage',
            'client_id': client_id,
            'client_type': client_type
        })

    def queue_log_usage(self, endpoint, query=None, response_time=None, result_count=None,
                        client_ip=None, tier=None, status_code=200, session_id=None,
                        user_agent=None, referer=None, page_url=None):
        """Queue log_usage for background processing."""
        self.task_queue.put({
            'type': 'log_usage',
            'data': {
                'endpoint': endpoint, 'query': query, 'response_time': response_time,
                'result_count': result_count, 'client_ip': client_ip, 'tier': tier,
                'status_code': status_code, 'session_id': session_id, 'user_agent': user_agent,
                'referer': referer, 'page_url': page_url
            }
        })

    def queue_log_error(self, endpoint, error_type, error_message, client_ip=None, api_key=None):
        """Queue log_error for background processing."""
        self.task_queue.put({
            'type': 'log_error',
            'data': {
                'endpoint': endpoint, 'error_type': error_type, 'error_message': error_message,
                'client_ip': client_ip, 'api_key': api_key
            }
        })

    def shutdown(self, wait=True, timeout=5):
        """Ask the worker to exit after draining already-queued tasks.

        New, backward-compatible API: the worker loop always honored a None
        sentinel, but nothing could send it. With ``wait`` the caller blocks
        up to ``timeout`` seconds for the thread to finish.
        """
        self.task_queue.put(None)
        if wait:
            self.worker_thread.join(timeout=timeout)
# Global async worker instance
# Module-level singleton: importing this module starts the daemon worker
# thread as a side effect. All request handlers share this one instance so
# there is a single queue (and a single writer) per process.
async_worker = AsyncLoggingWorker()