Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions backend/app/services/job_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Job-lifecycle finite-state machine for the print queue.

Brother PT/QL printers have no multi-job queue (TCP/9100 is a single stream).
The hub serialises jobs per printer and tracks state explicitly. This module
owns the state model only; the worker that drives transitions lives in
`app.services.print_queue`.

State diagram:

QUEUED ──> PRINTING ──> COMPLETED
│ └──────> FAILED
├──> PAUSED ──> QUEUED
│ └─────> CANCELLED
├──> CANCELLED
└──> FAILED

Mid-print cancel and mid-print pause are NOT supported (Brother spec —
the printer ignores commands once the raster stream starts).
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from typing import Any


class JobState(StrEnum):
QUEUED = "queued"
PAUSED = "paused"
PRINTING = "printing"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"


# Terminal states are absorbing — no transitions out.
# PRINTING is one-way to COMPLETED or FAILED (Brother spec: no mid-print stop).
_VALID_TRANSITIONS: dict[JobState, frozenset[JobState]] = {
JobState.QUEUED: frozenset(
{
JobState.PRINTING,
JobState.PAUSED,
JobState.CANCELLED,
JobState.FAILED,
}
),
JobState.PAUSED: frozenset({JobState.QUEUED, JobState.CANCELLED}),
JobState.PRINTING: frozenset({JobState.COMPLETED, JobState.FAILED}),
JobState.COMPLETED: frozenset(),
JobState.FAILED: frozenset(),
JobState.CANCELLED: frozenset(),
}

_TERMINAL_STATES: frozenset[JobState] = frozenset(
{
JobState.COMPLETED,
JobState.FAILED,
JobState.CANCELLED,
}
)


class InvalidStateTransitionError(Exception):
"""Raised when JobStateMachine.transition() is called with an illegal target."""


@dataclass
class Job:
"""A single print job. In-memory MVP; persistence comes in Phase 5."""

id: str
printer_id: str
state: JobState = JobState.QUEUED
submitted_at: datetime = field(default_factory=lambda: datetime.now(UTC))
started_at: datetime | None = None
finished_at: datetime | None = None
image_payload: bytes | None = field(default=None, repr=False)
tape_mm: int | None = None
# `options` carries heterogeneous per-job kwargs (e.g. parent_job_id from
# retry, downstream printer flags). `Any` is contained to this field and
# does not leak into transition() logic; a structured type can replace it
# once the option set stabilises.
options: dict[str, Any] = field(default_factory=dict)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This introduces Any, which is flagged by the repository's strict type safety policy (Priority 7). Consider using a more specific type for printer options, such as dict[str, str | int | bool | float | None], or define a TypeAlias if the schema is known.

References
  1. Flag new Any introductions. mypy --strict on app/. (link)

error_msg: str | None = None
error_flags: int | None = None
retry_count: int = 0
# asyncio.Event is mutable but safe to use as a dataclass field default_factory:
# it's constructed when a Job instance is created (not at import time), and
# since Python 3.10 the Event does not pin to a specific event loop at
# construction — it binds to the running loop on first wait/set.
_done_event: asyncio.Event = field(default_factory=asyncio.Event, init=False, repr=False)


class JobStateMachine:
"""Stateless gate-keeper for Job.state mutations.

All state changes MUST flow through `transition()`. Direct assignment to
`job.state` bypasses the validity check and the timestamp/event side
effects — don't do that in production code.
"""

@staticmethod
def transition(job: Job, new_state: JobState) -> None:
"""Move `job` to `new_state` if the transition is valid.

Raises:
InvalidStateTransitionError: if `new_state` is not reachable from
`job.state`. The exception message names both states so log
entries are self-contained.
"""
if new_state not in _VALID_TRANSITIONS[job.state]:
raise InvalidStateTransitionError(
f"Illegal transition {job.state.value} -> {new_state.value}"
)

job.state = new_state

if new_state == JobState.PRINTING and job.started_at is None:
job.started_at = datetime.now(UTC)

if new_state in _TERMINAL_STATES:
job.finished_at = datetime.now(UTC)
job._done_event.set()
118 changes: 118 additions & 0 deletions backend/tests/unit/services/test_job_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from datetime import UTC, datetime

import pytest
from app.services.job_lifecycle import (
InvalidStateTransitionError,
Job,
JobState,
JobStateMachine,
)


def test_job_queued_to_printing() -> None:
job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED)
JobStateMachine.transition(job, JobState.PRINTING)
assert job.state == JobState.PRINTING
assert job.started_at is not None


def test_job_printing_to_completed() -> None:
job = Job(
id="abc",
printer_id="pt750w",
state=JobState.PRINTING,
started_at=datetime.now(),
)
JobStateMachine.transition(job, JobState.COMPLETED)
assert job.state == JobState.COMPLETED
assert job.finished_at is not None


def test_invalid_transition_completed_to_printing() -> None:
job = Job(id="abc", printer_id="pt750w", state=JobState.COMPLETED)
with pytest.raises(InvalidStateTransitionError, match="completed"):
JobStateMachine.transition(job, JobState.PRINTING)


def test_cancel_only_from_queued_or_paused() -> None:
"""Brother Raster Spec: no mid-print cancel."""
job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING)
with pytest.raises(InvalidStateTransitionError, match="printing"):
JobStateMachine.transition(job, JobState.CANCELLED)


def test_pause_from_queued_to_paused() -> None:
job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED)
JobStateMachine.transition(job, JobState.PAUSED)
assert job.state == JobState.PAUSED


def test_resume_from_paused_to_queued() -> None:
job = Job(id="abc", printer_id="pt750w", state=JobState.PAUSED)
JobStateMachine.transition(job, JobState.QUEUED)
assert job.state == JobState.QUEUED


def test_cancel_from_paused() -> None:
job = Job(id="abc", printer_id="pt750w", state=JobState.PAUSED)
JobStateMachine.transition(job, JobState.CANCELLED)
assert job.state == JobState.CANCELLED


def test_pause_printing_not_allowed() -> None:
"""Brother Raster Spec: no mid-print pause."""
job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING)
with pytest.raises(InvalidStateTransitionError, match="printing"):
JobStateMachine.transition(job, JobState.PAUSED)


def test_done_event_set_on_terminal_state() -> None:
"""Terminal transitions must signal the _done_event for wait_for_job()."""
job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING)
assert not job._done_event.is_set()
JobStateMachine.transition(job, JobState.COMPLETED)
assert job._done_event.is_set()


def test_done_event_not_set_on_pause() -> None:
"""Non-terminal transitions must NOT signal the _done_event."""
job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED)
JobStateMachine.transition(job, JobState.PAUSED)
assert not job._done_event.is_set()


def test_done_event_set_on_failed() -> None:
"""Transition to FAILED must also set _done_event (parity with COMPLETED)."""
job = Job(id="abc", printer_id="pt750w", state=JobState.PRINTING)
JobStateMachine.transition(job, JobState.FAILED)
assert job._done_event.is_set()
assert job.finished_at is not None


def test_done_event_set_on_cancelled() -> None:
"""Transition to CANCELLED must also set _done_event (parity with COMPLETED)."""
job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED)
JobStateMachine.transition(job, JobState.CANCELLED)
assert job._done_event.is_set()
assert job.finished_at is not None


def test_timestamps_are_utc_aware() -> None:
"""submitted_at, started_at and finished_at must carry UTC tzinfo."""
job = Job(id="abc", printer_id="pt750w", state=JobState.QUEUED)
JobStateMachine.transition(job, JobState.PRINTING)
JobStateMachine.transition(job, JobState.COMPLETED)
assert job.submitted_at.tzinfo is UTC
assert job.started_at is not None and job.started_at.tzinfo is UTC
assert job.finished_at is not None and job.finished_at.tzinfo is UTC


def test_terminal_states_absorb_no_outgoing_transitions() -> None:
"""FAILED and CANCELLED behave like COMPLETED — no further transitions allowed."""
for terminal in (JobState.FAILED, JobState.CANCELLED):
job = Job(id="abc", printer_id="pt750w", state=terminal)
for target in JobState:
if target == terminal:
continue
with pytest.raises(InvalidStateTransitionError):
JobStateMachine.transition(job, target)