Skip to content

perf: measure event freshness on the real Kafka->Flink path (R4)#122

Merged
brownjuly2003-code merged 2 commits into
mainfrom
perf/flink-realpath-freshness
Jun 30, 2026
Merged

perf: measure event freshness on the real Kafka->Flink path (R4)#122
brownjuly2003-code merged 2 commits into
mainfrom
perf/flink-realpath-freshness

Conversation

@brownjuly2003-code

Copy link
Copy Markdown
Owner

What

Closes the R4 gap — event-driven freshness measured on the real Kafka→Flink path, not just the in-process DuckDB shortcut.

Two bug fixes (the Flink cluster path had never run end-to-end)

  1. docker-compose.yml — Flink bind-host. JM/TM FLINK_PROPERTIES set jobmanager.rpc.address but never bind-host, so the JobManager bound RPC to localhost and a separate TaskManager container got connection refused on :6123 — the cluster could not form. Fix: jobmanager/taskmanager.bind-host: 0.0.0.0 (+ rest.bind-host).
  2. stream_processor.pyStateTtlConfig timedelta. The dedup operator passed StateTtlConfig.new_builder(timedelta(minutes=10)); pyflink calls .to_milliseconds() on the argument, which a timedelta lacks → the Python worker crashed on every event. Fix: Time.minutes(10) (same class as the already-fixed watermark timedeltaDuration). Unit-test pyflink stub + TTL assertion updated.

Measurement

Single-node Mac stack (Flink 2.2.1, 1 TaskManager, Kafka KRaft, MinIO checkpoints):

Metric Real Kafka→Flink In-process shortcut
p50 2.50 s 1.06 s
p95 10.1 s 1.99 s

n=30, 0 misses. 24/30 samples sit in a tight 2.1–2.7 s band; the tail comes from periodic checkpoint/Beam-bundle/GC pauses on the single-node VM. Report + methodology: docs/perf/freshness-realpath-2026-06-30.md; driver: scripts/benchmark_freshness_realpath.py.

These measure complementary segments — the shortcut measures event→serving-metric (DuckDB, deliberately not wired to the real stream), this measures the real Kafka→Flink streaming hop.

Known remaining issue (documented, not fixed)

ValidateAndEnrich routes invalid events with the Java ctx.output() side-output API, which pyflink's ProcessFunction context lacks → the dead-letter path is broken for invalid events. Valid events (this benchmark) are unaffected.

Verification

ruff + mypy clean; tests/unit/test_stream_processor.py 20 passed; driver ran live against the real stack (30 samples, 0 misses).

🤖 Generated with Claude Code

Bring up the real streaming path (Kafka -> Flink stream_processor ->
events.validated) and measure end-to-end freshness, closing the gap
where only the in-process DuckDB-shortcut number existed.

Fix two latent bugs that kept the Flink cluster path from running
end-to-end (the live-cluster smoke had only ever failed earlier on the
unrelated watermark timedelta bug, so these were never reached):

- docker-compose: set jobmanager/taskmanager bind-host to 0.0.0.0 so a
  separate TaskManager container can reach the JobManager RPC. It was
  bound to localhost, so the TM got "connection refused" on :6123 and
  the cluster could not form.
- stream_processor: StateTtlConfig.new_builder needs a pyflink Time, not
  a datetime.timedelta -- pyflink calls .to_milliseconds() on it, which a
  timedelta lacks, so the dedup operator's Python worker crashed on every
  event. Same class as the already-fixed watermark timedelta->Duration.
  Updates the unit test's pyflink stub + TTL assertion accordingly.

Measured on a single-node Mac stack (Flink 2.2.1, 1 TaskManager, Kafka
KRaft, MinIO checkpoints): p50 2.50s / p95 10.1s (n=30, 0 misses) vs the
1.06s/1.99s in-process shortcut -- the real streaming hop adds the Beam
Python portability + Kafka + dedup cost. Report + methodology in
docs/perf/freshness-realpath-2026-06-30.md; driver in
scripts/benchmark_freshness_realpath.py.

Known remaining issue (documented, not fixed here): ValidateAndEnrich
routes invalid events with the Java ctx.output() side-output API, which
pyflink's ProcessFunction context lacks, so the dead-letter path is
broken for invalid events. Valid events (this benchmark) are unaffected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions

Copy link
Copy Markdown

DORA Metrics

  • Window: last 30 days
  • Branch: main
  • Deployment frequency: 133 total / 31.03 per week
  • Lead time for changes: avg 0.32h / median 0.0h
  • Change failure rate: 78.2% (104/133)
  • MTTR: 0.25h across 3 incident(s)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@brownjuly2003-code brownjuly2003-code merged commit 31eb900 into main Jun 30, 2026
21 of 22 checks passed
@brownjuly2003-code brownjuly2003-code deleted the perf/flink-realpath-freshness branch June 30, 2026 14:05
brownjuly2003-code added a commit that referenced this pull request Jun 30, 2026
…IGH + 2 MEDIUM) (#123)

* fix(security): reject WITH RECURSIVE CTE shadowing a tenant table (D1 bypass)

The D1 fix (f153b23) re-scopes a physical table reference shadowed by a
non-recursive CTE of the same name, but a recursive CTE *can* self-
reference, so sqlglot keeps its name in its own body scope: the physical
anchor reference (the first UNION branch, which cannot self-reference) is
mis-classified as a CTE reference and never re-scoped — it stays bound to
the shared `main` schema and leaks every tenant's rows. _scope_sql is the
sole tenant-isolation mechanism (one DuckDB, schema-per-tenant), so this
is a full cross-tenant read from a single valid SELECT.

There is no safe re-scoping of a recursive anchor (genuinely ambiguous
with the recursion) and no legitimate query names a recursive CTE after a
physical table, so fail closed at both layers: validate_nl_sql rejects the
shape (the NL/LLM gate) and _scope_sql raises for any other caller
(defense-in-depth). Non-recursive shadows keep the safe re-scope path.

Regression tests fail on old code (validate accepts; _scope_sql leaks),
pass on new. Verified e2e: the recursive attack through execute_nl_query
now returns 403; the legitimate tenant_a query still sees only its rows.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(security): mask PII renamed through a subquery/CTE (D2 lineage bypass)

The D2 fix (faa6c77) mapped each output column to the source names in the
*outermost* projection only, so a PII column renamed at an inner level —
`SELECT contact FROM (SELECT email AS contact FROM users_enriched) t` (and
the CTE/double-rename variants) — never matched the `email` rule and was
returned as cleartext with no X-PII-Masked signal.

Resolve true projection lineage with sqlglot.lineage, tracing each output
column to its ultimate source columns across subqueries, CTEs and union
branches. Union the deep lineage sources with the shallow columns named
directly in the projection: lineage is blind through an inner `SELECT *`
(no schema to expand it) where the shallow scan still catches a direct
`email AS contact`, and the shallow scan misses the inner rename lineage
catches — either alone leaks one shape, the union closes both. A lineage
failure falls back to a sentinel source set that matches every rule field
(fail closed), so an unresolvable column is masked, never leaked.

Regression test covers subquery, CTE and double-rename renames; the full
masking + property + router suites stay green. SELECT * still falls back to
name-matching (its outputs are the source names verbatim).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(api): serialize lazy table DDL to stop cold-DB concurrent-create 500s

The #120 read-handler offload moved deadletter/webhook-log/alert-history
reads onto worker threads (run_in_threadpool) with a dedicated cursor, but
each still calls its ensure_*_table() — a lazy CREATE TABLE IF NOT EXISTS /
ALTER ... ADD COLUMN IF NOT EXISTS — on that cursor. Pre-#120 these ran
serialized on the event loop; the offload let them run truly in parallel,
and concurrent catalog DDL on one DuckDB raises "Catalog write-write
conflict" (across different tables too, not just the same one — the catalog
is a single versioned structure). The serving store defaults to :memory:,
cold on every restart, so a concurrent burst of cold reads returned HTTP
500s until one request won the CREATE race.

Serialize every ensure_*_table behind one shared process-wide lock
(src/db_concurrency.catalog_ddl_lock): the first thread creates, the rest
see a warm no-op (warm DDL doesn't conflict). The lock is held only for the
brief CREATE/ALTER, never around queries and never nested. Also close the
freshly-opened read cursor in deadletter._read_cursor if the DDL raises,
before the handler's own try/finally takes over (no per-failure leak).

Regression tests fire 32 threads at one ensure_* and 12 each across all
three behind a Barrier: reliably raise without the lock (verified 16-30
conflicts), green with it. A shared-instance test pins the single lock so a
future per-table lock can't reintroduce the cross-table race.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: JuliaEdom <uedomskikh@gmail.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
brownjuly2003-code pushed a commit that referenced this pull request Jun 30, 2026
…tput

ValidateAndEnrich routed invalid events with `ctx.output(DEAD_LETTER_TAG, …)`
— the Java side-output API. PyFlink's ProcessFunction context
(InternalProcessFunctionContext) has no `.output()`, so every invalid event
raised `AttributeError` and the dead-letter path was broken (documented in
docs/perf/freshness-realpath-2026-06-30.md from the #122 real-path run; valid
events were unaffected because they never hit ctx.output).

PyFlink emits side outputs by *yielding* `(OutputTag, value)`; the framework
distinguishes main from side output by the first tuple element's type, so the
main `(event_id, payload)` yield stays unambiguous. All four dead-letter emits
(parse / cdc-normalization / schema / semantic failures) now yield the tag.

Verified end-to-end on the live Flink cluster (pyflink 2.2.1, real
Kafka→Flink→Kafka path on the Mac, job RUNNING): a malformed-JSON event lands
on events.deadletter with stage=parse, a schema-invalid event lands with
stage=schema_validation, and a schema-valid event still flows to
events.validated (main output intact). Before the fix the dead-letter topic
received nothing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
brownjuly2003-code added a commit that referenced this pull request Jun 30, 2026
… R4 follow-up (#124)

* fix(flink): route dead-letter events via yield, not the absent ctx.output

ValidateAndEnrich routed invalid events with `ctx.output(DEAD_LETTER_TAG, …)`
— the Java side-output API. PyFlink's ProcessFunction context
(InternalProcessFunctionContext) has no `.output()`, so every invalid event
raised `AttributeError` and the dead-letter path was broken (documented in
docs/perf/freshness-realpath-2026-06-30.md from the #122 real-path run; valid
events were unaffected because they never hit ctx.output).

PyFlink emits side outputs by *yielding* `(OutputTag, value)`; the framework
distinguishes main from side output by the first tuple element's type, so the
main `(event_id, payload)` yield stays unambiguous. All four dead-letter emits
(parse / cdc-normalization / schema / semantic failures) now yield the tag.

Verified end-to-end on the live Flink cluster (pyflink 2.2.1, real
Kafka→Flink→Kafka path on the Mac, job RUNNING): a malformed-JSON event lands
on events.deadletter with stage=parse, a schema-invalid event lands with
stage=schema_validation, and a schema-valid event still flows to
events.validated (main output intact). Before the fix the dead-letter topic
received nothing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* test(flink): assert dead-letter is yielded, drop the masking ctx.output fake

The stream_processor unit tests routed invalid events through a
`_FakeProcessContext.output()` stub and asserted on `ctx.outputs` — but real
pyflink has no `ctx.output()`, so that fake masked the very AttributeError the
yield fix addresses (the dead-letter path looked tested while it was broken on
the cluster). The fake now omits `output` entirely (a regression to ctx.output
fails loudly), and the three DLQ tests assert the dead-letter is *yielded* as
`(DEAD_LETTER_TAG, payload)`; the two valid-path tests assert no DLQ tuple is
emitted. 20 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: JuliaEdom <uedomskikh@gmail.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants