feat(printer-models): PrintQueue worker with pause/resume/cancel/retry#50
Conversation
…/resume/cancel/retry Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…m/payload, document queue invariants - Fix A (I2): replace **kwargs with explicit tape_mm: int kw-only + **options in _PrinterLike.print_image so mypy strict enforces the signature - Fix B (I1): guard job.tape_mm is None in _worker before print_image call; raises RuntimeError → worker's except Exception → FAILED with clear error_msg - Fix C (M3): replace loose len(job_id) >= 8 assert with uuid.UUID(job_id) for a proper UUID round-trip validation - Fix D (M1): document resume_job stale-reference + qsize +1 invariant - Fix E (M2/M4): add TODO(phase5) comments on _jobs eviction and wait_for_job private _done_event access Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request establishes the core infrastructure for a printer job queue, enabling reliable asynchronous job processing for label printers. By implementing a dedicated worker-per-printer model, the system ensures serialized job execution and provides robust control mechanisms for job and printer states. This work completes the logic-foundation phase for the printer hub, setting the stage for upcoming model-specific printer plugins. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a per-printer asynchronous work queue service and associated unit tests to manage print job serialization and lifecycle. The feedback identifies several critical issues: the current shutdown process violates the Brother specification by potentially cancelling jobs mid-print, a race condition exists in the worker loop when a printer is paused while idle, and CPU-intensive image encoding is blocking the event loop. Additionally, there are performance concerns regarding the O(N) complexity of queue operations due to unbounded job storage.
| for task in self._workers.values(): | ||
| task.cancel() |
There was a problem hiding this comment.
The stop() method cancels worker tasks immediately. If a worker is currently executing printer.print_image, this will interrupt the raster stream mid-print. According to the Repository Style Guide (Priority 22), mid-print cancels are forbidden by the Brother spec. Consider implementing a graceful shutdown (e.g., using a sentinel value like None in the queue) to allow the current job to finish before the task exits.
References
- Mid-print cancel is forbidden by the Brother spec; flagging it is a high priority. (link)
| if self._worker_states[printer_id] == PrinterWorkerState.PAUSED: | ||
| await self._worker_resume_events[printer_id].wait() | ||
|
|
||
| job = await queue.get() |
There was a problem hiding this comment.
There is a race condition in the worker loop. If the worker is idle at await queue.get() and the printer is paused, the worker will not re-check the pause state after popping the next job. This allows a job submitted after the pause to begin printing immediately. The pause check should be performed (or repeated) after queue.get() returns.
| if self._worker_states[printer_id] == PrinterWorkerState.PAUSED: | |
| await self._worker_resume_events[printer_id].wait() | |
| job = await queue.get() | |
| job = await queue.get() | |
| # Re-check pause state to handle the race where the printer was paused | |
| # while the worker was idle/waiting for a job. | |
| if self._worker_states[printer_id] == PrinterWorkerState.PAUSED: | |
| await self._worker_resume_events[printer_id].wait() |
| if printer_id not in self._queues: | ||
| raise KeyError(f"Unknown printer: {printer_id}") | ||
| buf = BytesIO() | ||
| image.save(buf, format="PNG") |
There was a problem hiding this comment.
Encoding an image to PNG is a CPU-intensive operation that blocks the event loop. Per the Repository Style Guide (Rule 33), synchronous blocking operations should be avoided in async functions. Use asyncio.to_thread to offload this to a worker thread.
| image.save(buf, format="PNG") | |
| await asyncio.to_thread(image.save, buf, format="PNG") |
References
- Synchronous I/O (or blocking operations) in async functions should be flagged. (link)
| raise KeyError(f"Unknown printer: {printer_id}") | ||
| non_terminal = (JobState.QUEUED, JobState.PAUSED, JobState.PRINTING) | ||
| return [ | ||
| j for j in self._jobs.values() if j.printer_id == printer_id and j.state in non_terminal |
There was a problem hiding this comment.
Iterating over self._jobs.values() in list_queue (and clear_queue) results in _jobs grows unbounded. This will become a performance bottleneck over time. Consider maintaining a per-printer collection of active job IDs or implementing the eviction strategy sooner.
There was a problem hiding this comment.
Pull request overview
Adds the backend’s per-printer asynchronous print queue/worker layer on top of the existing job lifecycle FSM, enabling serialized printing per device with job- and printer-level control (pause/resume/cancel/retry).
Changes:
- Introduces
PrintQueuewith per-printerasyncio.Queueand one worker task per printer, plus job control APIs. - Implements printer-level pause/resume via worker state +
asyncio.Event. - Adds unit tests covering submit/serial execution/pause+resume/clear/retry and printer pause behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
backend/app/services/print_queue.py |
New per-printer queue + worker implementation with job/printer control APIs. |
backend/tests/unit/services/test_print_queue.py |
New asyncio unit tests for PrintQueue behaviors. |
| image = Image.open(BytesIO(job.image_payload)) | ||
| await printer.print_image(image, tape_mm=job.tape_mm, **job.options) |
| # Block here if the printer is paused; resume_printer() sets the event. | ||
| if self._worker_states[printer_id] == PrinterWorkerState.PAUSED: | ||
| await self._worker_resume_events[printer_id].wait() | ||
|
|
||
| job = await queue.get() |
| async def stop(self) -> None: | ||
| for task in self._workers.values(): | ||
| task.cancel() | ||
| if self._workers: | ||
| await asyncio.gather(*self._workers.values(), return_exceptions=True) |
| async def clear_queue(self, printer_id: str) -> int: | ||
| """Cancel all queued + paused jobs for a printer. Returns the count.""" | ||
| if printer_id not in self._queues: | ||
| raise KeyError(f"Unknown printer: {printer_id}") | ||
| cancelled = 0 | ||
| for job in self._jobs.values(): | ||
| if job.printer_id == printer_id and job.state in ( | ||
| JobState.QUEUED, | ||
| JobState.PAUSED, | ||
| ): | ||
| JobStateMachine.transition(job, JobState.CANCELLED) | ||
| cancelled += 1 | ||
| return cancelled | ||
|
|
…age serialization Fix A (HIGH): move pause-check to after queue.get() so a pause set while the worker is idle at queue.get() is always honoured before the job transitions to PRINTING. Add regression test test_queue_pause_after_idle_worker_is_respected. Fix B (HIGH): replace cancel-only stop() with a graceful drain using a None sentinel to wake idle workers and asyncio.wait_for() to give in-flight jobs up to timeout_s (default 30 s) to complete before forcible cancel. Add regression test test_queue_stop_drains_in_flight_job. Add _stopping flag so a paused worker blocked on its resume-event exits immediately when stop() fires. Fix C (MEDIUM): offload image.save() (submit) and Image.open() (worker) to asyncio.to_thread() via the new module-level _serialize_image_to_png helper, keeping the event loop unblocked for typical label payloads. Fix D (MEDIUM): annotate list_queue() and clear_queue() with O(N) complexity cross-reference to the existing TODO(phase5) comment on _jobs. Update test_queue_pause_printer_blocks_worker: drop the now-stale qsize()==1 assertion (worker pops before blocking on pause); replace with a state+call- count check that remains true under the new post-get pause loop. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## 0.3.0 (2026-05-12) * feat(config): pydantic-settings module with env-driven runtime configuration (#45) ([878e9e0](878e9e0)), closes [#45](#45) * feat(integrations): AppLookupService aggregator — Phase 3 complete (#53) ([222bef4](222bef4)), closes [#53](#53) * feat(integrations): Grocy + Spoolman lookup clients with shared NotFoundError base (#52) ([b1c9c3c](b1c9c3c)), closes [#52](#52) * feat(integrations): LabelData schema + Snipe-IT lookup client (#51) ([3bc180f](3bc180f)), closes [#51](#51) * feat(label-renderer): Template schema + Pillow/qrcode renderer for 1-bit label bitmaps (#54) ([fb77028](fb77028)), closes [#54](#54) * feat(printer-models): Brother PT-Series TapeRegistry with TZe and heat-shrink specs (#47) ([7526019](7526019)), closes [#47](#47) * feat(printer-models): Job lifecycle FSM with explicit state machine (#49) ([1a8c40e](1a8c40e)), closes [#49](#49) * feat(printer-models): PrinterModel Protocol + ModelRegistry for plugin discovery (#48) ([2ae0e09](2ae0e09)), closes [#48](#48) * feat(printer-models): PrintQueue worker with pause/resume/cancel/retry (#50) ([dfdf6fe](dfdf6fe)), closes [#50](#50) [skip ci]
Summary
PR B of split-Task 2.8. Adds the per-printer async work queue on top of PR #49's Job FSM. Brother PT/QL printers expose TCP/9100 as a single stream — there is no on-device queue — so the hub serialises jobs per printer with one asyncio worker task each.
What's in this PR
backend/app/services/print_queue.pyPrinterWorkerState(StrEnum)—ACTIVE/PAUSED, orthogonal to per-job state._PrinterLike(Protocol)— minimal contract the queue depends on:id: str+async print_image(image, *, tape_mm: int, **options: Any) -> None.tape_mmis kw-only required so any conforming plugin must accept it (mypy enforces this).PrintQueue:__init__(printers)— per-printer asyncio.Queue, worker-state, resume Event (initially set), jobs/workers dictsstart()/stop()— spawn / cancel workerssubmit(printer_id, image, tape_mm, **options) -> uuidget(job_id)/wait_for_job(job_id, timeout_s)— awaitjob._done_eventcancel(job_id)/pause_job(job_id)/resume_job(job_id)— per-job control (resume_jobre-enqueues at tail; docstring documents the stale-reference +qsize()invariant)retry_job(job_id) -> str | None— only from FAILED; new UUID,retry_count+1,parent_job_idlinkedpause_printer(...)/resume_printer(...)— worker-level control viaasyncio.Eventlist_queue(printer_id)— non-terminal jobs (queued + paused + printing)clear_queue(printer_id) -> int— cancels queued + paused_worker(printer_id)— pause-aware loop, state-filter on pop, transitions queued→printing→completed/failed;tape_mm is Noneandimage_payload is Noneguards before the printer call;CancelledErrorre-raised for cleanqueue.stop()backend/tests/unit/services/test_print_queue.py— 6 asyncio testsuuid.UUID(job_id))asyncio.sleep(0)+ qsize check)What's NOT in this PR
_jobsis in-memory. TODO(phase5) comment marks the eviction concern.Job.wait_done()public wrapper —wait_for_jobaccesses_done_eventdirectly with a TODO(phase5).services/.Test plan
pytest -q→ 70 (64 from PR feat(printer-models): Job lifecycle FSM with explicit state machine #49 + 6 new).pytest tests/unit/services/test_print_queue.py -v→ 6/6.ruff format --check .clean.ruff check .clean.mypy app/(strict) clean — 15 source files. Confirmstape_mm: intProtocol enforces non-None payloads.Review history (subagent-driven)
996451c— initial commit._PrinterLike.print_image: **kwargs: Anyinstead of*, tape_mm: int, **options: Any(fallback used unnecessarily; mypy didn't trip).c461174— Protocol tightened, guards added, UUID test strict, TODOs + invariant doc.Linked plan
docs/superpowers/plans/2026-05-11-label-printer-hub.mdTask 2.8 split B/B. With this PR Phase 2 logic-foundation (FSM + Queue) is complete. Real PT/QL printer plugins (Tasks 2.1 / 2.2) will plug into_PrinterLikeand the registry from PR #48.