Skip to content

Commit c1c3b59

Browse files
committed
feat: add /events REST endpoint returning EventV1 JSON array
The frontend's useGetEvents hook fetches EVENTS_API_BASE_URL + "/events" and expects a plain JSON array of Kubernetes EventV1 objects. ClickHouse predefined_query_handler can't produce this format (nested JSON + no wrapper), so the SSE proxy now also serves /events — querying ClickHouse and transforming to the EventV1 shape the frontend needs. This lets EVENTS_API_BASE_URL point to the SSE proxy alongside EVENTS_PUSH_API_BASE_URL. Made-with: Cursor
1 parent 4c2dda2 commit c1c3b59

1 file changed

Lines changed: 63 additions & 8 deletions

File tree

sse-proxy/main.go

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (p *poller) run(ctx context.Context) {
242242

243243
func (p *poller) poll() {
244244
query := fmt.Sprintf(pollSQL, p.lastSeenUnix)
245-
rows, err := p.queryClickHouse(query)
245+
rows, err := queryClickHouse(p.cfg, query)
246246
if err != nil {
247247
log.Printf("[poller] query error: %v", err)
248248
return
@@ -280,14 +280,18 @@ func (p *poller) poll() {
280280
}
281281
}
282282

283-
func (p *poller) queryClickHouse(query string) ([]chRow, error) {
284-
req, err := http.NewRequest(http.MethodPost, p.cfg.clickhouseURL, strings.NewReader(query))
283+
// ---------------------------------------------------------------------------
284+
// Shared ClickHouse query helper
285+
// ---------------------------------------------------------------------------
286+
287+
func queryClickHouse(cfg config, query string) ([]chRow, error) {
288+
req, err := http.NewRequest(http.MethodPost, cfg.clickhouseURL, strings.NewReader(query))
285289
if err != nil {
286290
return nil, err
287291
}
288292
req.Header.Set("Content-Type", "text/plain")
289-
if p.cfg.clickhouseUser != "" {
290-
req.SetBasicAuth(p.cfg.clickhouseUser, p.cfg.clickhousePassword)
293+
if cfg.clickhouseUser != "" {
294+
req.SetBasicAuth(cfg.clickhouseUser, cfg.clickhousePassword)
291295
}
292296

293297
resp, err := http.DefaultClient.Do(req)
@@ -304,22 +308,73 @@ func (p *poller) queryClickHouse(query string) ([]chRow, error) {
304308
return nil, fmt.Errorf("clickhouse returned %d: %s", resp.StatusCode, body)
305309
}
306310

307-
// FORMAT JSONEachRow: one JSON object per newline.
308311
var rows []chRow
309312
for _, line := range strings.Split(strings.TrimSpace(string(body)), "\n") {
310313
if line == "" {
311314
continue
312315
}
313316
var row chRow
314317
if err := json.Unmarshal([]byte(line), &row); err != nil {
315-
log.Printf("[poller] unmarshal error: %v (line=%s)", err, line)
318+
log.Printf("[query] unmarshal error: %v (line=%s)", err, line)
316319
continue
317320
}
318321
rows = append(rows, row)
319322
}
320323
return rows, nil
321324
}
322325

326+
// ---------------------------------------------------------------------------
327+
// REST /events – returns recent K8s events as a plain JSON array of EventV1.
328+
// The frontend's useGetEvents hook calls EVENTS_API_BASE_URL + "/events" and
329+
// expects (await res.json()) as SSEK8sEvent[].
330+
// ---------------------------------------------------------------------------
331+
332+
const eventsSQL = `SELECT
333+
toUnixTimestamp(Timestamp) AS ts_unix,
334+
ifNull(LogAttributes['krateo.io/composition-id'], '') AS composition_id,
335+
ifNull(JSONExtractString(Body, 'object', 'involvedObject', 'apiVersion'), '') AS obj_apiversion,
336+
ifNull(JSONExtractString(Body, 'object', 'involvedObject', 'name'), '') AS obj_name,
337+
ifNull(JSONExtractString(Body, 'object', 'involvedObject', 'namespace'), '') AS obj_namespace,
338+
ifNull(JSONExtractString(Body, 'object', 'involvedObject', 'uid'), '') AS obj_uid,
339+
ifNull(JSONExtractString(Body, 'object', 'involvedObject', 'kind'), '') AS obj_kind,
340+
ifNull(JSONExtractString(Body, 'object', 'reason'), '') AS reason,
341+
ifNull(JSONExtractString(Body, 'object', 'message'), '') AS message,
342+
ifNull(JSONExtractString(Body, 'object', 'type'), 'Normal') AS type,
343+
coalesce(
344+
nullIf(JSONExtractString(Body, 'object', 'eventTime'), ''),
345+
nullIf(JSONExtractString(Body, 'object', 'lastTimestamp'), ''),
346+
formatDateTime(toDateTime(Timestamp), '%%Y-%%m-%%dT%%H:%%i:%%SZ', 'UTC')
347+
) AS event_time,
348+
ifNull(JSONExtractString(Body, 'object', 'source', 'component'), '') AS source_component
349+
FROM otel_logs
350+
WHERE ResourceAttributes['telemetry.source'] = 'k8s-events'
351+
AND JSONExtractString(Body, 'object', 'reason') != ''
352+
ORDER BY Timestamp DESC
353+
LIMIT 200
354+
FORMAT JSONEachRow`
355+
356+
func handleEvents(cfg config) http.HandlerFunc {
357+
return func(w http.ResponseWriter, r *http.Request) {
358+
rows, err := queryClickHouse(cfg, eventsSQL)
359+
if err != nil {
360+
log.Printf("[/events] query error: %v", err)
361+
http.Error(w, "internal error", http.StatusInternalServerError)
362+
return
363+
}
364+
365+
events := make([]SSEK8sEvent, 0, len(rows))
366+
for _, row := range rows {
367+
events = append(events, row.toSSEK8sEvent())
368+
}
369+
370+
w.Header().Set("Content-Type", "application/json")
371+
w.Header().Set("Access-Control-Allow-Origin", "*")
372+
if err := json.NewEncoder(w).Encode(events); err != nil {
373+
log.Printf("[/events] encode error: %v", err)
374+
}
375+
}
376+
}
377+
323378
// ---------------------------------------------------------------------------
324379
// HTTP handlers
325380
// ---------------------------------------------------------------------------
@@ -383,7 +438,7 @@ func main() {
383438
go p.run(ctx)
384439

385440
mux := http.NewServeMux()
386-
// Accept both /notifications and /notifications/ to match frontend behaviour.
441+
mux.HandleFunc("/events", handleEvents(cfg))
387442
mux.HandleFunc("/notifications/", handleSSE(h))
388443
mux.HandleFunc("/notifications", handleSSE(h))
389444
mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {

0 commit comments

Comments
 (0)