Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/processing/local_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ def _generate_random_event() -> tuple[str, dict]:
return topic, json.loads(event.model_dump_json())


def _format_rate(total: float, elapsed: float) -> str:
# Guard against a zero elapsed window: on coarse-resolution monotonic clocks
# (e.g. Windows) the first 100-event progress tick of a --burst run can land
# within a single clock tick (elapsed == 0.0), which would raise
# ZeroDivisionError in the progress log. (audit_28_06_26.md §5 low)
return f"{total / max(elapsed, 0.001):.0f} evt/s"


def run(events_per_second: int = 10, burst: int = 0) -> None:
"""Run the local pipeline."""
configure_logging()
Expand Down Expand Up @@ -424,7 +432,7 @@ def run(events_per_second: int = 10, burst: int = 0) -> None:
total=total,
valid=valid,
invalid=invalid,
rate=f"{total / elapsed:.0f} evt/s",
rate=_format_rate(total, elapsed),
)

if burst == 0:
Expand All @@ -441,7 +449,7 @@ def run(events_per_second: int = 10, burst: int = 0) -> None:
valid=valid,
invalid=invalid,
duration_s=round(elapsed, 1),
avg_rate=f"{total / max(elapsed, 0.001):.0f} evt/s",
avg_rate=_format_rate(total, elapsed),
)


Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_local_pipeline_rate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""_format_rate guards the --burst progress log against a zero elapsed window.

audit_28_06_26.md §5 (low): on coarse-resolution monotonic clocks the first
100-event progress tick of a burst run can have elapsed == 0.0, which raised
ZeroDivisionError in the inline ``total / elapsed`` rate string.
"""

from __future__ import annotations

from src.processing.local_pipeline import _format_rate


def test_format_rate_handles_zero_elapsed_without_zerodivision() -> None:
# Must not raise even when 100 events report a 0.0s elapsed window.
rate = _format_rate(100, 0.0)
assert rate.endswith("evt/s")


def test_format_rate_computes_events_per_second() -> None:
assert _format_rate(100, 2.0) == "50 evt/s"
Loading