From 8853674211214c3d198c76f1aa1e539dce3aade9 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Tue, 30 Jun 2026 19:36:58 +0300 Subject: [PATCH 1/2] fix(flink): route dead-letter events via yield, not the absent ctx.output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ValidateAndEnrich routed invalid events with `ctx.output(DEAD_LETTER_TAG, …)` — the Java side-output API. PyFlink's ProcessFunction context (InternalProcessFunctionContext) has no `.output()`, so every invalid event raised `AttributeError` and the dead-letter path was broken (documented in docs/perf/freshness-realpath-2026-06-30.md from the #122 real-path run; valid events were unaffected because they never hit ctx.output). PyFlink emits side outputs by *yielding* `(OutputTag, value)`; the framework distinguishes main from side output by the first tuple element's type, so the main `(event_id, payload)` yield stays unambiguous. All four dead-letter emits (parse / cdc-normalization / schema / semantic failures) now yield the tag. Verified end-to-end on the live Flink cluster (pyflink 2.2.1, real Kafka→Flink→Kafka path on the Mac, job RUNNING): a malformed-JSON event lands on events.deadletter with stage=parse, a schema-invalid event lands with stage=schema_validation, and a schema-valid event still flows to events.validated (main output intact). Before the fix the dead-letter topic received nothing. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/processing/flink_jobs/stream_processor.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/processing/flink_jobs/stream_processor.py b/src/processing/flink_jobs/stream_processor.py index dd8e465..5da5e2b 100644 --- a/src/processing/flink_jobs/stream_processor.py +++ b/src/processing/flink_jobs/stream_processor.py @@ -75,8 +75,17 @@ class ValidateAndEnrich(ProcessFunction): """ def process_element( - self, value: str, ctx: ProcessFunction.Context - ) -> Iterator[tuple[str, str]]: + self, + value: str, + ctx: ProcessFunction.Context, # noqa: ARG002 - required by the ProcessFunction interface + ) -> Iterator[tuple[str | OutputTag, str]]: + # PyFlink routes side outputs by *yielding* ``(OutputTag, value)`` — the + # Java ``ctx.output(tag, value)`` API does not exist on pyflink's + # ProcessFunction context (``InternalProcessFunctionContext`` has no + # ``.output``), so emitting via ctx raised AttributeError and broke the + # dead-letter path. The main output yields ``(event_id, payload)``; the + # framework tells them apart by the first element's type (str vs + # OutputTag), so the tuple main output is unambiguous. (R4 follow-up) from datetime import UTC, datetime from src.processing.transformations.enrichment import ( @@ -91,7 +100,7 @@ def process_element( try: event = json.loads(value) except json.JSONDecodeError as e: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { @@ -112,7 +121,7 @@ def process_element( # available so tenant resolution sees the prefixed topic. event = normalize_debezium_event(event, topic=event.get("topic")) except ValueError as e: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { @@ -132,7 +141,7 @@ def process_element( # 2. Schema validation (Pydantic models) schema_result = validate_event(event) if not schema_result.is_valid: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { @@ -160,7 +169,7 @@ def process_element( if i.severity == "error" ] if error_issues: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { From 9158b23efaf86141180024817ea5d44bf71c93a2 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Tue, 30 Jun 2026 20:33:49 +0300 Subject: [PATCH 2/2] test(flink): assert dead-letter is yielded, drop the masking ctx.output fake MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stream_processor unit tests routed invalid events through a `_FakeProcessContext.output()` stub and asserted on `ctx.outputs` — but real pyflink has no `ctx.output()`, so that fake masked the very AttributeError the yield fix addresses (the dead-letter path looked tested while it was broken on the cluster). The fake now omits `output` entirely (a regression to ctx.output fails loudly), and the three DLQ tests assert the dead-letter is *yielded* as `(DEAD_LETTER_TAG, payload)`; the two valid-path tests assert no DLQ tuple is emitted. 20 passed. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/unit/test_stream_processor.py | 38 ++++++++++++++++++----------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/tests/unit/test_stream_processor.py b/tests/unit/test_stream_processor.py index 8b5c20b..ec32932 100644 --- a/tests/unit/test_stream_processor.py +++ b/tests/unit/test_stream_processor.py @@ -141,11 +141,14 @@ def execute(self, job_name): class _FakeProcessContext: - def __init__(self): - self.outputs = [] + """Stand-in for pyflink's ProcessFunction.Context. - def output(self, tag, value): - self.outputs.append((tag, value)) + Real pyflink has **no** ``ctx.output()`` — that is the Java side-output API + (``InternalProcessFunctionContext`` lacks it). Side outputs are emitted by + *yielding* ``(OutputTag, value)``. The fake deliberately omits ``output`` so + a regression back to ``ctx.output(...)`` fails loudly here, not only on the + live cluster. + """ class _ValidationResult: @@ -522,9 +525,11 @@ def test_process_element_routes_corrupt_json_to_dlq(stream_processor, monkeypatc emitted = list(processor.process_element("{bad json", ctx)) - assert emitted == [] - assert len(ctx.outputs) == 1 - tag, payload = ctx.outputs[0] + # Invalid events are routed to the dead-letter side output by *yielding* + # (DEAD_LETTER_TAG, payload); the framework distinguishes side from main + # output by the first element's type. Nothing else is emitted. + assert len(emitted) == 1 + tag, payload = emitted[0] assert tag is stream_processor.DEAD_LETTER_TAG assert json.loads(payload)["stage"] == "parse" @@ -542,8 +547,10 @@ def test_process_element_routes_schema_errors_to_dlq(stream_processor, monkeypat emitted = list(processor.process_element(json.dumps({"event_type": "order.created"}), ctx)) - assert emitted == [] - assert json.loads(ctx.outputs[0][1]) == { + assert len(emitted) == 1 + tag, payload = emitted[0] + assert tag is stream_processor.DEAD_LETTER_TAG + assert json.loads(payload) == { "event_id": "unknown", "error": [{"type": "missing", "field": "event_id"}], "stage": "schema_validation", @@ -580,7 +587,7 @@ def _validate_event(event): pairs = list(processor.process_element(value, ctx)) emitted = [json.loads(payload) for _, payload in pairs] - assert ctx.outputs == [] + assert all(tag is not stream_processor.DEAD_LETTER_TAG for tag, _ in pairs) assert validated_events[0]["event_type"] == "order.created" assert validated_events[0]["operation"] == "insert" assert emitted[0]["source"] == "postgres_cdc" @@ -608,8 +615,10 @@ def test_process_element_routes_semantic_errors_to_dlq(stream_processor, monkeyp emitted = list(processor.process_element(value, ctx)) - assert emitted == [] - assert json.loads(ctx.outputs[0][1]) == { + assert len(emitted) == 1 + tag, payload = emitted[0] + assert tag is stream_processor.DEAD_LETTER_TAG + assert json.loads(payload) == { "event_id": "evt-1", "error": [ { @@ -644,9 +653,10 @@ def test_process_element_ignores_semantic_warnings(stream_processor, monkeypatch } ) - emitted = [json.loads(payload) for _, payload in processor.process_element(value, ctx)] + pairs = list(processor.process_element(value, ctx)) + emitted = [json.loads(payload) for _, payload in pairs] - assert ctx.outputs == [] + assert all(tag is not stream_processor.DEAD_LETTER_TAG for tag, _ in pairs) assert emitted[0]["enriched_by"] == "payment"