fix(flink): route dead-letter via yield (PyFlink has no ctx.output) — R4 follow-up#124

Merged: brownjuly2003-code merged 3 commits into main from fix/flink-deadletter-side-output on Jun 30, 2026

@brownjuly2003-code

Problem

ValidateAndEnrich.process_element routed invalid events with
ctx.output(DEAD_LETTER_TAG, …) — the Java side-output API. PyFlink's
ProcessFunction context (InternalProcessFunctionContext) has no .output(),
so every invalid event raised AttributeError and the dead-letter path was
silently broken. Valid events were unaffected (they never hit ctx.output),
which is why it went unnoticed until the #122 real-path run surfaced it
(docs/perf/freshness-realpath-2026-06-30.md, "Bugs found", item 3).

Fix

PyFlink emits side outputs by yielding (OutputTag, value). The framework
tells main from side output by the first tuple element's type, so the main
(event_id, payload) yield stays unambiguous. All four dead-letter emits
(parse / cdc-normalization / schema / semantic failures) now yield the tag.
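
A minimal sketch of the yield-based routing described above, not the code from this PR. It runs without a Flink cluster, so `OutputTag` is a stand-in for `pyflink.datastream.OutputTag`, and `process_element`, `split_outputs`, and the dead-letter payload fields are hypothetical simplifications. Only the `parse` and `schema_validation` stage names come from the description.

```python
import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, List, Tuple


@dataclass(frozen=True)
class OutputTag:
    """Stand-in for pyflink.datastream.OutputTag (assumption, for local runs)."""
    tag_id: str


DEAD_LETTER_TAG = OutputTag("dead-letter")


def process_element(raw: str) -> Iterator[Tuple[Any, Any]]:
    """Yield (event_id, payload) for valid events, (tag, payload) otherwise."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Side output: the first tuple element is the OutputTag.
        yield DEAD_LETTER_TAG, {"stage": "parse", "error": str(exc), "raw": raw}
        return
    if not isinstance(event, dict) or "event_id" not in event:
        yield DEAD_LETTER_TAG, {"stage": "schema_validation", "raw": raw}
        return
    # Main output: the first tuple element is a plain event id, not a tag.
    yield event["event_id"], event


def split_outputs(results: Iterable[Tuple[Any, Any]]) -> Tuple[List, List]:
    """Mimic the framework's rule: route by the type of the first element."""
    main, side = [], []
    for first, second in results:
        (side if isinstance(first, OutputTag) else main).append((first, second))
    return main, side
```

On a real job the side stream would be read from the stream returned by `.process(...)` via `get_side_output(DEAD_LETTER_TAG)`; the local `split_outputs` helper only illustrates why the main `(event_id, payload)` tuple cannot be mistaken for a side output.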

Verification — end-to-end on the live Flink cluster

Built and ran the real stack (docker-compose.yml + docker-compose.flink.yml,
PyFlink 2.2.1, Kafka→Flink→Kafka) on the Mac. Once the job reached RUNNING,
three events were produced to orders.raw:

| Event | Routed to | Result |
|---|---|---|
| schema-valid order.created | events.validated | ✅ PASS (main output intact) |
| malformed JSON | events.deadletter (stage=parse) | ✅ PASS |
| valid JSON, schema-invalid | events.deadletter (stage=schema_validation) | ✅ PASS |

Before the fix the dead-letter topic received nothing. (flink-smoke CI also
exercises a live job submission.)

This closes the documented R4 follow-up; road-to-9.8.md R4 is otherwise ✅.

🤖 Generated with Claude Code

…tput

ValidateAndEnrich routed invalid events with `ctx.output(DEAD_LETTER_TAG, …)`
— the Java side-output API. PyFlink's ProcessFunction context
(InternalProcessFunctionContext) has no `.output()`, so every invalid event
raised `AttributeError` and the dead-letter path was broken (documented in
docs/perf/freshness-realpath-2026-06-30.md from the #122 real-path run; valid
events were unaffected because they never hit ctx.output).

PyFlink emits side outputs by *yielding* `(OutputTag, value)`; the framework
distinguishes main from side output by the first tuple element's type, so the
main `(event_id, payload)` yield stays unambiguous. All four dead-letter emits
(parse / cdc-normalization / schema / semantic failures) now yield the tag.

Verified end-to-end on the live Flink cluster (pyflink 2.2.1, real
Kafka→Flink→Kafka path on the Mac, job RUNNING): a malformed-JSON event lands
on events.deadletter with stage=parse, a schema-invalid event lands with
stage=schema_validation, and a schema-valid event still flows to
events.validated (main output intact). Before the fix the dead-letter topic
received nothing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions

github-actions Bot commented Jun 30, 2026

DORA Metrics

  • Window: last 30 days
  • Branch: main
  • Deployment frequency: 135 total / 31.5 per week
  • Lead time for changes: avg 0.31h / median 0.0h
  • Change failure rate: 79.26% (107/135)
  • MTTR: 0.25h across 3 incident(s)
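
The two derived figures in the list above can be reproduced from the raw counts. This is a small sketch that assumes the per-week rate is the total scaled from the 30-day window to 7 days; the variable names are illustrative and not taken from the bot's implementation.

```python
WINDOW_DAYS = 30          # "last 30 days"
deployments = 135         # total deployments in the window
failed_deployments = 107  # deployments counted as failures

# Scale the 30-day total to a 7-day rate.
per_week = deployments * 7 / WINDOW_DAYS

# Change failure rate as a percentage of all deployments.
failure_rate_pct = failed_deployments / deployments * 100

print(f"{per_week:.1f} per week, {failure_rate_pct:.2f}% change failure rate")
```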

JuliaEdom and others added 2 commits June 30, 2026 20:33
…ut fake

The stream_processor unit tests routed invalid events through a
`_FakeProcessContext.output()` stub and asserted on `ctx.outputs` — but real
pyflink has no `ctx.output()`, so that fake masked the very AttributeError the
yield fix addresses (the dead-letter path looked tested while it was broken on
the cluster). The fake now omits `output` entirely (a regression to ctx.output
fails loudly), and the three DLQ tests assert the dead-letter is *yielded* as
`(DEAD_LETTER_TAG, payload)`; the two valid-path tests assert no DLQ tuple is
emitted. 20 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
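
A hedged sketch of the testing idea in this commit message: a fake context with no `output` attribute makes any regression to `ctx.output(...)` fail loudly. `FakeProcessContext`, `process_fixed`, `process_regressed`, and `dead_letters` are hypothetical names for illustration, and `OutputTag` is a local stand-in for the PyFlink class; the real tests and fake in the repository may look different.

```python
class OutputTag:
    """Stand-in for pyflink.datastream.OutputTag (assumption, for local runs)."""

    def __init__(self, tag_id):
        self.tag_id = tag_id


DEAD_LETTER_TAG = OutputTag("dead-letter")


class FakeProcessContext:
    """Test double that deliberately defines no output() method."""

    def timestamp(self):
        return None


def process_fixed(value, ctx):
    """Fixed behavior: invalid events are yielded with the dead-letter tag."""
    if not value.get("event_id"):
        yield DEAD_LETTER_TAG, value
        return
    yield value["event_id"], value


def process_regressed(value, ctx):
    """Old behavior: calls the Java-style ctx.output(), which the fake lacks."""
    if not value.get("event_id"):
        ctx.output(DEAD_LETTER_TAG, value)  # AttributeError on the fake
        return
    yield value["event_id"], value


def dead_letters(results):
    """Return the payloads that were yielded with the dead-letter tag."""
    return [payload for tag, payload in results if tag is DEAD_LETTER_TAG]
```

Because both functions are generators, the `AttributeError` from the regressed version surfaces only when the result is iterated, so a test has to consume the generator (for example with `list(...)`) to see it.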
@brownjuly2003-code brownjuly2003-code enabled auto-merge (squash) June 30, 2026 17:43
@brownjuly2003-code brownjuly2003-code merged commit 2dda2e4 into main Jun 30, 2026
23 checks passed
@brownjuly2003-code brownjuly2003-code deleted the fix/flink-deadletter-side-output branch June 30, 2026 17:49
brownjuly2003-code added a commit that referenced this pull request Jun 30, 2026
…eak + 1 LOW DDL race) (#125)

* fix(security): mask PII renamed above an inner SELECT * (D2 #123 residual)

The #123 D2 fix resolves projection lineage so a PII column renamed through a
subquery/CTE is masked by what it is built from. But when an inner `SELECT *`
sits *below* the rename — e.g.
`SELECT c FROM (SELECT email AS c FROM (SELECT * FROM users_enriched) z) t` —
sqlglot.lineage walks past the renamed `email` node to the bare `*` leaf and
returns a plain `frozenset({'*'})`. That is NOT the `_UnresolvedSources`
sentinel (which only fires on a lineage *exception*), so `email` is absent from
the source set and the column fails **open** as cleartext with no X-PII-Masked
signal; the shallow scan sees only the outer alias `c`. The #123 deep|shallow
union only closes a star one level *above* the rename.

A `*` lineage leaf means the column could carry any source column of that
table, including PII, so treat it as unresolved and fail closed (mask) — the
same policy the module already applies on a lineage exception.

The regression test (subquery and CTE forms) fails on the old code, which
returned cleartext with masked=False, and passes on the new code; the existing
masking, property, and mutation suites stay green. The behavior was
independently reproduced against live DuckDB before and after the change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
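
The fail-closed policy in this commit message can be sketched as a pure function over an already-resolved source set. This is an illustration only: `PII_COLUMNS` and `should_mask` are hypothetical names, and the lineage resolution itself (done with sqlglot in the real module) is left out.

```python
from typing import FrozenSet

# Hypothetical PII column set; the real module's list is not shown in this PR.
PII_COLUMNS: FrozenSet[str] = frozenset({"email"})


def should_mask(sources: FrozenSet[str]) -> bool:
    """Decide whether an output column must be masked, given its sources.

    A '*' leaf means the column could be built from any column of the
    table, including PII, so it is treated as unresolved and masked.
    """
    if "*" in sources:
        return True  # fail closed
    return bool(sources & PII_COLUMNS)
```

With this rule, the source set `frozenset({'*'})` returned for the nested `SELECT *` case is masked instead of passing through as cleartext.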

* fix(api): lock the webhook delivery-queue lazy DDL (#123 residual)

The #123 lock fix serialized the three offloaded read-handler `ensure_*_table`
helpers behind `catalog_ddl_lock`, but missed
`ensure_webhook_delivery_queue_table`: the dispatcher runs its lazy
`CREATE TABLE IF NOT EXISTS webhook_delivery_queue` on the shared serving
connection from the event loop, while an
offloaded read handler runs its own (now-locked) `ensure_*` on a worker thread.
Concurrent catalog DDL on one cold DuckDB raises a "Catalog write-write
conflict" across *different* tables too, so the cross-table 500-on-cold-restart
the #123 fix set out to remove was still reachable through this unlocked site.

Wrap its CREATE in the same shared `catalog_ddl_lock` as its three siblings
(internal-wrap pattern; the two callers do not hold the lock, so no nesting).

Regression: add `ensure_webhook_delivery_queue_table` to the concurrency
harness's ensurer set so both the 32-thread same-table and the cross-table
Barrier hammers cover it — both fail on old code (verified: 23+ and 11+
conflicts on the queue table) and pass with the lock.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
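
A minimal sketch of the internal-wrap lock pattern and a Barrier-based hammer, under stated assumptions: `FakeConnection` stands in for the shared DuckDB connection and only records overlapping DDL calls, `ensure_other_table` and the column definitions are hypothetical, and the real harness in the repository is not reproduced here.

```python
import threading
import time

# One shared lock for every lazy catalog DDL helper.
catalog_ddl_lock = threading.Lock()


class FakeConnection:
    """Stand-in for the shared connection; tracks overlapping DDL calls."""

    def __init__(self):
        self._guard = threading.Lock()
        self._active = 0
        self.max_overlap = 0
        self.statements = []

    def execute(self, sql):
        with self._guard:
            self._active += 1
            self.max_overlap = max(self.max_overlap, self._active)
            self.statements.append(sql)
        time.sleep(0.001)  # widen the window in which overlap could occur
        with self._guard:
            self._active -= 1


def ensure_webhook_delivery_queue_table(conn):
    # Internal wrap: the helper takes the lock itself, so callers must not
    # already hold it (threading.Lock is not reentrant).
    with catalog_ddl_lock:
        conn.execute("CREATE TABLE IF NOT EXISTS webhook_delivery_queue (id BIGINT)")


def ensure_other_table(conn):
    with catalog_ddl_lock:
        conn.execute("CREATE TABLE IF NOT EXISTS other_table (id BIGINT)")


def hammer(conn, ensurers, threads_per_ensurer=8):
    """Release all threads at once so the ensurers race on a cold catalog."""
    barrier = threading.Barrier(len(ensurers) * threads_per_ensurer)

    def worker(fn):
        barrier.wait()
        fn(conn)

    threads = [
        threading.Thread(target=worker, args=(fn,))
        for fn in ensurers
        for _ in range(threads_per_ensurer)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

If either helper skipped the lock, `max_overlap` would be free to exceed 1 under the hammer, which is the kind of cross-table overlap the fix removes.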

---------

Co-authored-by: JuliaEdom <uedomskikh@gmail.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>