diff --git a/src/processing/flink_jobs/stream_processor.py b/src/processing/flink_jobs/stream_processor.py index dd8e465..5da5e2b 100644 --- a/src/processing/flink_jobs/stream_processor.py +++ b/src/processing/flink_jobs/stream_processor.py @@ -75,8 +75,17 @@ class ValidateAndEnrich(ProcessFunction): """ def process_element( - self, value: str, ctx: ProcessFunction.Context - ) -> Iterator[tuple[str, str]]: + self, + value: str, + ctx: ProcessFunction.Context, # noqa: ARG002 - required by the ProcessFunction interface + ) -> Iterator[tuple[str | OutputTag, str]]: + # PyFlink routes side outputs by *yielding* ``(OutputTag, value)`` — the + # Java ``ctx.output(tag, value)`` API does not exist on pyflink's + # ProcessFunction context (``InternalProcessFunctionContext`` has no + # ``.output``), so emitting via ctx raised AttributeError and broke the + # dead-letter path. The main output yields ``(event_id, payload)``; the + # framework tells them apart by the first element's type (str vs + # OutputTag), so the tuple main output is unambiguous. (R4 follow-up) from datetime import UTC, datetime from src.processing.transformations.enrichment import ( @@ -91,7 +100,7 @@ def process_element( try: event = json.loads(value) except json.JSONDecodeError as e: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { @@ -112,7 +121,7 @@ def process_element( # available so tenant resolution sees the prefixed topic. event = normalize_debezium_event(event, topic=event.get("topic")) except ValueError as e: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { @@ -132,7 +141,7 @@ def process_element( # 2. Schema validation (Pydantic models) schema_result = validate_event(event) if not schema_result.is_valid: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { @@ -160,7 +169,7 @@ def process_element( if i.severity == "error" ] if error_issues: - ctx.output( + yield ( DEAD_LETTER_TAG, json.dumps( { diff --git a/tests/unit/test_stream_processor.py b/tests/unit/test_stream_processor.py index 8b5c20b..ec32932 100644 --- a/tests/unit/test_stream_processor.py +++ b/tests/unit/test_stream_processor.py @@ -141,11 +141,14 @@ def execute(self, job_name): class _FakeProcessContext: - def __init__(self): - self.outputs = [] + """Stand-in for pyflink's ProcessFunction.Context. - def output(self, tag, value): - self.outputs.append((tag, value)) + Real pyflink has **no** ``ctx.output()`` — that is the Java side-output API + (``InternalProcessFunctionContext`` lacks it). Side outputs are emitted by + *yielding* ``(OutputTag, value)``. The fake deliberately omits ``output`` so + a regression back to ``ctx.output(...)`` fails loudly here, not only on the + live cluster. + """ class _ValidationResult: @@ -522,9 +525,11 @@ def test_process_element_routes_corrupt_json_to_dlq(stream_processor, monkeypatc emitted = list(processor.process_element("{bad json", ctx)) - assert emitted == [] - assert len(ctx.outputs) == 1 - tag, payload = ctx.outputs[0] + # Invalid events are routed to the dead-letter side output by *yielding* + # (DEAD_LETTER_TAG, payload); the framework distinguishes side from main + # output by the first element's type. Nothing else is emitted. + assert len(emitted) == 1 + tag, payload = emitted[0] assert tag is stream_processor.DEAD_LETTER_TAG assert json.loads(payload)["stage"] == "parse" @@ -542,8 +547,10 @@ def test_process_element_routes_schema_errors_to_dlq(stream_processor, monkeypat emitted = list(processor.process_element(json.dumps({"event_type": "order.created"}), ctx)) - assert emitted == [] - assert json.loads(ctx.outputs[0][1]) == { + assert len(emitted) == 1 + tag, payload = emitted[0] + assert tag is stream_processor.DEAD_LETTER_TAG + assert json.loads(payload) == { "event_id": "unknown", "error": [{"type": "missing", "field": "event_id"}], "stage": "schema_validation", @@ -580,7 +587,7 @@ def _validate_event(event): pairs = list(processor.process_element(value, ctx)) emitted = [json.loads(payload) for _, payload in pairs] - assert ctx.outputs == [] + assert all(tag is not stream_processor.DEAD_LETTER_TAG for tag, _ in pairs) assert validated_events[0]["event_type"] == "order.created" assert validated_events[0]["operation"] == "insert" assert emitted[0]["source"] == "postgres_cdc" @@ -608,8 +615,10 @@ def test_process_element_routes_semantic_errors_to_dlq(stream_processor, monkeyp emitted = list(processor.process_element(value, ctx)) - assert emitted == [] - assert json.loads(ctx.outputs[0][1]) == { + assert len(emitted) == 1 + tag, payload = emitted[0] + assert tag is stream_processor.DEAD_LETTER_TAG + assert json.loads(payload) == { "event_id": "evt-1", "error": [ { @@ -644,9 +653,10 @@ def test_process_element_ignores_semantic_warnings(stream_processor, monkeypatch } ) - emitted = [json.loads(payload) for _, payload in processor.process_element(value, ctx)] + pairs = list(processor.process_element(value, ctx)) + emitted = [json.loads(payload) for _, payload in pairs] - assert ctx.outputs == [] + assert all(tag is not stream_processor.DEAD_LETTER_TAG for tag, _ in pairs) assert emitted[0]["enriched_by"] == "payment"