From b0e4dab1e6721e1493da550cbb64fd977dfae918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Strausmann?= Date: Mon, 11 May 2026 08:34:42 +0000 Subject: [PATCH 1/3] feat(printer-models): Job lifecycle FSM with explicit state machine and terminal-event signalling Implements JobState (StrEnum), Job (dataclass), JobStateMachine, and InvalidStateTransitionError covering all valid transitions per the Brother raster spec (no mid-print cancel/pause). Terminal states signal asyncio.Event _done_event for the upcoming wait_for_job() in PR B (PrintQueue). Co-Authored-By: Claude Sonnet 4.6 --- backend/app/services/job_lifecycle.py | 123 ++++++++++++++++++ .../tests/unit/services/test_job_lifecycle.py | 81 ++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 backend/app/services/job_lifecycle.py create mode 100644 backend/tests/unit/services/test_job_lifecycle.py diff --git a/backend/app/services/job_lifecycle.py b/backend/app/services/job_lifecycle.py new file mode 100644 index 0000000..353301a --- /dev/null +++ b/backend/app/services/job_lifecycle.py @@ -0,0 +1,123 @@ +"""Job-lifecycle finite-state machine for the print queue. + +Brother PT/QL printers have no multi-job queue (TCP/9100 is a single stream). +The hub serialises jobs per printer and tracks state explicitly. This module +owns the state model only; the worker that drives transitions lives in +`app.services.print_queue`. + +State diagram: + + QUEUED ──> PRINTING ──> COMPLETED + │ └──────> FAILED + │ + ├──> PAUSED ──> QUEUED + │ └─────> CANCELLED + │ + ├──> CANCELLED + └──> FAILED + +Mid-print cancel and mid-print pause are NOT supported (Brother spec — +the printer ignores commands once the raster stream starts). +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import datetime +from enum import StrEnum +from typing import Any + + +class JobState(StrEnum): + QUEUED = "queued" + PAUSED = "paused" + PRINTING = "printing" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +# Terminal states are absorbing — no transitions out. +# PRINTING is one-way to COMPLETED or FAILED (Brother spec: no mid-print stop). +_VALID_TRANSITIONS: dict[JobState, frozenset[JobState]] = { + JobState.QUEUED: frozenset( + { + JobState.PRINTING, + JobState.PAUSED, + JobState.CANCELLED, + JobState.FAILED, + } + ), + JobState.PAUSED: frozenset({JobState.QUEUED, JobState.CANCELLED}), + JobState.PRINTING: frozenset({JobState.COMPLETED, JobState.FAILED}), + JobState.COMPLETED: frozenset(), + JobState.FAILED: frozenset(), + JobState.CANCELLED: frozenset(), +} + +_TERMINAL_STATES: frozenset[JobState] = frozenset( + { + JobState.COMPLETED, + JobState.FAILED, + JobState.CANCELLED, + } +) + + +class InvalidStateTransitionError(Exception): + """Raised when JobStateMachine.transition() is called with an illegal target.""" + + +@dataclass +class Job: + """A single print job. In-memory MVP; persistence comes in Phase 5.""" + + id: str + printer_id: str + state: JobState = JobState.QUEUED + submitted_at: datetime = field(default_factory=datetime.now) + started_at: datetime | None = None + finished_at: datetime | None = None + image_payload: bytes | None = None + tape_mm: int | None = None + options: dict[str, Any] = field(default_factory=dict) + error_msg: str | None = None + error_flags: int | None = None + retry_count: int = 0 + # asyncio.Event is allowed on a dataclass field because asyncio fields on + # Jobs are only awaited from within an event loop; constructing them at + # import time is OK since asyncio.Event does not require a running loop. + _done_event: asyncio.Event = field(default_factory=asyncio.Event) + + +class JobStateMachine: + """Stateless gate-keeper for Job.state mutations. + + All state changes MUST flow through `transition()`. Direct assignment to + `job.state` bypasses the validity check and the timestamp/event side + effects — don't do that in production code. + """ + + @staticmethod + def transition(job: Job, new_state: JobState) -> None: + """Move `job` to `new_state` if the transition is valid. + + Raises: + InvalidStateTransitionError: if `new_state` is not reachable from + `job.state`. The exception message names both states so log + entries are self-contained. + """ + if new_state not in _VALID_TRANSITIONS[job.state]: + raise InvalidStateTransitionError( + f"Illegal transition {job.state.value} -> {new_state.value}" + ) + + job.state = new_state + + if new_state == JobState.PRINTING and job.started_at is None: + job.started_at = datetime.now() + + if new_state in _TERMINAL_STATES: + job.finished_at = datetime.now() + job._done_event.set() diff --git a/backend/tests/unit/services/test_job_lifecycle.py b/backend/tests/unit/services/test_job_lifecycle.py new file mode 100644 index 0000000..d2ee6e8 --- /dev/null +++ b/backend/tests/unit/services/test_job_lifecycle.py @@ -0,0 +1,81 @@ +from datetime import datetime + +import pytest +from app.services.job_lifecycle import ( + InvalidStateTransitionError, + Job, + JobState, + JobStateMachine, +) + + +def test_job_queued_to_printing() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PRINTING) + assert job.state == JobState.PRINTING + assert job.started_at is not None + + +def test_job_printing_to_completed() -> None: + job = Job( + id="abc", + printer_id="pt750w", + state=JobState.PRINTING, + started_at=datetime.now(), + ) + JobStateMachine.transition(job, JobState.COMPLETED) + assert job.state == JobState.COMPLETED + assert job.finished_at is not None + + +def test_invalid_transition_completed_to_printing() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.COMPLETED) + with pytest.raises(InvalidStateTransitionError, match="completed"): + JobStateMachine.transition(job, JobState.PRINTING) + + +def test_cancel_only_from_queued_or_paused() -> None: + """Brother Raster Spec: no mid-print cancel.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + with pytest.raises(InvalidStateTransitionError, match="printing"): + JobStateMachine.transition(job, JobState.CANCELLED) + + +def test_pause_from_queued_to_paused() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PAUSED) + assert job.state == JobState.PAUSED + + +def test_resume_from_paused_to_queued() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.PAUSED) + JobStateMachine.transition(job, JobState.QUEUED) + assert job.state == JobState.QUEUED + + +def test_cancel_from_paused() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.PAUSED) + JobStateMachine.transition(job, JobState.CANCELLED) + assert job.state == JobState.CANCELLED + + +def test_pause_printing_not_allowed() -> None: + """Brother Raster Spec: no mid-print pause.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + with pytest.raises(InvalidStateTransitionError, match="printing"): + JobStateMachine.transition(job, JobState.PAUSED) + + +def test_done_event_set_on_terminal_state() -> None: + """Terminal transitions must signal the _done_event for wait_for_job().""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + assert not job._done_event.is_set() + JobStateMachine.transition(job, JobState.COMPLETED) + assert job._done_event.is_set() + + +def test_done_event_not_set_on_pause() -> None: + """Non-terminal transitions must NOT signal the _done_event.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PAUSED) + assert not job._done_event.is_set() From a44708b12349c3b4a0550b90cef6fc22f4a65e82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Strausmann?= Date: Mon, 11 May 2026 08:40:45 +0000 Subject: [PATCH 2/3] refactor(job-lifecycle): hide internal fields from repr/init and add terminal-state coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - image_payload: field(repr=False) — avoids spamming logs with raw bytes - _done_event: field(init=False, repr=False) — makes it truly internal; Job(..., _done_event=x) now raises TypeError instead of silently bypassing intended construction - Add three tests: FAILED and CANCELLED set _done_event + finished_at, and both terminal states absorb all outgoing transitions Co-Authored-By: Claude Sonnet 4.6 --- backend/app/services/job_lifecycle.py | 4 +-- .../tests/unit/services/test_job_lifecycle.py | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/backend/app/services/job_lifecycle.py b/backend/app/services/job_lifecycle.py index 353301a..62c7756 100644 --- a/backend/app/services/job_lifecycle.py +++ b/backend/app/services/job_lifecycle.py @@ -79,7 +79,7 @@ class Job: submitted_at: datetime = field(default_factory=datetime.now) started_at: datetime | None = None finished_at: datetime | None = None - image_payload: bytes | None = None + image_payload: bytes | None = field(default=None, repr=False) tape_mm: int | None = None options: dict[str, Any] = field(default_factory=dict) error_msg: str | None = None @@ -88,7 +88,7 @@ class Job: # asyncio.Event is allowed on a dataclass field because asyncio fields on # Jobs are only awaited from within an event loop; constructing them at # import time is OK since asyncio.Event does not require a running loop. - _done_event: asyncio.Event = field(default_factory=asyncio.Event) + _done_event: asyncio.Event = field(default_factory=asyncio.Event, init=False, repr=False) class JobStateMachine: diff --git a/backend/tests/unit/services/test_job_lifecycle.py b/backend/tests/unit/services/test_job_lifecycle.py index d2ee6e8..8218847 100644 --- a/backend/tests/unit/services/test_job_lifecycle.py +++ b/backend/tests/unit/services/test_job_lifecycle.py @@ -79,3 +79,30 @@ def test_done_event_not_set_on_pause() -> None: job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) JobStateMachine.transition(job, JobState.PAUSED) assert not job._done_event.is_set() + + +def test_done_event_set_on_failed() -> None: + """Transition to FAILED must also set _done_event (parity with COMPLETED).""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + JobStateMachine.transition(job, JobState.FAILED) + assert job._done_event.is_set() + assert job.finished_at is not None + + +def test_done_event_set_on_cancelled() -> None: + """Transition to CANCELLED must also set _done_event (parity with COMPLETED).""" + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.CANCELLED) + assert job._done_event.is_set() + assert job.finished_at is not None + + +def test_terminal_states_absorb_no_outgoing_transitions() -> None: + """FAILED and CANCELLED behave like COMPLETED — no further transitions allowed.""" + for terminal in (JobState.FAILED, JobState.CANCELLED): + job = Job(id="abc", printer_id="pt750w", state=terminal) + for target in JobState: + if target == terminal: + continue + with pytest.raises(InvalidStateTransitionError): + JobStateMachine.transition(job, target) From 0e3ad486fcc26dae90269f79cc361db04de943fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Strausmann?= Date: Mon, 11 May 2026 08:47:59 +0000 Subject: [PATCH 3/3] refactor(job-lifecycle): UTC-aware timestamps + accurate asyncio.Event comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Import UTC from datetime; replace all datetime.now() calls with datetime.now(UTC) (submitted_at default_factory, started_at and finished_at in transition()) so timestamps are timezone-aware and safe across DST boundaries and SQLModel persistence (Phase 5). - Rewrite the _done_event comment: it was wrong about "import time" — default_factory runs at instance creation, and since Python 3.10 asyncio.Event does not pin to a loop at construction. - Add rationale comment on options: dict[str, Any] so future reviewers don't re-flag the intentional Any. - Add test_timestamps_are_utc_aware() verifying tzinfo is UTC on all three timestamp fields (14th test, 64 total). Co-Authored-By: Claude Sonnet 4.6 --- backend/app/services/job_lifecycle.py | 19 ++++++++++++------- .../tests/unit/services/test_job_lifecycle.py | 12 +++++++++++- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/backend/app/services/job_lifecycle.py b/backend/app/services/job_lifecycle.py index 62c7756..e60898a 100644 --- a/backend/app/services/job_lifecycle.py +++ b/backend/app/services/job_lifecycle.py @@ -24,7 +24,7 @@ import asyncio from dataclasses import dataclass, field -from datetime import datetime +from datetime import UTC, datetime from enum import StrEnum from typing import Any @@ -76,18 +76,23 @@ class Job: id: str printer_id: str state: JobState = JobState.QUEUED - submitted_at: datetime = field(default_factory=datetime.now) + submitted_at: datetime = field(default_factory=lambda: datetime.now(UTC)) started_at: datetime | None = None finished_at: datetime | None = None image_payload: bytes | None = field(default=None, repr=False) tape_mm: int | None = None + # `options` carries heterogeneous per-job kwargs (e.g. parent_job_id from + # retry, downstream printer flags). `Any` is contained to this field and + # does not leak into transition() logic; a structured type can replace it + # once the option set stabilises. options: dict[str, Any] = field(default_factory=dict) error_msg: str | None = None error_flags: int | None = None retry_count: int = 0 - # asyncio.Event is allowed on a dataclass field because asyncio fields on - # Jobs are only awaited from within an event loop; constructing them at - # import time is OK since asyncio.Event does not require a running loop. + # asyncio.Event is mutable but safe to use as a dataclass field default_factory: + # it's constructed when a Job instance is created (not at import time), and + # since Python 3.10 the Event does not pin to a specific event loop at + # construction — it binds to the running loop on first wait/set. _done_event: asyncio.Event = field(default_factory=asyncio.Event, init=False, repr=False) @@ -116,8 +121,8 @@ def transition(job: Job, new_state: JobState) -> None: job.state = new_state if new_state == JobState.PRINTING and job.started_at is None: - job.started_at = datetime.now() + job.started_at = datetime.now(UTC) if new_state in _TERMINAL_STATES: - job.finished_at = datetime.now() + job.finished_at = datetime.now(UTC) job._done_event.set() diff --git a/backend/tests/unit/services/test_job_lifecycle.py b/backend/tests/unit/services/test_job_lifecycle.py index 8218847..aa43e06 100644 --- a/backend/tests/unit/services/test_job_lifecycle.py +++ b/backend/tests/unit/services/test_job_lifecycle.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import UTC, datetime import pytest from app.services.job_lifecycle import ( @@ -97,6 +97,16 @@ def test_done_event_set_on_cancelled() -> None: assert job.finished_at is not None +def test_timestamps_are_utc_aware() -> None: + """submitted_at, started_at and finished_at must carry UTC tzinfo.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PRINTING) + JobStateMachine.transition(job, JobState.COMPLETED) + assert job.submitted_at.tzinfo is UTC + assert job.started_at is not None and job.started_at.tzinfo is UTC + assert job.finished_at is not None and job.finished_at.tzinfo is UTC + + def test_terminal_states_absorb_no_outgoing_transitions() -> None: """FAILED and CANCELLED behave like COMPLETED — no further transitions allowed.""" for terminal in (JobState.FAILED, JobState.CANCELLED):