-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsecurity.py
More file actions
169 lines (135 loc) · 6.67 KB
/
security.py
File metadata and controls
169 lines (135 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Security modules for GGUF Forge: Rate limiting, bot detection, spam protection.
"""
import asyncio
from database import get_db_connection
class RateLimiter:
"""IP-based rate limiter to prevent DDoS attacks."""
def __init__(self, requests_per_minute: int = 60, requests_per_second: int = 10):
self.requests_per_minute = requests_per_minute
self.requests_per_second = requests_per_second
self.requests: dict = {} # IP -> list of timestamps
self.blocked_ips: dict = {} # IP -> block_until timestamp
self.lock = asyncio.Lock()
async def is_allowed(self, ip: str) -> tuple[bool, str]:
"""Check if request from IP is allowed. Returns (allowed, reason)."""
import time
now = time.time()
async with self.lock:
# Check if IP is blocked
if ip in self.blocked_ips:
if now < self.blocked_ips[ip]:
remaining = int(self.blocked_ips[ip] - now)
return False, f"IP blocked for {remaining}s due to rate limit abuse"
else:
del self.blocked_ips[ip]
# Initialize or clean old requests
if ip not in self.requests:
self.requests[ip] = []
# Remove requests older than 1 minute
self.requests[ip] = [t for t in self.requests[ip] if now - t < 60]
# Check per-second rate
recent_second = sum(1 for t in self.requests[ip] if now - t < 1)
if recent_second >= self.requests_per_second:
return False, "Too many requests per second"
# Check per-minute rate
if len(self.requests[ip]) >= self.requests_per_minute:
# Block IP for 5 minutes
self.blocked_ips[ip] = now + 300
return False, "Rate limit exceeded. IP blocked for 5 minutes"
# Allow request
self.requests[ip].append(now)
return True, ""
async def cleanup(self):
"""Remove old entries to prevent memory bloat."""
import time
now = time.time()
async with self.lock:
# Clean request history
for ip in list(self.requests.keys()):
self.requests[ip] = [t for t in self.requests[ip] if now - t < 60]
if not self.requests[ip]:
del self.requests[ip]
# Clean expired blocks
for ip in list(self.blocked_ips.keys()):
if now >= self.blocked_ips[ip]:
del self.blocked_ips[ip]
class BotDetector:
"""Detect and block suspicious bot traffic."""
# Known bot/suspicious user agent patterns
SUSPICIOUS_PATTERNS = [
"curl", "wget", "python-requests", "scrapy", "bot", "spider",
"crawler", "scan", "http", "java/", "perl", "ruby", "go-http",
"aiohttp", "httpx", "axios", "node-fetch", "undici"
]
# Legitimate browser patterns
BROWSER_PATTERNS = ["mozilla", "chrome", "safari", "firefox", "edge", "opera"]
@classmethod
def is_suspicious(cls, user_agent: str, path: str) -> tuple[bool, str]:
"""Check if request appears to be from a bot. Returns (is_bot, reason)."""
if not user_agent:
return True, "Missing User-Agent header"
ua_lower = user_agent.lower()
# Skip check for API endpoints that might legitimately use non-browser clients
if path.startswith("/api/"):
return False, ""
# Check for suspicious patterns
for pattern in cls.SUSPICIOUS_PATTERNS:
if pattern in ua_lower:
return True, f"Suspicious User-Agent pattern: {pattern}"
# For non-API routes, require browser-like user agent
has_browser = any(pattern in ua_lower for pattern in cls.BROWSER_PATTERNS)
if not has_browser and not path.startswith("/api/"):
return True, "Non-browser User-Agent for browser route"
return False, ""
class SpamProtection:
"""Prevent spam submissions (model requests, etc.)."""
def __init__(self, max_requests_per_hour: int = 10, max_pending_per_user: int = 5):
self.max_requests_per_hour = max_requests_per_hour
self.max_pending_per_user = max_pending_per_user
self.submissions: dict = {} # user -> list of timestamps
self.lock = asyncio.Lock()
async def can_submit(self, username: str) -> tuple[bool, str]:
"""Check if user can submit a new request."""
import time
now = time.time()
async with self.lock:
if username not in self.submissions:
self.submissions[username] = []
# Remove submissions older than 1 hour
self.submissions[username] = [t for t in self.submissions[username] if now - t < 3600]
if len(self.submissions[username]) >= self.max_requests_per_hour:
return False, f"Request limit reached ({self.max_requests_per_hour}/hour). Please try again later."
return True, ""
async def record_submission(self, username: str):
"""Record a new submission."""
import time
now = time.time()
async with self.lock:
if username not in self.submissions:
self.submissions[username] = []
self.submissions[username].append(now)
# Keep only last hour to prevent memory bloat
self.submissions[username] = [t for t in self.submissions[username] if now - t < 3600]
async def cleanup(self):
"""Remove old entries to prevent memory bloat."""
import time
now = time.time()
async with self.lock:
for user in list(self.submissions.keys()):
self.submissions[user] = [t for t in self.submissions[user] if now - t < 3600]
if not self.submissions[user]:
del self.submissions[user]
async def check_pending_limit(self, username: str) -> tuple[bool, str]:
"""Check if user has too many pending requests (async DB check)."""
conn = await get_db_connection()
await conn.execute(
"SELECT COUNT(*) as cnt FROM requests WHERE requested_by = ? AND status = 'pending'",
(username,)
)
result = await conn.fetchone()
await conn.close()
count = result['cnt'] if result else 0
if count >= self.max_pending_per_user:
return False, f"Too many pending requests ({count}/{self.max_pending_per_user}). Wait for approval."
return True, ""