diff --git a/docker-compose.yml b/docker-compose.yml index b9cfbcf..12a749e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,6 +54,8 @@ services: environment: FLINK_PROPERTIES: | jobmanager.rpc.address: flink-jobmanager + jobmanager.bind-host: 0.0.0.0 + rest.bind-host: 0.0.0.0 state.backend.type: rocksdb execution.checkpointing.dir: s3://agentflow-lake/checkpoints s3.endpoint: http://minio:9000 @@ -79,6 +81,7 @@ services: environment: FLINK_PROPERTIES: | jobmanager.rpc.address: flink-jobmanager + taskmanager.bind-host: 0.0.0.0 taskmanager.numberOfTaskSlots: 4 state.backend.type: rocksdb s3.endpoint: http://minio:9000 diff --git a/docs/perf/freshness-realpath-2026-06-30.md b/docs/perf/freshness-realpath-2026-06-30.md new file mode 100644 index 0000000..dc0afa2 --- /dev/null +++ b/docs/perf/freshness-realpath-2026-06-30.md @@ -0,0 +1,127 @@ +# Event Freshness on the Real Kafka→Flink Path + +> Generated by `scripts/benchmark_freshness_realpath.py` against the live +> `docker-compose.yml` + `docker-compose.flink.yml` stack on a Mac. +> Machine-readable results: `.artifacts/freshness/realpath-current.json`. + +Measured: `2026-06-30` + +## Why this exists + +`docs/freshness-benchmark.md` measures freshness through the **in-process +DuckDB shortcut** — `src/processing/local_pipeline._process_event` writes the +serving store directly, skipping Kafka and Flink entirely. That number +(1.06 s p50 / 1.99 s p95) is honest about *what it measures*, but it does **not** +exercise the real streaming path. This benchmark closes that gap: it measures +freshness when an event actually traverses the **real broker and the real Flink +operators**. + +## What is measured + +One freshness sample is the wall-clock delay from producing an `order.created` +event to the Kafka source topic `orders.raw`, to the validated event landing on +`events.validated`, matched by `event_id`: + +``` +produce(orders.raw) + → Flink `stream_processor` job + (Kafka source → schema validation → enrichment → dedup by event_id) + → events.validated +``` + +The driver produces the canonical `OrderEvent` model (so the payload passes the +same validation the job applies), then consumes `events.validated` with a +fresh consumer group positioned at the tail, so it only observes the events it +produced. + +## System under test + +- Host: `deproject-mac`, macOS 13.7.8 (Intel i5-7500, 8 GB RAM, 4 cores) +- Docker: Colima (vz, 6 GiB / 4 CPU) +- Flink `2.2.1-java17`, single TaskManager (4 slots), job parallelism 2, + RocksDB state backend, checkpointing 30 s → MinIO (S3) +- Kafka `confluentinc/cp-kafka:7.7.0` (KRaft), `orders.raw` 6 partitions +- Driver Python 3.11.15, `confluent-kafka` to the Kafka HOST listener + (`localhost:19092`) + +## Results (n = 30, 0 misses) + +| Metric | Real Kafka→Flink path | In-process DuckDB shortcut* | +|--------|----------------------:|----------------------------:| +| p50 | **2.50 s** | 1.06 s | +| p95 | 10.11 s | 1.99 s | +| p99 | 15.42 s | — | +| min | 0.31 s | — | +| max | 16.09 s | 2.02 s | +| mean | 3.33 s | — | + +\* `docs/freshness-benchmark.md`, `event_driven` arm. **Not the same segment** — +see "Reading the numbers". + +**Distribution:** 24 of 30 samples fall in a tight **2.14–2.74 s** steady-state +band (the real per-event streaming latency). The tail is driven by **3 outliers** +(5.7 s, 13.8 s, 16.1 s) — periodic Flink checkpoint / Beam-bundle / GC pauses on +the single-node VM — which, at n = 30, dominate p95/p99. A longer run would +tighten the tail percentiles; the p50 is stable. + +## Reading the numbers + +- The two columns measure **complementary segments, not the same thing.** The + DuckDB shortcut measures *event → serving metric* (and is dominated by the 2 s + cache-invalidation poll); this benchmark measures *Kafka produce → validated + event on Kafka* through the real Flink operators. The serving/DuckDB metric + store is **not wired to the real streaming path** (Flink sinks to + `events.validated`; nothing bridges that topic into the serving DuckDB — that + is a deliberate demo shortcut, production target is a ClickHouse sink). So + there is no honest single "event → metric on the real path" number on this + stand; this is the real **streaming-hop** number. +- The real path's ~2.5 s p50 is the same order of magnitude as the headline + freshness claim. The extra ~1.4 s over the shortcut is the real cost of the + streaming hop: Kafka produce + source poll + the **Beam Python portability + overhead** (the UDFs run as external Python workers) + dedup keyed state + + the Kafka sink — none of which the in-process shortcut pays. + +## Bugs found bringing up the real path + +The Flink cluster path had never run end-to-end (the live-cluster smoke had only +ever failed earlier on an unrelated `timedelta` watermark bug). Bringing it up +surfaced three latent pyflink-2.x / Docker issues: + +1. **Flink `bind-host` (fixed, `docker-compose.yml`).** The JobManager/TaskManager + `FLINK_PROPERTIES` set `jobmanager.rpc.address` but never `bind-host`, so the + JM bound RPC to `localhost` and a separate TM container got *connection + refused* on `:6123` — the cluster could not form. Fix: `jobmanager.bind-host` + / `taskmanager.bind-host: 0.0.0.0` (+ `taskmanager.host`). +2. **`StateTtlConfig` timedelta (fixed, `stream_processor.py`).** The dedup + operator passed `StateTtlConfig.new_builder(timedelta(minutes=10))`; pyflink + calls `.to_milliseconds()` on the argument, which a Python `timedelta` lacks + → `AttributeError` crashed the Python worker on every event. Fix: + `Time.minutes(10)` (same class of bug as the already-fixed watermark + `timedelta`→`Duration`). +3. **`ctx.output` side output (found, not yet fixed).** `ValidateAndEnrich` + routes invalid events with `ctx.output(DEAD_LETTER_TAG, …)` — the Java side- + output API; in pyflink the `ProcessFunction` context has no `.output()` + (`AttributeError: 'InternalProcessFunctionContext' object has no attribute + 'output'`). **Valid** events are unaffected (this benchmark measures valid + events), but the dead-letter path is currently broken. Left as a follow-up so + the streaming-freshness measurement isn't blocked on it. + +## Reproduce + +On a Mac with Docker: + +```bash +# 1. bring up Kafka + MinIO + Flink (single TaskManager to fit a 6 GB VM) +docker compose -f docker-compose.yml -f docker-compose.flink.yml \ + up -d --build --scale flink-taskmanager=1 flink-job-runner +# (the bind-host fix in docker-compose.yml lets the JM/TM cluster form) + +# 2. wait for the job to reach RUNNING (http://localhost:8081), then measure +pip install -e ".[load]" +python scripts/benchmark_freshness_realpath.py --bootstrap localhost:19092 \ + --warmup 3 --iterations 30 +``` + +Each iteration: produce one schema-valid `order.created` event to `orders.raw`, +then poll `events.validated` until the matching `event_id` appears; the elapsed +time is one freshness sample. diff --git a/scripts/benchmark_freshness_realpath.py b/scripts/benchmark_freshness_realpath.py new file mode 100644 index 0000000..932d595 --- /dev/null +++ b/scripts/benchmark_freshness_realpath.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +"""Real-path event-to-validation freshness benchmark (Kafka -> Flink -> Kafka). + +Unlike ``scripts/benchmark_freshness.py`` -- which measures the in-process +``local_pipeline._process_event`` DuckDB shortcut -- this driver measures the +**real streaming path**: an ``order.created`` event is produced to the Kafka +source topic ``orders.raw``, processed by the live Flink ``stream_processor`` +job (schema validation -> enrichment -> deduplication), and emitted to +``events.validated``. One freshness sample is the wall-clock delay from produce +to the validated event landing on ``events.validated``, matched by ``event_id``. + +This is the honest "real Kafka->Flink path" number for the freshness headline: +the serving DuckDB metric store is deliberately fed by the in-process shortcut +(production target is a ClickHouse sink), so end-to-end event->metric on the +demo remains the shortcut figure documented in ``docs/freshness-benchmark.md``. +This script measures the streaming hop the shortcut skips, on the real broker +and the real Flink operators. + +Prerequisites: the Flink stack is up via +``docker compose -f docker-compose.yml -f docker-compose.flink.yml`` with the +``stream_processor`` job RUNNING, and this process can reach the Kafka HOST +listener (``localhost:19092`` by default). Run from the repo root so the +editable ``src`` package is importable (reuses the canonical event model): + + python scripts/benchmark_freshness_realpath.py --bootstrap localhost:19092 --iterations 30 +""" + +from __future__ import annotations + +import argparse +import json +import os +import platform +import random +import statistics +import sys +import time +import uuid +from datetime import UTC, datetime +from decimal import Decimal + +from confluent_kafka import Consumer, Producer + +DEFAULT_REPORT = ".artifacts/freshness/realpath-current.json" + + +def build_order_event(amount: Decimal, sequence: int) -> dict: + """A schema- and semantics-valid order.created event (reuses the repo model). + + Importing the canonical ``OrderEvent`` guarantees the produced payload passes + the same validation the Flink job applies, so a valid event lands on + ``events.validated`` rather than the dead-letter topic. + """ + from src.ingestion.schemas.events import ( + Currency, + EventType, + OrderEvent, + OrderItem, + OrderStatus, + ) + + event = OrderEvent( + event_id=str(uuid.uuid4()), + event_type=EventType.ORDER_CREATED, + timestamp=datetime.now(UTC), + source="freshness-realpath-benchmark", + # Schema pattern: ^ORD-\d{8}-\d{4,}$. The 9-prefixed sequence keeps these + # ids clear of any seed generator's 4-digit ids. + order_id=f"ORD-{datetime.now(UTC):%Y%m%d}-9{sequence:05d}", + user_id=f"USR-{random.randint(10000, 99999)}", # noqa: S311 - load shape, not crypto + status=OrderStatus.PENDING, + items=[OrderItem(product_id="PROD-001", quantity=1, unit_price=amount)], + total_amount=amount, + currency=Currency.USD, + ) + return json.loads(event.model_dump_json()) + + +def percentile(values: list[float], q: float) -> float: + if not values: + return float("nan") + ordered = sorted(values) + if len(ordered) == 1: + return ordered[0] + pos = (len(ordered) - 1) * q + lo = int(pos) + hi = min(lo + 1, len(ordered) - 1) + return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo) + + +def make_validated_consumer(bootstrap: str, topic: str) -> Consumer: + """A fresh-group consumer positioned at the END of ``topic``. + + A unique group id + ``auto.offset.reset=latest`` means we never replay the + historical contents of ``events.validated`` -- we only observe events we + produce during the run. The initial poll loop blocks until the assignment is + live, then drains anything already buffered so the first sample starts clean. + """ + consumer = Consumer( + { + "bootstrap.servers": bootstrap, + # Docker maps the Kafka HOST listener on IPv4 only; ``localhost`` + # resolves to ``::1`` first on macOS, so force the IPv4 family. + "broker.address.family": "v4", + "group.id": f"freshness-realpath-{uuid.uuid4()}", + "auto.offset.reset": "latest", + "enable.auto.commit": False, + } + ) + consumer.subscribe([topic]) + deadline = time.time() + 30.0 + while time.time() < deadline and not consumer.assignment(): + consumer.poll(0.5) + # belt-and-suspenders: drain anything already at the tail + while consumer.poll(0.2) is not None: + pass + return consumer + + +def wait_for_validated(consumer: Consumer, event_id: str, timeout_s: float) -> float | None: + """Poll ``events.validated`` until our event_id appears; return the perf_counter.""" + needle = event_id.encode() + deadline = time.perf_counter() + timeout_s + while time.perf_counter() < deadline: + msg = consumer.poll(0.1) + if msg is None or msg.error(): + continue + value = msg.value() + if value and needle in value: + return time.perf_counter() + return None + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--bootstrap", default=os.getenv("KAFKA_BOOTSTRAP", "localhost:19092")) + parser.add_argument("--source-topic", default="orders.raw") + parser.add_argument("--validated-topic", default="events.validated") + parser.add_argument("--iterations", type=int, default=30, help="measured samples") + parser.add_argument("--warmup", type=int, default=3, help="discarded leading samples") + parser.add_argument("--timeout-seconds", type=float, default=30.0) + parser.add_argument("--report-json", default=DEFAULT_REPORT) + args = parser.parse_args() + + producer = Producer( + { + "bootstrap.servers": args.bootstrap, + "broker.address.family": "v4", + "linger.ms": 0, + "acks": "1", + } + ) + consumer = make_validated_consumer(args.bootstrap, args.validated_topic) + if not consumer.assignment(): + print( + "ERROR: consumer never got an assignment for " + f"{args.validated_topic!r} on {args.bootstrap!r}", + file=sys.stderr, + ) + return 2 + + print( + f"bootstrap={args.bootstrap} source={args.source_topic} " + f"validated={args.validated_topic} " + f"warmup={args.warmup} iterations={args.iterations}" + ) + + samples: list[float] = [] + misses = 0 + total = args.warmup + args.iterations + for i in range(total): + is_warmup = i < args.warmup + amount = Decimal(f"{random.randint(100, 999999) / 100:.2f}") + event = build_order_event(amount, i) + event_id = event["event_id"] + payload = json.dumps(event).encode() + + t0 = time.perf_counter() + producer.produce(args.source_topic, value=payload) + producer.flush(5) + reflected = wait_for_validated(consumer, event_id, args.timeout_seconds) + + tag = " (warmup)" if is_warmup else "" + if reflected is None: + misses += 1 + print(f"[{i:3d}] MISS (>{args.timeout_seconds:.0f}s){tag} id={event_id}") + else: + sample_ms = (reflected - t0) * 1000.0 + if not is_warmup: + samples.append(sample_ms) + print(f"[{i:3d}] {sample_ms:9.1f} ms{tag}") + time.sleep(random.uniform(0.2, 0.8)) + + consumer.close() + producer.flush(5) + + if not samples: + print( + "\nNO MEASURED SAMPLES -- every event timed out. Is the Flink job " + "RUNNING and consuming orders.raw? Check http://localhost:8081", + file=sys.stderr, + ) + return 1 + + summary = { + "samples": len(samples), + "misses": misses, + "p50_ms": round(percentile(samples, 0.50), 1), + "p95_ms": round(percentile(samples, 0.95), 1), + "p99_ms": round(percentile(samples, 0.99), 1), + "min_ms": round(min(samples), 1), + "max_ms": round(max(samples), 1), + "mean_ms": round(statistics.mean(samples), 1), + } + report = { + "benchmark": "event-to-validation-freshness-realpath", + "path": ( + "produce(orders.raw) -> Flink stream_processor " + "(validate/enrich/dedup) -> events.validated" + ), + "generated": datetime.now(UTC).isoformat(), + "bootstrap": args.bootstrap, + "source_topic": args.source_topic, + "validated_topic": args.validated_topic, + "timeout_seconds": args.timeout_seconds, + "system": { + "python": platform.python_version(), + "platform": platform.platform(), + }, + "summary": summary, + "samples_ms": [round(s, 1) for s in samples], + } + + os.makedirs(os.path.dirname(args.report_json) or ".", exist_ok=True) + with open(args.report_json, "w", encoding="utf-8") as fh: + json.dump(report, fh, indent=2) + + print("\n=== Real-path freshness (produce -> Flink -> events.validated) ===") + print(f" samples : {summary['samples']} (misses: {summary['misses']})") + print(f" p50 : {summary['p50_ms']:.1f} ms") + print(f" p95 : {summary['p95_ms']:.1f} ms") + print(f" p99 : {summary['p99_ms']:.1f} ms") + print(f" min/max : {summary['min_ms']:.1f} / {summary['max_ms']:.1f} ms") + print(f" mean : {summary['mean_ms']:.1f} ms") + print(f"\nwrote {args.report_json}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/processing/flink_jobs/stream_processor.py b/src/processing/flink_jobs/stream_processor.py index 8d8da62..dd8e465 100644 --- a/src/processing/flink_jobs/stream_processor.py +++ b/src/processing/flink_jobs/stream_processor.py @@ -10,12 +10,11 @@ import json import os from collections.abc import Iterator -from datetime import timedelta from typing import Any from pyflink.common import Types, WatermarkStrategy from pyflink.common.serialization import SimpleStringSchema -from pyflink.common.time import Duration +from pyflink.common.time import Duration, Time from pyflink.common.watermark_strategy import TimestampAssigner from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors.kafka import ( @@ -224,7 +223,7 @@ def open(self, runtime_context: Any) -> None: from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor ttl_config = ( - StateTtlConfig.new_builder(timedelta(minutes=10)) + StateTtlConfig.new_builder(Time.minutes(10)) .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) .build() ) diff --git a/tests/unit/test_stream_processor.py b/tests/unit/test_stream_processor.py index 284b7b0..8b5c20b 100644 --- a/tests/unit/test_stream_processor.py +++ b/tests/unit/test_stream_processor.py @@ -2,7 +2,7 @@ import json import sys import types -from datetime import UTC, datetime, timedelta +from datetime import UTC, datetime import pytest @@ -286,7 +286,16 @@ def __init__(self, millis): def of_seconds(cls, seconds): return cls(seconds * 1000) + class _Time: + def __init__(self, millis): + self.millis = millis + + @classmethod + def minutes(cls, minutes): + return cls(minutes * 60 * 1000) + time_module.Duration = _Duration + time_module.Time = _Time datastream = types.ModuleType("pyflink.datastream") datastream.__path__ = [] @@ -758,7 +767,7 @@ def test_open_initializes_ttl_state_for_deduplication(stream_processor): assert [descriptor.name for descriptor in runtime_context.descriptors] == ["seen"] assert runtime_context.descriptors[0].type_info == stream_processor.Types.BOOLEAN() - assert runtime_context.descriptors[0].ttl_config.ttl == timedelta(minutes=10) + assert runtime_context.descriptors[0].ttl_config.ttl.millis == 10 * 60 * 1000 assert runtime_context.descriptors[0].ttl_config.update_type == "OnCreateAndWrite"