Skip to content

Commit 74947dc

Browse files
committed
feat: add compositionresolver OTel processor to replace EventRouter
Build a custom OTel collector (otelcol-krateo) that includes a compositionresolver processor. For each K8s event, the processor resolves the involvedObject via the K8s API, reads its krateo.io/composition-id label, and sets it as a LogAttribute — replicating the lookup that EventRouter performed before forwarding events to EventSSE. This allows the /events ClickHouse endpoint to filter by composition UID directly (LogAttributes['krateo.io/composition-id']), eliminating the need for EventRouter, EventSSE, and etcd. New files: - otel-collector-custom/ — processor source, OCB config, Dockerfile - .github/workflows/otel-collector.yaml — CI for ghcr.io image Updated: - otel-collectors/deployment.yaml — custom image + compositionresolver in the logs pipeline, broad RBAC for involvedObject GET - clickhouse-config/ — /events query filters by LogAttributes instead of namespace Made-with: Cursor
1 parent cea0f7e commit 74947dc

10 files changed

Lines changed: 477 additions & 30 deletions

File tree

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
name: OTel Collector – Build & Push
2+
3+
on:
4+
push:
5+
branches: [main]
6+
paths:
7+
- "otel-collector-custom/**"
8+
- ".github/workflows/otel-collector.yaml"
9+
pull_request:
10+
branches: [main]
11+
paths:
12+
- "otel-collector-custom/**"
13+
- ".github/workflows/otel-collector.yaml"
14+
workflow_dispatch: {}
15+
16+
env:
17+
REGISTRY: ghcr.io
18+
IMAGE_NAME: braghettos/otelcol-krateo
19+
20+
jobs:
21+
build-push:
22+
name: Build & push image
23+
runs-on: ubuntu-latest
24+
permissions:
25+
contents: read
26+
packages: write
27+
28+
steps:
29+
- name: Checkout
30+
uses: actions/checkout@v4
31+
32+
- name: Set up QEMU (multi-arch)
33+
uses: docker/setup-qemu-action@v3
34+
35+
- name: Set up Docker Buildx
36+
uses: docker/setup-buildx-action@v3
37+
38+
- name: Log in to GHCR
39+
if: github.event_name != 'pull_request'
40+
uses: docker/login-action@v3
41+
with:
42+
registry: ${{ env.REGISTRY }}
43+
username: ${{ github.actor }}
44+
password: ${{ secrets.GITHUB_TOKEN }}
45+
46+
- name: Extract metadata
47+
id: meta
48+
uses: docker/metadata-action@v5
49+
with:
50+
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
51+
tags: |
52+
type=sha,prefix=sha-,format=short
53+
type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}
54+
type=semver,pattern={{version}}
55+
type=semver,pattern={{major}}.{{minor}}
56+
57+
- name: Build and push
  # The id is required: the "Image digest" step reads this step's outputs
  # through steps.build-push.outputs.*, which resolves to an empty string
  # when no step carries that id.
  id: build-push
  uses: docker/build-push-action@v6
  with:
    context: otel-collector-custom
    file: otel-collector-custom/Dockerfile
    platforms: linux/amd64,linux/arm64
    push: ${{ github.event_name != 'pull_request' }}
    tags: ${{ steps.meta.outputs.tags }}
    labels: ${{ steps.meta.outputs.labels }}
    cache-from: type=gha
    cache-to: type=gha,mode=max

- name: Image digest
  # Pull-request builds are not pushed (see `push` above), so only report
  # a pushed digest for the events that actually publish the image.
  if: github.event_name != 'pull_request'
  run: echo "Pushed ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build-push.outputs.digest }}"

clickhouse-config/configmap.yaml

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,11 @@ data:
1414
<clickhouse>
1515
<http_handlers>
1616
<!--
17-
GET /events?composition_id=<namespace>[&limit=50]
17+
GET /events?composition_id=<uid>[&limit=50]
1818
19-
k8sobjects receiver stores events as raw K8s watch JSON in Body:
20-
Body = {"object": {"reason":"...", "message":"...", "type":"Normal|Warning",
21-
"involvedObject": {"name":"...", "namespace":"...",
22-
"kind":"...", "uid":"..."},
23-
"eventTime":"...", ...}, "type":"ADDED|MODIFIED"}
24-
25-
Filter by k8s.namespace.name (set by k8sobjects from event metadata.namespace),
26-
which maps to the Krateo composition namespace.
19+
The compositionresolver OTel processor enriches each K8s event with
20+
LogAttributes['krateo.io/composition-id'] by resolving the involvedObject
21+
via the K8s API. composition_id is the composition resource's UID.
2722
-->
2823
<rule>
2924
<url>/events</url>
@@ -47,7 +42,7 @@ data:
4742
ifNull(JSONExtractString(Body, 'object', 'source', 'component'), '') AS source_component
4843
FROM otel_logs
4944
WHERE ResourceAttributes['telemetry.source'] = 'k8s-events'
50-
AND ResourceAttributes['k8s.namespace.name'] = {composition_id:String}
45+
AND LogAttributes['krateo.io/composition-id'] = {composition_id:String}
5146
AND JSONExtractString(Body, 'object', 'reason') != ''
5247
ORDER BY Timestamp DESC
5348
LIMIT 50

clickhouse-config/http-handlers.xml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
main config (which has no <http_handlers>). Built-in hardcoded handlers
88
(/ping, /replicas_status, /play, etc.) remain available as fallbacks.
99
10-
Endpoint: GET /events?composition_id=<id>[&limit=50]
10+
Endpoint: GET /events?composition_id=<uid>[&limit=50]
1111
Returns: FORMAT JSON rows; transformed into SSEK8sEvent by the RESTAction
1212
JQ filter in restaction.composition-events.yaml.
1313
Mount at: /etc/clickhouse-server/config.d/http-handlers.xml
@@ -21,18 +21,18 @@
2121

2222
<!-- ================================================================
2323
Custom: Krateo K8s events endpoint
24-
GET /events?composition_id=<id>[&limit=50]
25-
{composition_id:String} is read from the URL query parameter.
24+
GET /events?composition_id=<uid>[&limit=50]
25+
{composition_id:String} is the composition resource's UID,
26+
matching the value of label krateo.io/composition-id on managed
27+
resources.
2628
27-
k8sobjects receiver stores events as raw Kubernetes watch JSON in Body:
28-
Body = {"object": {"reason":"...", "message":"...", "type":"Normal|Warning",
29-
"involvedObject": {"name":"...", "namespace":"...",
30-
"kind":"...", "uid":"..."},
31-
"eventTime":"...", ...}, "type":"ADDED|MODIFIED"}
29+
The compositionresolver OTel processor enriches each K8s event
30+
log record with a LogAttribute "krateo.io/composition-id" by
31+
resolving the involvedObject via the K8s API — the same logic
32+
the old EventRouter performed.
3233
33-
We filter by k8s.namespace.name (ResourceAttribute set by the k8sobjects
34-
receiver from the event's metadata.namespace), which maps to the Krateo
35-
composition namespace. Event fields are extracted with JSONExtractString.
34+
Event fields are extracted from the raw K8s watch JSON in Body
35+
using JSONExtractString.
3636
================================================================ -->
3737
<rule>
3838
<url>/events</url>
@@ -56,7 +56,7 @@
5656
ifNull(JSONExtractString(Body, 'object', 'source', 'component'), '') AS source_component
5757
FROM otel_logs
5858
WHERE ResourceAttributes['telemetry.source'] = 'k8s-events'
59-
AND ResourceAttributes['k8s.namespace.name'] = {composition_id:String}
59+
AND LogAttributes['krateo.io/composition-id'] = {composition_id:String}
6060
AND JSONExtractString(Body, 'object', 'reason') != ''
6161
ORDER BY Timestamp DESC
6262
LIMIT 50

otel-collector-custom/Dockerfile

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
FROM golang:1.23-alpine AS builder
2+
3+
RUN apk add --no-cache ca-certificates git
4+
5+
# Install OCB (OpenTelemetry Collector Builder)
6+
RUN --mount=type=cache,target=/root/.cache/go-build \
7+
go install go.opentelemetry.io/collector/cmd/builder@v0.117.0
8+
9+
WORKDIR /build
10+
COPY builder-config.yaml .
11+
COPY compositionresolver/ compositionresolver/
12+
13+
RUN --mount=type=cache,target=/root/.cache/go-build \
14+
--mount=type=cache,target=/go/pkg/mod \
15+
builder --config builder-config.yaml
16+
17+
# ---
18+
FROM gcr.io/distroless/base-debian12:latest
19+
20+
ARG USER_UID=10001
21+
USER ${USER_UID}
22+
23+
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
24+
COPY --chmod=755 --from=builder /build/otelcol-krateo/otelcol-krateo /otelcol-krateo
25+
26+
ENTRYPOINT ["/otelcol-krateo"]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
dist:
2+
name: otelcol-krateo
3+
description: Krateo OTel Collector – contrib + compositionresolver processor
4+
output_path: ./otelcol-krateo
5+
otelcol_version: 0.117.0
6+
7+
receivers:
8+
# K8s events (watch mode) and cluster metrics
9+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver v0.117.0
10+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver v0.117.0
11+
12+
processors:
13+
# Core processors
14+
- gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.117.0
15+
- gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.117.0
16+
# Contrib processors
17+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.117.0
18+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.117.0
19+
# Custom: resolves krateo.io/composition-id from involvedObject labels
20+
- gomod: github.com/braghettos/observability-stack/otel-collector-custom/compositionresolver v0.0.1
21+
path: ./compositionresolver
22+
23+
exporters:
24+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.117.0
25+
- gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.117.0
26+
27+
extensions:
28+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.117.0
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package compositionresolver
2+
3+
import (
	"errors"
	"fmt"
	"time"
)
4+
5+
// Config holds the settings of the compositionresolver processor.
type Config struct {
	// CacheTTL controls how long a resolved composition-id stays cached before
	// re-querying the K8s API. A longer TTL reduces API pressure but delays
	// detection of label changes on the involvedObject.
	CacheTTL time.Duration `mapstructure:"cache_ttl"`

	// NegativeCacheTTL is the TTL for caching "not found" results (resources
	// that exist but have no krateo.io/composition-id label, or that could
	// not be resolved at all). Shorter than CacheTTL so we re-check sooner.
	NegativeCacheTTL time.Duration `mapstructure:"negative_cache_ttl"`

	// LabelKey is the Kubernetes label on the involvedObject that holds the
	// composition identifier (the composition resource's UID).
	LabelKey string `mapstructure:"label_key"`
}

// Validate reports the first invalid setting found in c, or nil when the
// configuration is usable.
//
// Negative TTLs and an empty LabelKey are rejected. Zero TTLs are accepted;
// how a zero TTL is interpreted is left to the processor.
func (c *Config) Validate() error {
	if c.CacheTTL < 0 {
		return fmt.Errorf("cache_ttl must not be negative, got %s", c.CacheTTL)
	}
	if c.NegativeCacheTTL < 0 {
		return fmt.Errorf("negative_cache_ttl must not be negative, got %s", c.NegativeCacheTTL)
	}
	// Without a label key there is nothing to look up on the involvedObject.
	if c.LabelKey == "" {
		return errors.New("label_key must not be empty")
	}
	return nil
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package compositionresolver
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.opentelemetry.io/collector/component"
8+
"go.opentelemetry.io/collector/consumer"
9+
"go.opentelemetry.io/collector/processor"
10+
"go.opentelemetry.io/collector/processor/processorhelper"
11+
)
12+
13+
var processorType = component.MustNewType("compositionresolver")
14+
15+
func NewFactory() processor.Factory {
16+
return processor.NewFactory(
17+
processorType,
18+
createDefaultConfig,
19+
processor.WithLogs(createLogsProcessor, component.StabilityLevelAlpha),
20+
)
21+
}
22+
23+
func createDefaultConfig() component.Config {
24+
return &Config{
25+
CacheTTL: 5 * time.Minute,
26+
NegativeCacheTTL: 30 * time.Second,
27+
LabelKey: "krateo.io/composition-id",
28+
}
29+
}
30+
31+
func createLogsProcessor(
32+
ctx context.Context,
33+
set processor.Settings,
34+
cfg component.Config,
35+
nextConsumer consumer.Logs,
36+
) (processor.Logs, error) {
37+
pCfg := cfg.(*Config)
38+
p := newProcessor(set.Logger, pCfg)
39+
40+
return processorhelper.NewLogs(
41+
ctx, set, cfg, nextConsumer, p.processLogs,
42+
processorhelper.WithStart(p.start),
43+
processorhelper.WithShutdown(p.shutdown),
44+
)
45+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module github.com/braghettos/observability-stack/otel-collector-custom/compositionresolver
2+
3+
go 1.23.0
4+
5+
require (
6+
go.opentelemetry.io/collector/component v0.117.0
7+
go.opentelemetry.io/collector/consumer v1.23.0
8+
go.opentelemetry.io/collector/pdata v1.23.0
9+
go.opentelemetry.io/collector/processor v0.117.0
10+
go.uber.org/zap v1.27.0
11+
k8s.io/apimachinery v0.32.0
12+
k8s.io/client-go v0.32.0
13+
)

0 commit comments

Comments
 (0)