diff --git a/backend/app/services/job_lifecycle.py b/backend/app/services/job_lifecycle.py new file mode 100644 index 0000000..e60898a --- /dev/null +++ b/backend/app/services/job_lifecycle.py @@ -0,0 +1,128 @@ +"""Job-lifecycle finite-state machine for the print queue. + +Brother PT/QL printers have no multi-job queue (TCP/9100 is a single stream). +The hub serialises jobs per printer and tracks state explicitly. This module +owns the state model only; the worker that drives transitions lives in +`app.services.print_queue`. + +State diagram: + + QUEUED ──> PRINTING ──> COMPLETED + │ └──────> FAILED + │ + ├──> PAUSED ──> QUEUED + │ └─────> CANCELLED + │ + ├──> CANCELLED + └──> FAILED + +Mid-print cancel and mid-print pause are NOT supported (Brother spec — +the printer ignores commands once the raster stream starts). +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import UTC, datetime +from enum import StrEnum +from typing import Any + + +class JobState(StrEnum): + QUEUED = "queued" + PAUSED = "paused" + PRINTING = "printing" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +# Terminal states are absorbing — no transitions out. +# PRINTING is one-way to COMPLETED or FAILED (Brother spec: no mid-print stop). +_VALID_TRANSITIONS: dict[JobState, frozenset[JobState]] = { + JobState.QUEUED: frozenset( + { + JobState.PRINTING, + JobState.PAUSED, + JobState.CANCELLED, + JobState.FAILED, + } + ), + JobState.PAUSED: frozenset({JobState.QUEUED, JobState.CANCELLED}), + JobState.PRINTING: frozenset({JobState.COMPLETED, JobState.FAILED}), + JobState.COMPLETED: frozenset(), + JobState.FAILED: frozenset(), + JobState.CANCELLED: frozenset(), +} + +_TERMINAL_STATES: frozenset[JobState] = frozenset( + { + JobState.COMPLETED, + JobState.FAILED, + JobState.CANCELLED, + } +) + + +class InvalidStateTransitionError(Exception): + """Raised when JobStateMachine.transition() is called with an illegal target.""" + + +@dataclass +class Job: + """A single print job. In-memory MVP; persistence comes in Phase 5.""" + + id: str + printer_id: str + state: JobState = JobState.QUEUED + submitted_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + started_at: datetime | None = None + finished_at: datetime | None = None + image_payload: bytes | None = field(default=None, repr=False) + tape_mm: int | None = None + # `options` carries heterogeneous per-job kwargs (e.g. parent_job_id from + # retry, downstream printer flags). `Any` is contained to this field and + # does not leak into transition() logic; a structured type can replace it + # once the option set stabilises. + options: dict[str, Any] = field(default_factory=dict) + error_msg: str | None = None + error_flags: int | None = None + retry_count: int = 0 + # asyncio.Event is mutable but safe to use as a dataclass field default_factory: + # it's constructed when a Job instance is created (not at import time), and + # since Python 3.10 the Event does not pin to a specific event loop at + # construction — it binds to the running loop on first wait/set. + _done_event: asyncio.Event = field(default_factory=asyncio.Event, init=False, repr=False) + + +class JobStateMachine: + """Stateless gate-keeper for Job.state mutations. + + All state changes MUST flow through `transition()`. Direct assignment to + `job.state` bypasses the validity check and the timestamp/event side + effects — don't do that in production code. + """ + + @staticmethod + def transition(job: Job, new_state: JobState) -> None: + """Move `job` to `new_state` if the transition is valid. + + Raises: + InvalidStateTransitionError: if `new_state` is not reachable from + `job.state`. The exception message names both states so log + entries are self-contained. + """ + if new_state not in _VALID_TRANSITIONS[job.state]: + raise InvalidStateTransitionError( + f"Illegal transition {job.state.value} -> {new_state.value}" + ) + + job.state = new_state + + if new_state == JobState.PRINTING and job.started_at is None: + job.started_at = datetime.now(UTC) + + if new_state in _TERMINAL_STATES: + job.finished_at = datetime.now(UTC) + job._done_event.set() diff --git a/backend/tests/unit/services/test_job_lifecycle.py b/backend/tests/unit/services/test_job_lifecycle.py new file mode 100644 index 0000000..aa43e06 --- /dev/null +++ b/backend/tests/unit/services/test_job_lifecycle.py @@ -0,0 +1,118 @@ +from datetime import UTC, datetime + +import pytest +from app.services.job_lifecycle import ( + InvalidStateTransitionError, + Job, + JobState, + JobStateMachine, +) + + +def test_job_queued_to_printing() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PRINTING) + assert job.state == JobState.PRINTING + assert job.started_at is not None + + +def test_job_printing_to_completed() -> None: + job = Job( + id="abc", + printer_id="pt750w", + state=JobState.PRINTING, + started_at=datetime.now(), + ) + JobStateMachine.transition(job, JobState.COMPLETED) + assert job.state == JobState.COMPLETED + assert job.finished_at is not None + + +def test_invalid_transition_completed_to_printing() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.COMPLETED) + with pytest.raises(InvalidStateTransitionError, match="completed"): + JobStateMachine.transition(job, JobState.PRINTING) + + +def test_cancel_only_from_queued_or_paused() -> None: + """Brother Raster Spec: no mid-print cancel.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + with pytest.raises(InvalidStateTransitionError, match="printing"): + JobStateMachine.transition(job, JobState.CANCELLED) + + +def test_pause_from_queued_to_paused() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PAUSED) + assert job.state == JobState.PAUSED + + +def test_resume_from_paused_to_queued() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.PAUSED) + JobStateMachine.transition(job, JobState.QUEUED) + assert job.state == JobState.QUEUED + + +def test_cancel_from_paused() -> None: + job = Job(id="abc", printer_id="pt750w", state=JobState.PAUSED) + JobStateMachine.transition(job, JobState.CANCELLED) + assert job.state == JobState.CANCELLED + + +def test_pause_printing_not_allowed() -> None: + """Brother Raster Spec: no mid-print pause.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + with pytest.raises(InvalidStateTransitionError, match="printing"): + JobStateMachine.transition(job, JobState.PAUSED) + + +def test_done_event_set_on_terminal_state() -> None: + """Terminal transitions must signal the _done_event for wait_for_job().""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + assert not job._done_event.is_set() + JobStateMachine.transition(job, JobState.COMPLETED) + assert job._done_event.is_set() + + +def test_done_event_not_set_on_pause() -> None: + """Non-terminal transitions must NOT signal the _done_event.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PAUSED) + assert not job._done_event.is_set() + + +def test_done_event_set_on_failed() -> None: + """Transition to FAILED must also set _done_event (parity with COMPLETED).""" + job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING) + JobStateMachine.transition(job, JobState.FAILED) + assert job._done_event.is_set() + assert job.finished_at is not None + + +def test_done_event_set_on_cancelled() -> None: + """Transition to CANCELLED must also set _done_event (parity with COMPLETED).""" + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.CANCELLED) + assert job._done_event.is_set() + assert job.finished_at is not None + + +def test_timestamps_are_utc_aware() -> None: + """submitted_at, started_at and finished_at must carry UTC tzinfo.""" + job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED) + JobStateMachine.transition(job, JobState.PRINTING) + JobStateMachine.transition(job, JobState.COMPLETED) + assert job.submitted_at.tzinfo is UTC + assert job.started_at is not None and job.started_at.tzinfo is UTC + assert job.finished_at is not None and job.finished_at.tzinfo is UTC + + +def test_terminal_states_absorb_no_outgoing_transitions() -> None: + """FAILED and CANCELLED behave like COMPLETED — no further transitions allowed.""" + for terminal in (JobState.FAILED, JobState.CANCELLED): + job = Job(id="abc", printer_id="pt750w", state=terminal) + for target in JobState: + if target == terminal: + continue + with pytest.raises(InvalidStateTransitionError): + JobStateMachine.transition(job, target)