-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
232 lines (191 loc) · 7.96 KB
/
server.py
File metadata and controls
232 lines (191 loc) · 7.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""HTTP server for the Fleet Logger.
Exposes a REST API for log ingestion, querying, stats, tracing, and
real-time tail. Uses only the Python stdlib (:mod:`http.server`).
Endpoints
---------
POST /ingest — ingest one or many log entries (JSON body)
GET /search — query logs (query-string filters)
GET /agents — list agents with log counts
GET /stats — aggregate log statistics
GET /trace/<trace_id> — all logs for a trace
GET /tail — long-poll for new entries
DELETE /prune — prune old logs
GET /health — health check
"""
from __future__ import annotations
import json
import threading
import time
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Optional
from logger import FleetLogger, LogEntry, DEFAULT_LOG_DIR, DEFAULT_PORT
from query import LogQuery
# ---------------------------------------------------------------------------
# Request handler
# ---------------------------------------------------------------------------
class LoggerHandler(BaseHTTPRequestHandler):
"""Routes incoming HTTP requests to FleetLogger / LogQuery methods."""
# Shared across all request instances (set once by serve())
fleet_logger: FleetLogger # type: ignore[assignment]
log_query: LogQuery # type: ignore[assignment]
# Suppress default stderr logging
def log_message(self, fmt: str, *args: Any) -> None:
pass
# -- Helpers -------------------------------------------------------------
def _send_json(self, data: Any, status: int = 200) -> None:
body = json.dumps(data, indent=2, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _send_error(self, msg: str, status: int = 400) -> None:
self._send_json({"error": msg}, status)
def _read_body(self) -> bytes:
length = int(self.headers.get("Content-Length", 0))
return self.rfile.read(length)
def _parse_qs(self) -> dict[str, str]:
qs = urllib.parse.parse_qs(self.path.split("?", 1)[-1], keep_blank_values=True)
return {k: v[0] for k, v in qs.items()}
def _route(self) -> tuple[str, dict[str, str]]:
"""Parse path into (route, params)."""
path = self.path.split("?")[0]
parts = [p for p in path.strip("/").split("/") if p]
params = self._parse_qs()
return "/".join(parts), params
# -- GET -----------------------------------------------------------------
def do_GET(self) -> None: # noqa: N802
route, params = self._route()
if route == "health":
self._handle_health()
elif route == "search":
self._handle_search(params)
elif route == "agents":
self._handle_agents()
elif route == "stats":
self._handle_stats()
elif route == "tail":
self._handle_tail(params)
elif route.startswith("trace/"):
trace_id = route.split("/", 1)[1]
self._handle_trace(trace_id)
else:
self._send_error(f"Unknown route: GET /{route}", 404)
# -- POST ----------------------------------------------------------------
def do_POST(self) -> None: # noqa: N802
route, _ = self._route()
if route == "ingest":
self._handle_ingest()
else:
self._send_error(f"Unknown route: POST /{route}", 404)
# -- DELETE --------------------------------------------------------------
def do_DELETE(self) -> None: # noqa: N802
route, params = self._route()
if route == "prune":
self._handle_prune(params)
else:
self._send_error(f"Unknown route: DELETE /{route}", 404)
# -- Endpoint implementations -------------------------------------------
def _handle_health(self) -> None:
stats = self.fleet_logger.stats()
self._send_json({
"status": "ok",
"entries": stats["total_entries"],
"agents": stats["agents_count"],
"uptime_since": stats["earliest_ts"],
})
def _handle_ingest(self) -> None:
try:
body = json.loads(self._read_body())
except (json.JSONDecodeError, Exception) as exc:
self._send_error(f"Invalid JSON: {exc}")
return
entries: list[LogEntry] = []
# Support both single entry and batch (list)
data = body if isinstance(body, list) else [body]
for item in data:
try:
entries.append(LogEntry.from_dict(item))
except (TypeError, ValueError) as exc:
self._send_error(f"Invalid entry: {exc}")
return
count = self.fleet_logger.ingest_batch(entries)
self._send_json({"ingested": count, "status": "ok"})
def _handle_search(self, params: dict[str, str]) -> None:
result = self.log_query.search(
agent=params.get("agent"),
level=params.get("level"),
min_ts=self._float(params.get("min_ts")),
max_ts=self._float(params.get("max_ts")),
trace_id=params.get("trace_id"),
session_id=params.get("session_id"),
pattern=params.get("pattern"),
sort_desc=params.get("sort", "desc").lower() == "desc",
limit=int(params.get("limit", "100")),
offset=int(params.get("offset", "0")),
)
self._send_json(result)
def _handle_agents(self) -> None:
agents = self.fleet_logger.list_agents()
self._send_json({"agents": agents, "count": len(agents)})
def _handle_stats(self) -> None:
self._send_json(self.fleet_logger.stats())
def _handle_trace(self, trace_id: str) -> None:
result = self.log_query.trace(trace_id)
self._send_json(result)
def _handle_tail(self, params: dict[str, str]) -> None:
timeout = float(params.get("timeout", "30"))
entries = self.log_query.tail(
agent=params.get("agent"),
level=params.get("level"),
timeout=timeout,
)
self._send_json({
"entries": [e.to_dict() for e in entries],
"count": len(entries),
})
def _handle_prune(self, params: dict[str, str]) -> None:
days = int(params.get("days", "30"))
removed = self.fleet_logger.prune(days)
self._send_json({"pruned": removed, "retention_days": days})
@staticmethod
def _float(value: Optional[str]) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except ValueError:
return None
# ---------------------------------------------------------------------------
# Server bootstrap
# ---------------------------------------------------------------------------
def serve(
host: str = "0.0.0.0",
port: int = DEFAULT_PORT,
log_dir: str = DEFAULT_LOG_DIR,
retention_days: int = 30,
) -> None:
"""Start the Fleet Logger HTTP server (blocking)."""
fleet_logger = FleetLogger(
log_dir=log_dir,
retention_days=retention_days,
)
fleet_logger.rebuild_index()
log_query = LogQuery(fleet_logger)
# Inject shared state into handler class
LoggerHandler.fleet_logger = fleet_logger
LoggerHandler.log_query = log_query
server = HTTPServer((host, port), LoggerHandler)
print(f"[fleet-logger] Listening on {host}:{port}")
print(f"[fleet-logger] Log directory: {log_dir}")
print(f"[fleet-logger] Retention: {retention_days} days")
print(f"[fleet-logger] Loaded {len(fleet_logger._index)} entries from disk")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n[fleet-logger] Shutting down…")
finally:
fleet_logger.close()
server.server_close()
print("[fleet-logger] Server stopped.")