Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ services:
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
jobmanager.bind-host: 0.0.0.0
rest.bind-host: 0.0.0.0
state.backend.type: rocksdb
execution.checkpointing.dir: s3://agentflow-lake/checkpoints
s3.endpoint: http://minio:9000
Expand All @@ -79,6 +81,7 @@ services:
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.bind-host: 0.0.0.0
taskmanager.numberOfTaskSlots: 4
state.backend.type: rocksdb
s3.endpoint: http://minio:9000
Expand Down
127 changes: 127 additions & 0 deletions docs/perf/freshness-realpath-2026-06-30.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Event Freshness on the Real Kafka→Flink Path

> Generated by `scripts/benchmark_freshness_realpath.py` against the live
> `docker-compose.yml` + `docker-compose.flink.yml` stack on a Mac.
> Machine-readable results: `.artifacts/freshness/realpath-current.json`.

Measured: `2026-06-30`

## Why this exists

`docs/freshness-benchmark.md` measures freshness through the **in-process
DuckDB shortcut** — `src/processing/local_pipeline._process_event` writes the
serving store directly, skipping Kafka and Flink entirely. That number
(1.06 s p50 / 1.99 s p95) is honest about *what it measures*, but it does **not**
exercise the real streaming path. This benchmark closes that gap: it measures
freshness when an event actually traverses the **real broker and the real Flink
operators**.

## What is measured

One freshness sample is the wall-clock delay from producing an `order.created`
event to the Kafka source topic `orders.raw`, to the validated event landing on
`events.validated`, matched by `event_id`:

```
produce(orders.raw)
→ Flink `stream_processor` job
(Kafka source → schema validation → enrichment → dedup by event_id)
→ events.validated
```

The driver produces the canonical `OrderEvent` model (so the payload passes the
same validation the job applies), then consumes `events.validated` with a
fresh consumer group positioned at the tail, so it only observes the events it
produced.

## System under test

- Host: `deproject-mac`, macOS 13.7.8 (Intel i5-7500, 8 GB RAM, 4 cores)
- Docker: Colima (vz, 6 GiB / 4 CPU)
- Flink `2.2.1-java17`, single TaskManager (4 slots), job parallelism 2,
RocksDB state backend, checkpointing 30 s → MinIO (S3)
- Kafka `confluentinc/cp-kafka:7.7.0` (KRaft), `orders.raw` 6 partitions
- Driver Python 3.11.15, `confluent-kafka` to the Kafka HOST listener
(`localhost:19092`)

## Results (n = 30, 0 misses)

| Metric | Real Kafka→Flink path | In-process DuckDB shortcut* |
|--------|----------------------:|----------------------------:|
| p50 | **2.50 s** | 1.06 s |
| p95 | 10.11 s | 1.99 s |
| p99 | 15.42 s | — |
| min | 0.31 s | — |
| max | 16.09 s | 2.02 s |
| mean | 3.33 s | — |

\* `docs/freshness-benchmark.md`, `event_driven` arm. **Not the same segment** —
see "Reading the numbers".

**Distribution:** 24 of 30 samples fall in a tight **2.14–2.74 s** steady-state
band (the real per-event streaming latency). The tail is driven by **3 outliers**
(5.7 s, 13.8 s, 16.1 s) — periodic Flink checkpoint / Beam-bundle / GC pauses on
the single-node VM — which, at n = 30, dominate p95/p99. A longer run would
tighten the tail percentiles; the p50 is stable.

## Reading the numbers

- The two columns measure **complementary segments, not the same thing.** The
DuckDB shortcut measures *event → serving metric* (and is dominated by the 2 s
cache-invalidation poll); this benchmark measures *Kafka produce → validated
event on Kafka* through the real Flink operators. The serving/DuckDB metric
store is **not wired to the real streaming path** (Flink sinks to
`events.validated`; nothing bridges that topic into the serving DuckDB — that
is a deliberate demo shortcut, production target is a ClickHouse sink). So
there is no honest single "event → metric on the real path" number on this
stand; this is the real **streaming-hop** number.
- The real path's ~2.5 s p50 is the same order of magnitude as the headline
freshness claim. The extra ~1.4 s over the shortcut is the real cost of the
streaming hop: Kafka produce + source poll + the **Beam Python portability
overhead** (the UDFs run as external Python workers) + dedup keyed state +
the Kafka sink — none of which the in-process shortcut pays.

## Bugs found bringing up the real path

The Flink cluster path had never run end-to-end (the live-cluster smoke had only
ever failed earlier on an unrelated `timedelta` watermark bug). Bringing it up
surfaced three latent pyflink-2.x / Docker issues:

1. **Flink `bind-host` (fixed, `docker-compose.yml`).** The JobManager/TaskManager
`FLINK_PROPERTIES` set `jobmanager.rpc.address` but never `bind-host`, so the
JM bound RPC to `localhost` and a separate TM container got *connection
refused* on `:6123` — the cluster could not form. Fix: `jobmanager.bind-host`
/ `taskmanager.bind-host: 0.0.0.0` (+ `taskmanager.host`).
2. **`StateTtlConfig` timedelta (fixed, `stream_processor.py`).** The dedup
operator passed `StateTtlConfig.new_builder(timedelta(minutes=10))`; pyflink
calls `.to_milliseconds()` on the argument, which a Python `timedelta` lacks
→ `AttributeError` crashed the Python worker on every event. Fix:
`Time.minutes(10)` (same class of bug as the already-fixed watermark
`timedelta`→`Duration`).
3. **`ctx.output` side output (found, not yet fixed).** `ValidateAndEnrich`
routes invalid events with `ctx.output(DEAD_LETTER_TAG, …)` — the Java side-
output API; in pyflink the `ProcessFunction` context has no `.output()`
(`AttributeError: 'InternalProcessFunctionContext' object has no attribute
'output'`). **Valid** events are unaffected (this benchmark measures valid
events), but the dead-letter path is currently broken. Left as a follow-up so
the streaming-freshness measurement isn't blocked on it.

## Reproduce

On a Mac with Docker:

```bash
# 1. bring up Kafka + MinIO + Flink (single TaskManager to fit a 6 GB VM)
docker compose -f docker-compose.yml -f docker-compose.flink.yml \
up -d --build --scale flink-taskmanager=1 flink-job-runner
# (the bind-host fix in docker-compose.yml lets the JM/TM cluster form)

# 2. wait for the job to reach RUNNING (http://localhost:8081), then measure
pip install -e ".[load]"
python scripts/benchmark_freshness_realpath.py --bootstrap localhost:19092 \
--warmup 3 --iterations 30
```

Each iteration: produce one schema-valid `order.created` event to `orders.raw`,
then poll `events.validated` until the matching `event_id` appears; the elapsed
time is one freshness sample.
250 changes: 250 additions & 0 deletions scripts/benchmark_freshness_realpath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
#!/usr/bin/env python3
"""Real-path event-to-validation freshness benchmark (Kafka -> Flink -> Kafka).

Unlike ``scripts/benchmark_freshness.py`` -- which measures the in-process
``local_pipeline._process_event`` DuckDB shortcut -- this driver measures the
**real streaming path**: an ``order.created`` event is produced to the Kafka
source topic ``orders.raw``, processed by the live Flink ``stream_processor``
job (schema validation -> enrichment -> deduplication), and emitted to
``events.validated``. One freshness sample is the wall-clock delay from produce
to the validated event landing on ``events.validated``, matched by ``event_id``.

This is the honest "real Kafka->Flink path" number for the freshness headline:
the serving DuckDB metric store is deliberately fed by the in-process shortcut
(production target is a ClickHouse sink), so end-to-end event->metric on the
demo remains the shortcut figure documented in ``docs/freshness-benchmark.md``.
This script measures the streaming hop the shortcut skips, on the real broker
and the real Flink operators.

Prerequisites: the Flink stack is up via
``docker compose -f docker-compose.yml -f docker-compose.flink.yml`` with the
``stream_processor`` job RUNNING, and this process can reach the Kafka HOST
listener (``localhost:19092`` by default). Run from the repo root so the
editable ``src`` package is importable (reuses the canonical event model):

python scripts/benchmark_freshness_realpath.py --bootstrap localhost:19092 --iterations 30
"""

from __future__ import annotations

import argparse
import json
import os
import platform
import random
import statistics
import sys
import time
import uuid
from datetime import UTC, datetime
from decimal import Decimal

from confluent_kafka import Consumer, Producer

DEFAULT_REPORT = ".artifacts/freshness/realpath-current.json"


def build_order_event(amount: Decimal, sequence: int) -> dict:
"""A schema- and semantics-valid order.created event (reuses the repo model).

Importing the canonical ``OrderEvent`` guarantees the produced payload passes
the same validation the Flink job applies, so a valid event lands on
``events.validated`` rather than the dead-letter topic.
"""
from src.ingestion.schemas.events import (
Currency,
EventType,
OrderEvent,
OrderItem,
OrderStatus,
)

event = OrderEvent(
event_id=str(uuid.uuid4()),
event_type=EventType.ORDER_CREATED,
timestamp=datetime.now(UTC),
source="freshness-realpath-benchmark",
# Schema pattern: ^ORD-\d{8}-\d{4,}$. The 9-prefixed sequence keeps these
# ids clear of any seed generator's 4-digit ids.
order_id=f"ORD-{datetime.now(UTC):%Y%m%d}-9{sequence:05d}",
user_id=f"USR-{random.randint(10000, 99999)}", # noqa: S311 - load shape, not crypto
status=OrderStatus.PENDING,
items=[OrderItem(product_id="PROD-001", quantity=1, unit_price=amount)],
total_amount=amount,
currency=Currency.USD,
)
return json.loads(event.model_dump_json())


def percentile(values: list[float], q: float) -> float:
if not values:
return float("nan")
ordered = sorted(values)
if len(ordered) == 1:
return ordered[0]
pos = (len(ordered) - 1) * q
lo = int(pos)
hi = min(lo + 1, len(ordered) - 1)
return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def make_validated_consumer(bootstrap: str, topic: str) -> Consumer:
"""A fresh-group consumer positioned at the END of ``topic``.

A unique group id + ``auto.offset.reset=latest`` means we never replay the
historical contents of ``events.validated`` -- we only observe events we
produce during the run. The initial poll loop blocks until the assignment is
live, then drains anything already buffered so the first sample starts clean.
"""
consumer = Consumer(
{
"bootstrap.servers": bootstrap,
# Docker maps the Kafka HOST listener on IPv4 only; ``localhost``
# resolves to ``::1`` first on macOS, so force the IPv4 family.
"broker.address.family": "v4",
"group.id": f"freshness-realpath-{uuid.uuid4()}",
"auto.offset.reset": "latest",
"enable.auto.commit": False,
}
)
consumer.subscribe([topic])
deadline = time.time() + 30.0
while time.time() < deadline and not consumer.assignment():
consumer.poll(0.5)
# belt-and-suspenders: drain anything already at the tail
while consumer.poll(0.2) is not None:
pass
return consumer


def wait_for_validated(consumer: Consumer, event_id: str, timeout_s: float) -> float | None:
"""Poll ``events.validated`` until our event_id appears; return the perf_counter."""
needle = event_id.encode()
deadline = time.perf_counter() + timeout_s
while time.perf_counter() < deadline:
msg = consumer.poll(0.1)
if msg is None or msg.error():
continue
value = msg.value()
if value and needle in value:
return time.perf_counter()
return None


def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--bootstrap", default=os.getenv("KAFKA_BOOTSTRAP", "localhost:19092"))
parser.add_argument("--source-topic", default="orders.raw")
parser.add_argument("--validated-topic", default="events.validated")
parser.add_argument("--iterations", type=int, default=30, help="measured samples")
parser.add_argument("--warmup", type=int, default=3, help="discarded leading samples")
parser.add_argument("--timeout-seconds", type=float, default=30.0)
parser.add_argument("--report-json", default=DEFAULT_REPORT)
args = parser.parse_args()

producer = Producer(
{
"bootstrap.servers": args.bootstrap,
"broker.address.family": "v4",
"linger.ms": 0,
"acks": "1",
}
)
consumer = make_validated_consumer(args.bootstrap, args.validated_topic)
if not consumer.assignment():
print(
"ERROR: consumer never got an assignment for "
f"{args.validated_topic!r} on {args.bootstrap!r}",
file=sys.stderr,
)
return 2

print(
f"bootstrap={args.bootstrap} source={args.source_topic} "
f"validated={args.validated_topic} "
f"warmup={args.warmup} iterations={args.iterations}"
)

samples: list[float] = []
misses = 0
total = args.warmup + args.iterations
for i in range(total):
is_warmup = i < args.warmup
amount = Decimal(f"{random.randint(100, 999999) / 100:.2f}")
event = build_order_event(amount, i)
event_id = event["event_id"]
payload = json.dumps(event).encode()

t0 = time.perf_counter()
producer.produce(args.source_topic, value=payload)
producer.flush(5)
reflected = wait_for_validated(consumer, event_id, args.timeout_seconds)

tag = " (warmup)" if is_warmup else ""
if reflected is None:
misses += 1
print(f"[{i:3d}] MISS (>{args.timeout_seconds:.0f}s){tag} id={event_id}")
else:
sample_ms = (reflected - t0) * 1000.0
if not is_warmup:
samples.append(sample_ms)
print(f"[{i:3d}] {sample_ms:9.1f} ms{tag}")
time.sleep(random.uniform(0.2, 0.8))

consumer.close()
producer.flush(5)

if not samples:
print(
"\nNO MEASURED SAMPLES -- every event timed out. Is the Flink job "
"RUNNING and consuming orders.raw? Check http://localhost:8081",
file=sys.stderr,
)
return 1

summary = {
"samples": len(samples),
"misses": misses,
"p50_ms": round(percentile(samples, 0.50), 1),
"p95_ms": round(percentile(samples, 0.95), 1),
"p99_ms": round(percentile(samples, 0.99), 1),
"min_ms": round(min(samples), 1),
"max_ms": round(max(samples), 1),
"mean_ms": round(statistics.mean(samples), 1),
}
report = {
"benchmark": "event-to-validation-freshness-realpath",
"path": (
"produce(orders.raw) -> Flink stream_processor "
"(validate/enrich/dedup) -> events.validated"
),
"generated": datetime.now(UTC).isoformat(),
"bootstrap": args.bootstrap,
"source_topic": args.source_topic,
"validated_topic": args.validated_topic,
"timeout_seconds": args.timeout_seconds,
"system": {
"python": platform.python_version(),
"platform": platform.platform(),
},
"summary": summary,
"samples_ms": [round(s, 1) for s in samples],
}

os.makedirs(os.path.dirname(args.report_json) or ".", exist_ok=True)
with open(args.report_json, "w", encoding="utf-8") as fh:
json.dump(report, fh, indent=2)

print("\n=== Real-path freshness (produce -> Flink -> events.validated) ===")
print(f" samples : {summary['samples']} (misses: {summary['misses']})")
print(f" p50 : {summary['p50_ms']:.1f} ms")
print(f" p95 : {summary['p95_ms']:.1f} ms")
print(f" p99 : {summary['p99_ms']:.1f} ms")
print(f" min/max : {summary['min_ms']:.1f} / {summary['max_ms']:.1f} ms")
print(f" mean : {summary['mean_ms']:.1f} ms")
print(f"\nwrote {args.report_json}")
return 0


if __name__ == "__main__":
raise SystemExit(main())
Loading
Loading