Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/processing/flink_jobs/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,17 @@ class ValidateAndEnrich(ProcessFunction):
"""

def process_element(
self, value: str, ctx: ProcessFunction.Context
) -> Iterator[tuple[str, str]]:
self,
value: str,
ctx: ProcessFunction.Context, # noqa: ARG002 - required by the ProcessFunction interface
) -> Iterator[tuple[str | OutputTag, str]]:
# PyFlink routes side outputs by *yielding* ``(OutputTag, value)`` — the
# Java ``ctx.output(tag, value)`` API does not exist on PyFlink's
# ProcessFunction context (``InternalProcessFunctionContext`` has no
# ``.output``), so emitting via ctx raised AttributeError and broke the
# dead-letter path. The main output yields ``(event_id, payload)``; the
# framework tells the two apart by the type of the first element (str vs
# OutputTag), so a main-output tuple is never mistaken for a side output.
# (R4 follow-up)
from datetime import UTC, datetime

from src.processing.transformations.enrichment import (
Expand All @@ -91,7 +100,7 @@ def process_element(
try:
event = json.loads(value)
except json.JSONDecodeError as e:
ctx.output(
yield (
DEAD_LETTER_TAG,
json.dumps(
{
Expand All @@ -112,7 +121,7 @@ def process_element(
# available so tenant resolution sees the prefixed topic.
event = normalize_debezium_event(event, topic=event.get("topic"))
except ValueError as e:
ctx.output(
yield (
DEAD_LETTER_TAG,
json.dumps(
{
Expand All @@ -132,7 +141,7 @@ def process_element(
# 2. Schema validation (Pydantic models)
schema_result = validate_event(event)
if not schema_result.is_valid:
ctx.output(
yield (
DEAD_LETTER_TAG,
json.dumps(
{
Expand Down Expand Up @@ -160,7 +169,7 @@ def process_element(
if i.severity == "error"
]
if error_issues:
ctx.output(
yield (
DEAD_LETTER_TAG,
json.dumps(
{
Expand Down
38 changes: 24 additions & 14 deletions tests/unit/test_stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,14 @@ def execute(self, job_name):


class _FakeProcessContext:
def __init__(self):
self.outputs = []
"""Stand-in for pyflink's ProcessFunction.Context.

def output(self, tag, value):
self.outputs.append((tag, value))
Real pyflink has **no** ``ctx.output()`` — that is the Java side-output API
(``InternalProcessFunctionContext`` lacks it). Side outputs are emitted by
*yielding* ``(OutputTag, value)``. The fake deliberately omits ``output`` so
a regression back to ``ctx.output(...)`` fails loudly here, not only on the
live cluster.
"""


class _ValidationResult:
Expand Down Expand Up @@ -522,9 +525,11 @@ def test_process_element_routes_corrupt_json_to_dlq(stream_processor, monkeypatc

emitted = list(processor.process_element("{bad json", ctx))

assert emitted == []
assert len(ctx.outputs) == 1
tag, payload = ctx.outputs[0]
# Invalid events are routed to the dead-letter side output by *yielding*
# (DEAD_LETTER_TAG, payload); the framework distinguishes side from main
# output by the first element's type. Nothing else is emitted.
assert len(emitted) == 1
tag, payload = emitted[0]
assert tag is stream_processor.DEAD_LETTER_TAG
assert json.loads(payload)["stage"] == "parse"

Expand All @@ -542,8 +547,10 @@ def test_process_element_routes_schema_errors_to_dlq(stream_processor, monkeypat

emitted = list(processor.process_element(json.dumps({"event_type": "order.created"}), ctx))

assert emitted == []
assert json.loads(ctx.outputs[0][1]) == {
assert len(emitted) == 1
tag, payload = emitted[0]
assert tag is stream_processor.DEAD_LETTER_TAG
assert json.loads(payload) == {
"event_id": "unknown",
"error": [{"type": "missing", "field": "event_id"}],
"stage": "schema_validation",
Expand Down Expand Up @@ -580,7 +587,7 @@ def _validate_event(event):
pairs = list(processor.process_element(value, ctx))
emitted = [json.loads(payload) for _, payload in pairs]

assert ctx.outputs == []
assert all(tag is not stream_processor.DEAD_LETTER_TAG for tag, _ in pairs)
assert validated_events[0]["event_type"] == "order.created"
assert validated_events[0]["operation"] == "insert"
assert emitted[0]["source"] == "postgres_cdc"
Expand Down Expand Up @@ -608,8 +615,10 @@ def test_process_element_routes_semantic_errors_to_dlq(stream_processor, monkeyp

emitted = list(processor.process_element(value, ctx))

assert emitted == []
assert json.loads(ctx.outputs[0][1]) == {
assert len(emitted) == 1
tag, payload = emitted[0]
assert tag is stream_processor.DEAD_LETTER_TAG
assert json.loads(payload) == {
"event_id": "evt-1",
"error": [
{
Expand Down Expand Up @@ -644,9 +653,10 @@ def test_process_element_ignores_semantic_warnings(stream_processor, monkeypatch
}
)

emitted = [json.loads(payload) for _, payload in processor.process_element(value, ctx)]
pairs = list(processor.process_element(value, ctx))
emitted = [json.loads(payload) for _, payload in pairs]

assert ctx.outputs == []
assert all(tag is not stream_processor.DEAD_LETTER_TAG for tag, _ in pairs)
assert emitted[0]["enriched_by"] == "payment"


Expand Down