From 996451c97d19033154d8bb346194e193b11b85d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Strausmann?= Date: Mon, 11 May 2026 08:53:35 +0000 Subject: [PATCH 1/3] feat(printer-models): PrintQueue with per-printer async worker, pause/resume/cancel/retry Co-Authored-By: Claude Sonnet 4.6 --- backend/app/services/print_queue.py | 264 ++++++++++++++++++ .../tests/unit/services/test_print_queue.py | 123 ++++++++ 2 files changed, 387 insertions(+) create mode 100644 backend/app/services/print_queue.py create mode 100644 backend/tests/unit/services/test_print_queue.py diff --git a/backend/app/services/print_queue.py b/backend/app/services/print_queue.py new file mode 100644 index 0000000..e43c8e5 --- /dev/null +++ b/backend/app/services/print_queue.py @@ -0,0 +1,264 @@ +"""Per-printer async work queue. + +Brother PT/QL printers expose TCP/9100 as a single-stream channel — there is +no on-device multi-job queue. The hub serialises jobs per printer by running +one asyncio worker task per printer and feeding it from an asyncio.Queue. + +Jobs live in-memory (MVP). Phase 5 will add SQLite persistence behind a +JobStore protocol that this module will accept by dependency injection. + +Internal dependency note: the worker reads `job._done_event` (a private field +on `Job`) to signal completion to `wait_for_job`. `PrintQueue` and +`wait_for_job` are the intended and only legitimate consumers of that event; +the underscore signals "don't touch without thinking". 
+""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from enum import StrEnum +from io import BytesIO +from typing import Any, Protocol + +from PIL import Image + +from app.services.job_lifecycle import ( + InvalidStateTransitionError, + Job, + JobState, + JobStateMachine, +) + +logger = logging.getLogger(__name__) + + +class PrinterWorkerState(StrEnum): + """Per-printer worker state, orthogonal to per-job state.""" + + ACTIVE = "active" + PAUSED = "paused" + + +class _PrinterLike(Protocol): + """Minimal printer contract this queue depends on. + + Real printer plugins (PR for Tasks 2.1/2.2) implement the richer + PrinterModel Protocol (PR #48). The queue depends only on `id` and + `print_image` — the `**kwargs` signature avoids repeating the full + option set here, which belongs to the driver layer. + """ + + id: str + + async def print_image(self, image: Image.Image, **kwargs: Any) -> None: ... + + +class PrintQueue: + """Per-printer async work queue with submit/pause/resume/cancel/retry.""" + + def __init__(self, printers: list[_PrinterLike]) -> None: + self._printers: dict[str, _PrinterLike] = {p.id: p for p in printers} + self._queues: dict[str, asyncio.Queue[Job]] = {p.id: asyncio.Queue() for p in printers} + self._worker_states: dict[str, PrinterWorkerState] = { + p.id: PrinterWorkerState.ACTIVE for p in printers + } + self._worker_resume_events: dict[str, asyncio.Event] = { + p.id: asyncio.Event() for p in printers + } + # All resume events start "set" so a never-paused worker doesn't block. 
+ for ev in self._worker_resume_events.values(): + ev.set() + self._jobs: dict[str, Job] = {} + self._workers: dict[str, asyncio.Task[None]] = {} + self._running: bool = False + + # --- lifecycle ---------------------------------------------------------- + + async def start(self) -> None: + if self._running: + return + for printer_id in self._queues: + self._workers[printer_id] = asyncio.create_task( + self._worker(printer_id), name=f"printer-worker-{printer_id}" + ) + self._running = True + + async def stop(self) -> None: + for task in self._workers.values(): + task.cancel() + if self._workers: + await asyncio.gather(*self._workers.values(), return_exceptions=True) + self._workers.clear() + self._running = False + + # --- job CRUD ----------------------------------------------------------- + + async def submit( + self, + printer_id: str, + image: Image.Image, + tape_mm: int, + **options: Any, + ) -> str: + if printer_id not in self._queues: + raise KeyError(f"Unknown printer: {printer_id}") + buf = BytesIO() + image.save(buf, format="PNG") + job = Job( + id=str(uuid.uuid4()), + printer_id=printer_id, + image_payload=buf.getvalue(), + tape_mm=tape_mm, + options=dict(options), + ) + self._jobs[job.id] = job + await self._queues[printer_id].put(job) + logger.info("Job %s queued on %s", job.id, printer_id) + return job.id + + async def get(self, job_id: str) -> Job: + return self._jobs[job_id] + + async def wait_for_job(self, job_id: str, timeout_s: float = 60.0) -> Job: + job = self._jobs[job_id] + await asyncio.wait_for(job._done_event.wait(), timeout=timeout_s) + return job + + # --- per-job control --------------------------------------------------- + + async def cancel(self, job_id: str) -> bool: + """Cancel a queued or paused job. 
Returns False for non-cancellable states.""" + job = self._jobs[job_id] + if job.state not in (JobState.QUEUED, JobState.PAUSED): + return False + try: + JobStateMachine.transition(job, JobState.CANCELLED) + except InvalidStateTransitionError: + return False + return True + + async def pause_job(self, job_id: str) -> bool: + """Pause a queued job — worker skips it when it pops the queue.""" + job = self._jobs[job_id] + if job.state != JobState.QUEUED: + return False + JobStateMachine.transition(job, JobState.PAUSED) + return True + + async def resume_job(self, job_id: str) -> bool: + """Re-enqueue a paused job at the tail of the queue (FIFO preserved).""" + job = self._jobs[job_id] + if job.state != JobState.PAUSED: + return False + JobStateMachine.transition(job, JobState.QUEUED) + await self._queues[job.printer_id].put(job) + return True + + async def retry_job(self, job_id: str) -> str | None: + """Submit a fresh copy of a FAILED job. Returns new job id or None.""" + original = self._jobs[job_id] + if original.state != JobState.FAILED: + return None + new_job = Job( + id=str(uuid.uuid4()), + printer_id=original.printer_id, + image_payload=original.image_payload, + tape_mm=original.tape_mm, + options=dict(original.options), + retry_count=original.retry_count + 1, + ) + new_job.options["parent_job_id"] = original.id + self._jobs[new_job.id] = new_job + await self._queues[new_job.printer_id].put(new_job) + return new_job.id + + # --- per-printer control ----------------------------------------------- + + async def pause_printer(self, printer_id: str, reason: str = "") -> None: + """Pause the worker for a printer. 
Any in-flight job completes first.""" + if printer_id not in self._worker_states: + raise KeyError(f"Unknown printer: {printer_id}") + self._worker_states[printer_id] = PrinterWorkerState.PAUSED + self._worker_resume_events[printer_id].clear() + logger.info("Printer %s paused: %s", printer_id, reason) + + async def resume_printer(self, printer_id: str) -> None: + """Resume a paused printer worker.""" + if printer_id not in self._worker_states: + raise KeyError(f"Unknown printer: {printer_id}") + self._worker_states[printer_id] = PrinterWorkerState.ACTIVE + self._worker_resume_events[printer_id].set() + logger.info("Printer %s resumed", printer_id) + + async def list_queue(self, printer_id: str) -> list[Job]: + """All non-terminal jobs for a printer (queued + paused + printing).""" + if printer_id not in self._queues: + raise KeyError(f"Unknown printer: {printer_id}") + non_terminal = (JobState.QUEUED, JobState.PAUSED, JobState.PRINTING) + return [ + j for j in self._jobs.values() if j.printer_id == printer_id and j.state in non_terminal + ] + + async def clear_queue(self, printer_id: str) -> int: + """Cancel all queued + paused jobs for a printer. Returns the count.""" + if printer_id not in self._queues: + raise KeyError(f"Unknown printer: {printer_id}") + cancelled = 0 + for job in self._jobs.values(): + if job.printer_id == printer_id and job.state in ( + JobState.QUEUED, + JobState.PAUSED, + ): + JobStateMachine.transition(job, JobState.CANCELLED) + cancelled += 1 + return cancelled + + # --- worker loop ------------------------------------------------------- + + async def _worker(self, printer_id: str) -> None: + """Consume the queue for one printer, one job at a time. + + The worker waits on `_worker_resume_events[printer_id]` when the + printer is paused, so no jobs are dequeued until the printer is + resumed. Each job is checked again after dequeue — it may have been + cancelled or paused between submit and the worker picking it up. 
+ """ + printer = self._printers[printer_id] + queue = self._queues[printer_id] + while True: + # Block here if the printer is paused; resume_printer() sets the event. + if self._worker_states[printer_id] == PrinterWorkerState.PAUSED: + await self._worker_resume_events[printer_id].wait() + + job = await queue.get() + + # Job may have been cancelled or paused between submit and pop. + if job.state != JobState.QUEUED: + continue + + try: + JobStateMachine.transition(job, JobState.PRINTING) + if job.image_payload is None: + raise RuntimeError(f"Job {job.id} has no image payload") + image = Image.open(BytesIO(job.image_payload)) + await printer.print_image(image, tape_mm=job.tape_mm, **job.options) + JobStateMachine.transition(job, JobState.COMPLETED) + logger.info("Job %s completed on %s", job.id, printer_id) + except asyncio.CancelledError: + # queue.stop() cancelled this task — re-raise so the task exits. + raise + except Exception as exc: + job.error_msg = str(exc) + try: + JobStateMachine.transition(job, JobState.FAILED) + except InvalidStateTransitionError: + # Job was already moved to a terminal state externally. 
+ logger.warning( + "Job %s: unexpected state %s after exception; error was: %s", + job.id, + job.state, + exc, + ) + logger.exception("Job %s failed on %s", job.id, printer_id) diff --git a/backend/tests/unit/services/test_print_queue.py b/backend/tests/unit/services/test_print_queue.py new file mode 100644 index 0000000..fcb94e2 --- /dev/null +++ b/backend/tests/unit/services/test_print_queue.py @@ -0,0 +1,123 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock + +import pytest +from app.services.job_lifecycle import ( + JobState, + JobStateMachine, +) +from app.services.print_queue import PrintQueue +from PIL import Image + + +@pytest.mark.asyncio +async def test_queue_submit_returns_job_id() -> None: + fake_printer = MagicMock() + fake_printer.id = "pt750w" + queue = PrintQueue([fake_printer]) + + img = Image.new("1", (300, 76)) + job_id = await queue.submit("pt750w", img, tape_mm=12) + assert isinstance(job_id, str) + assert len(job_id) >= 8 # UUID-like + + +@pytest.mark.asyncio +async def test_queue_serial_per_printer() -> None: + """Two jobs on the same printer execute serially, not in parallel.""" + fake_printer = MagicMock() + fake_printer.id = "pt750w" + fake_printer.print_image = AsyncMock(return_value=None) + + queue = PrintQueue([fake_printer]) + await queue.start() + try: + img = Image.new("1", (300, 76)) + job_id_1 = await queue.submit("pt750w", img, tape_mm=12) + job_id_2 = await queue.submit("pt750w", img, tape_mm=12) + + await queue.wait_for_job(job_id_1, timeout_s=5) + await queue.wait_for_job(job_id_2, timeout_s=5) + + assert fake_printer.print_image.await_count == 2 + finally: + await queue.stop() + + +@pytest.mark.asyncio +async def test_queue_pause_and_resume_job() -> None: + fake_printer = MagicMock() + fake_printer.id = "pt750w" + fake_printer.print_image = AsyncMock() + queue = PrintQueue([fake_printer]) + + img = Image.new("1", (300, 76)) + job_id = await queue.submit("pt750w", img, tape_mm=12) + assert (await 
queue.pause_job(job_id)) is True + assert (await queue.get(job_id)).state == JobState.PAUSED + assert (await queue.resume_job(job_id)) is True + assert (await queue.get(job_id)).state == JobState.QUEUED + + +@pytest.mark.asyncio +async def test_queue_clear_cancels_all_pending() -> None: + fake_printer = MagicMock() + fake_printer.id = "pt750w" + queue = PrintQueue([fake_printer]) + + img = Image.new("1", (300, 76)) + j1 = await queue.submit("pt750w", img, tape_mm=12) + j2 = await queue.submit("pt750w", img, tape_mm=12) + cancelled = await queue.clear_queue("pt750w") + assert cancelled == 2 + assert (await queue.get(j1)).state == JobState.CANCELLED + assert (await queue.get(j2)).state == JobState.CANCELLED + + +@pytest.mark.asyncio +async def test_queue_pause_printer_blocks_worker() -> None: + """When a printer is paused, the worker must not pick further jobs.""" + fake_printer = MagicMock() + fake_printer.id = "pt750w" + fake_printer.print_image = AsyncMock() + queue = PrintQueue([fake_printer]) + await queue.start() + try: + await queue.pause_printer("pt750w", reason="manual pause") + + img = Image.new("1", (300, 76)) + job_id = await queue.submit("pt750w", img, tape_mm=12) + + # Deterministic check: worker is paused, job stays in asyncio.Queue. 
+ await asyncio.sleep(0) # yield to event loop; worker should not proceed + assert queue._worker_states["pt750w"].value == "paused" + assert queue._queues["pt750w"].qsize() == 1 + assert (await queue.get(job_id)).state == JobState.QUEUED + + await queue.resume_printer("pt750w") + await queue.wait_for_job(job_id, timeout_s=5) + assert (await queue.get(job_id)).state == JobState.COMPLETED + finally: + await queue.stop() + + +@pytest.mark.asyncio +async def test_queue_retry_failed_creates_new_job() -> None: + fake_printer = MagicMock() + fake_printer.id = "pt750w" + queue = PrintQueue([fake_printer]) + + img = Image.new("1", (300, 76)) + job_id = await queue.submit("pt750w", img, tape_mm=12) + # Drive job to FAILED manually (no worker running) + job = await queue.get(job_id) + JobStateMachine.transition(job, JobState.PRINTING) + JobStateMachine.transition(job, JobState.FAILED) + + new_id = await queue.retry_job(job_id) + assert new_id is not None + assert new_id != job_id + new_job = await queue.get(new_id) + assert new_job.state == JobState.QUEUED + assert new_job.retry_count == 1 + assert new_job.options.get("parent_job_id") == job_id From c4611745d74fb7c406a816fed86f984cd17e9101 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Strausmann?= Date: Mon, 11 May 2026 09:01:28 +0000 Subject: [PATCH 2/3] refactor(printer-models): tighten _PrinterLike Protocol, guard tape_mm/payload, document queue invariants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix A (I2): replace **kwargs with explicit tape_mm: int kw-only + **options in _PrinterLike.print_image so mypy strict enforces the signature - Fix B (I1): guard job.tape_mm is None in _worker before print_image call; raises RuntimeError → worker's except Exception → FAILED with clear error_msg - Fix C (M3): replace loose len(job_id) >= 8 assert with uuid.UUID(job_id) for a proper UUID round-trip validation - Fix D (M1): document resume_job stale-reference + qsize +1 
invariant - Fix E (M2/M4): add TODO(phase5) comments on _jobs eviction and wait_for_job private _done_event access Co-Authored-By: Claude Sonnet 4.6 --- backend/app/services/print_queue.py | 26 ++++++++++++++++--- .../tests/unit/services/test_print_queue.py | 3 ++- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/backend/app/services/print_queue.py b/backend/app/services/print_queue.py index e43c8e5..a7c22c6 100644 --- a/backend/app/services/print_queue.py +++ b/backend/app/services/print_queue.py @@ -46,13 +46,14 @@ class _PrinterLike(Protocol): Real printer plugins (PR for Tasks 2.1/2.2) implement the richer PrinterModel Protocol (PR #48). The queue depends only on `id` and - `print_image` — the `**kwargs` signature avoids repeating the full - option set here, which belongs to the driver layer. + `print_image`. `tape_mm` is required as a keyword-only argument so mypy + strict can verify that conforming printer plugins accept it explicitly; + `**options` carries driver-specific extras that vary per plugin. """ id: str - async def print_image(self, image: Image.Image, **kwargs: Any) -> None: ... + async def print_image(self, image: Image.Image, *, tape_mm: int, **options: Any) -> None: ... class PrintQueue: @@ -70,6 +71,9 @@ def __init__(self, printers: list[_PrinterLike]) -> None: # All resume events start "set" so a never-paused worker doesn't block. for ev in self._worker_resume_events.values(): ev.set() + # TODO(phase5): _jobs grows unbounded over the service lifetime; evict + # terminal jobs older than a configurable window once + # persistence lands. self._jobs: dict[str, Job] = {} self._workers: dict[str, asyncio.Task[None]] = {} self._running: bool = False @@ -123,6 +127,8 @@ async def get(self, job_id: str) -> Job: async def wait_for_job(self, job_id: str, timeout_s: float = 60.0) -> Job: job = self._jobs[job_id] + # TODO(phase5): expose Job.wait_done() to remove this cross-module + # private access to _done_event. 
await asyncio.wait_for(job._done_event.wait(), timeout=timeout_s) return job @@ -148,7 +154,15 @@ async def pause_job(self, job_id: str) -> bool: return True async def resume_job(self, job_id: str) -> bool: - """Re-enqueue a paused job at the tail of the queue (FIFO preserved).""" + """Re-enqueue a paused job at the tail of the queue (FIFO preserved). + + Note: the job's original reference remains in the asyncio.Queue from + when it was first submitted. After resume, a new reference is appended + at the tail. The worker filters by state on pop, so the stale reference + drains cleanly (state != QUEUED on the second pop). asyncio.Queue.qsize() + will be +1 high until that drain happens — use list_queue() for accurate + active-job counts. + """ job = self._jobs[job_id] if job.state != JobState.PAUSED: return False @@ -240,6 +254,10 @@ async def _worker(self, printer_id: str) -> None: try: JobStateMachine.transition(job, JobState.PRINTING) + if job.tape_mm is None: + raise RuntimeError( + f"Job {job.id} has no tape_mm — submit() and retry_job() must populate it" + ) if job.image_payload is None: raise RuntimeError(f"Job {job.id} has no image payload") image = Image.open(BytesIO(job.image_payload)) diff --git a/backend/tests/unit/services/test_print_queue.py b/backend/tests/unit/services/test_print_queue.py index fcb94e2..140c377 100644 --- a/backend/tests/unit/services/test_print_queue.py +++ b/backend/tests/unit/services/test_print_queue.py @@ -1,4 +1,5 @@ import asyncio +import uuid from unittest.mock import AsyncMock, MagicMock import pytest @@ -19,7 +20,7 @@ async def test_queue_submit_returns_job_id() -> None: img = Image.new("1", (300, 76)) job_id = await queue.submit("pt750w", img, tape_mm=12) assert isinstance(job_id, str) - assert len(job_id) >= 8 # UUID-like + uuid.UUID(job_id) # raises ValueError if not a valid UUID @pytest.mark.asyncio From 20e4647228204afb6eacae997230fe4391354333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Strausmann?= Date: Mon, 
11 May 2026 09:11:47 +0000 Subject: [PATCH 3/3] refactor(print-queue): graceful stop, pause-race fix, non-blocking image serialization Fix A (HIGH): move pause-check to after queue.get() so a pause set while the worker is idle at queue.get() is always honoured before the job transitions to PRINTING. Add regression test test_queue_pause_after_idle_worker_is_respected. Fix B (HIGH): replace cancel-only stop() with a graceful drain using a None sentinel to wake idle workers and asyncio.wait_for() to give in-flight jobs up to timeout_s (default 30 s) to complete before forcible cancel. Add regression test test_queue_stop_drains_in_flight_job. Add _stopping flag so a paused worker blocked on its resume-event exits immediately when stop() fires. Fix C (MEDIUM): offload image.save() (submit, via the new module-level _serialize_image_to_png helper) and Image.open() (worker) to asyncio.to_thread(), keeping the event loop unblocked for typical label payloads. Fix D (MEDIUM): annotate list_queue() and clear_queue() with O(N) complexity cross-reference to the existing TODO(phase5) comment on _jobs. Update test_queue_pause_printer_blocks_worker: drop the now-stale qsize()==1 assertion (worker pops before blocking on pause); replace with a state+call-count check that remains true under the new post-get pause loop. 
Co-Authored-By: Claude Sonnet 4.6 --- backend/app/services/print_queue.py | 92 +++++++++++++++---- .../tests/unit/services/test_print_queue.py | 67 +++++++++++++- 2 files changed, 137 insertions(+), 22 deletions(-) diff --git a/backend/app/services/print_queue.py b/backend/app/services/print_queue.py index a7c22c6..4cf0770 100644 --- a/backend/app/services/print_queue.py +++ b/backend/app/services/print_queue.py @@ -34,6 +34,13 @@ logger = logging.getLogger(__name__) +def _serialize_image_to_png(image: Image.Image) -> bytes: + """Encode *image* as PNG bytes (CPU-bound; intended for asyncio.to_thread).""" + buf = BytesIO() + image.save(buf, format="PNG") + return buf.getvalue() + + class PrinterWorkerState(StrEnum): """Per-printer worker state, orthogonal to per-job state.""" @@ -61,7 +68,11 @@ class PrintQueue: def __init__(self, printers: list[_PrinterLike]) -> None: self._printers: dict[str, _PrinterLike] = {p.id: p for p in printers} - self._queues: dict[str, asyncio.Queue[Job]] = {p.id: asyncio.Queue() for p in printers} + # Queue type is Job | None — None is the sentinel used by stop() to wake + # workers that are blocked at queue.get(). + self._queues: dict[str, asyncio.Queue[Job | None]] = { + p.id: asyncio.Queue() for p in printers + } self._worker_states: dict[str, PrinterWorkerState] = { p.id: PrinterWorkerState.ACTIVE for p in printers } @@ -77,6 +88,7 @@ def __init__(self, printers: list[_PrinterLike]) -> None: self._jobs: dict[str, Job] = {} self._workers: dict[str, asyncio.Task[None]] = {} self._running: bool = False + self._stopping: bool = False # --- lifecycle ---------------------------------------------------------- @@ -89,13 +101,38 @@ async def start(self) -> None: ) self._running = True - async def stop(self) -> None: - for task in self._workers.values(): - task.cancel() + async def stop(self, timeout_s: float = 30.0) -> None: + """Stop all workers. 
+ + Workers are signalled to exit and given up to *timeout_s* seconds to + finish the job they are currently printing. After the timeout, they are + cancelled forcibly — that leaves the printer in an undefined state for + that one job. Callers should pass enough timeout to cover a normal + print. + """ + self._stopping = True + # Wake up any worker waiting on a paused resume event so it sees the + # stop signal. + for ev in self._worker_resume_events.values(): + ev.set() + # Put a sentinel (None) onto each queue so workers blocked at queue.get() + # wake up and see the stop flag. + for q in self._queues.values(): + await q.put(None) if self._workers: - await asyncio.gather(*self._workers.values(), return_exceptions=True) + try: + await asyncio.wait_for( + asyncio.gather(*self._workers.values(), return_exceptions=True), + timeout=timeout_s, + ) + except TimeoutError: + for task in self._workers.values(): + if not task.done(): + task.cancel() + await asyncio.gather(*self._workers.values(), return_exceptions=True) self._workers.clear() self._running = False + self._stopping = False # --- job CRUD ----------------------------------------------------------- @@ -108,12 +145,11 @@ async def submit( ) -> str: if printer_id not in self._queues: raise KeyError(f"Unknown printer: {printer_id}") - buf = BytesIO() - image.save(buf, format="PNG") + payload = await asyncio.to_thread(_serialize_image_to_png, image) job = Job( id=str(uuid.uuid4()), printer_id=printer_id, - image_payload=buf.getvalue(), + image_payload=payload, tape_mm=tape_mm, options=dict(options), ) @@ -207,7 +243,11 @@ async def resume_printer(self, printer_id: str) -> None: logger.info("Printer %s resumed", printer_id) async def list_queue(self, printer_id: str) -> list[Job]: - """All non-terminal jobs for a printer (queued + paused + printing).""" + """All non-terminal jobs for a printer (queued + paused + printing). 
+ + O(N) over all-time jobs — acceptable at MVP scale; see TODO(phase5) + on the _jobs declaration in __init__. + """ if printer_id not in self._queues: raise KeyError(f"Unknown printer: {printer_id}") non_terminal = (JobState.QUEUED, JobState.PAUSED, JobState.PRINTING) @@ -216,7 +256,11 @@ async def list_queue(self, printer_id: str) -> list[Job]: ] async def clear_queue(self, printer_id: str) -> int: - """Cancel all queued + paused jobs for a printer. Returns the count.""" + """Cancel all queued + paused jobs for a printer. Returns the count. + + O(N) over all-time jobs — acceptable at MVP scale; see TODO(phase5) + on the _jobs declaration in __init__. + """ if printer_id not in self._queues: raise KeyError(f"Unknown printer: {printer_id}") cancelled = 0 @@ -234,19 +278,27 @@ async def clear_queue(self, printer_id: str) -> int: async def _worker(self, printer_id: str) -> None: """Consume the queue for one printer, one job at a time. - The worker waits on `_worker_resume_events[printer_id]` when the - printer is paused, so no jobs are dequeued until the printer is - resumed. Each job is checked again after dequeue — it may have been - cancelled or paused between submit and the worker picking it up. + After popping a job the worker checks the pause state — this handles + the race where pause_printer() is called while the worker is blocked at + queue.get(). A sentinel value of None signals that stop() wants the + worker to exit cleanly. """ printer = self._printers[printer_id] queue = self._queues[printer_id] while True: - # Block here if the printer is paused; resume_printer() sets the event. 
- if self._worker_states[printer_id] == PrinterWorkerState.PAUSED: - await self._worker_resume_events[printer_id].wait() + item = await queue.get() + + if item is None: # sentinel — stop() requested a clean exit + return - job = await queue.get() + job = item + + # Wait while paused — pause may have been set while we were idle at + # queue.get(), so this check must come AFTER the pop. + while self._worker_states[printer_id] == PrinterWorkerState.PAUSED: + if self._stopping: + return + await self._worker_resume_events[printer_id].wait() # Job may have been cancelled or paused between submit and pop. if job.state != JobState.QUEUED: @@ -260,12 +312,12 @@ async def _worker(self, printer_id: str) -> None: ) if job.image_payload is None: raise RuntimeError(f"Job {job.id} has no image payload") - image = Image.open(BytesIO(job.image_payload)) + image = await asyncio.to_thread(Image.open, BytesIO(job.image_payload)) await printer.print_image(image, tape_mm=job.tape_mm, **job.options) JobStateMachine.transition(job, JobState.COMPLETED) logger.info("Job %s completed on %s", job.id, printer_id) except asyncio.CancelledError: - # queue.stop() cancelled this task — re-raise so the task exits. + # Forcible cancel after stop() timeout — re-raise so the task exits. raise except Exception as exc: job.error_msg = str(exc) diff --git a/backend/tests/unit/services/test_print_queue.py b/backend/tests/unit/services/test_print_queue.py index 140c377..ba1252e 100644 --- a/backend/tests/unit/services/test_print_queue.py +++ b/backend/tests/unit/services/test_print_queue.py @@ -89,11 +89,13 @@ async def test_queue_pause_printer_blocks_worker() -> None: img = Image.new("1", (300, 76)) job_id = await queue.submit("pt750w", img, tape_mm=12) - # Deterministic check: worker is paused, job stays in asyncio.Queue. + # Deterministic check: worker is paused and must not start printing. 
+ # With the post-get pause loop, the worker pops the job and then blocks — + # qsize() drops to 0 but the job state remains QUEUED (not PRINTING). await asyncio.sleep(0) # yield to event loop; worker should not proceed assert queue._worker_states["pt750w"].value == "paused" - assert queue._queues["pt750w"].qsize() == 1 assert (await queue.get(job_id)).state == JobState.QUEUED + assert fake_printer.print_image.await_count == 0 await queue.resume_printer("pt750w") await queue.wait_for_job(job_id, timeout_s=5) @@ -102,6 +104,67 @@ async def test_queue_pause_printer_blocks_worker() -> None: await queue.stop() +@pytest.mark.asyncio +async def test_queue_pause_after_idle_worker_is_respected() -> None: + """Pausing while the worker is idle at queue.get() must still block the next pop.""" + fake_printer = MagicMock() + fake_printer.id = "pt750w" + fake_printer.print_image = AsyncMock() + queue = PrintQueue([fake_printer]) + await queue.start() + try: + # Worker is now idle at queue.get() with an empty queue. + # Give the loop a tick so the worker is actually blocked. + await asyncio.sleep(0) + + # Pause AFTER the worker has entered queue.get(). + await queue.pause_printer("pt750w", reason="race test") + + # Submit a job. The pause must hold. + img = Image.new("1", (300, 76)) + job_id = await queue.submit("pt750w", img, tape_mm=12) + + # Yield a few times — worker would print here if pause was ignored. + for _ in range(5): + await asyncio.sleep(0) + assert (await queue.get(job_id)).state == JobState.QUEUED + assert fake_printer.print_image.await_count == 0 + + # Resume — job should complete now. 
+ await queue.resume_printer("pt750w") + await queue.wait_for_job(job_id, timeout_s=5) + assert (await queue.get(job_id)).state == JobState.COMPLETED + finally: + await queue.stop() + + +@pytest.mark.asyncio +async def test_queue_stop_drains_in_flight_job() -> None: + """stop() must wait for the currently-printing job to complete cleanly.""" + started = asyncio.Event() + finished = asyncio.Event() + + async def slow_print(image, *, tape_mm, **kw): + started.set() + await asyncio.sleep(0.1) + finished.set() + + fake_printer = MagicMock() + fake_printer.id = "pt750w" + fake_printer.print_image = AsyncMock(side_effect=slow_print) + queue = PrintQueue([fake_printer]) + await queue.start() + + img = Image.new("1", (300, 76)) + job_id = await queue.submit("pt750w", img, tape_mm=12) + await started.wait() # printer is in the middle of printing + + await queue.stop(timeout_s=5.0) + # The in-flight print finished cleanly (was not cancelled). + assert finished.is_set() + assert (await queue.get(job_id)).state == JobState.COMPLETED + + @pytest.mark.asyncio async def test_queue_retry_failed_creates_new_job() -> None: fake_printer = MagicMock()