
feat(batch,ui): activate max_parallel + cooperative cancellation (PRP-34)#291

Merged
w7-mgfcode merged 3 commits into dev from feat/batch-parallel-execution on May 25, 2026

Conversation

@w7-mgfcode
Owner

@w7-mgfcode w7-mgfcode commented May 25, 2026

Summary

Implements PRP-34 — activates the three forward-compat columns PRP-33 shipped on batch_job (max_parallel, running_items, cancelled_items) by routing BatchService.submit through a new app/features/batch/runner.py (single asyncio.Semaphore inside an asyncio.TaskGroup, per-child AsyncSession, cooperative cancellation via per-batch asyncio.Event + tracked Task refs).

Adds DELETE /batch/{batch_id} (200 settled / 404 / 409 / 504 RFC 7807 via the new GatewayTimeoutError), a max-parallel Slider + cancel AlertDialog on frontend/src/pages/visualize/batch.tsx, and two new settings (BATCH_GLOBAL_MAX_PARALLEL=4, BATCH_CANCEL_DRAIN_TIMEOUT_SECONDS=30).

Closes #290.

What's new

  • app/features/batch/runner.py (new): _ACTIVE_BATCHES registry, CancelHandle, run_batch / cancel_batch / await_drain / mark_completed. The except* asyncio.CancelledError catch shape is the PEP-654 form documented in PRPs/ai_docs/asyncio-taskgroup-cancellation.md. Children use one shared async_sessionmaker (no per-child engine).
  • app/features/batch/service.py: submit() now delegates to runner.run_batch; _settle() writes effective_max_parallel into result_summary JSONB (no migration). The existing _pick_next / _execute_item are kept on the class so downstream PRPs can reuse them.
  • app/features/batch/routes.py: DELETE /batch/{batch_id} with RFC 7807 404 / 409 / 504.
  • app/features/batch/schemas.py: BatchSubmitResponse.effective_max_parallel is a @computed_field (legacy rows return 0).
  • app/features/batch/models.py: VALID_BATCH_ITEM_TRANSITIONS[RUNNING] now includes CANCELLED so the cooperative-cancel path can write its terminal state truthfully.
  • app/core/{config,exceptions,problem_details}.py: Settings.batch_global_max_parallel / Settings.batch_cancel_drain_timeout_seconds, GatewayTimeoutError, ERROR_TYPES["GATEWAY_TIMEOUT"].
  • Frontend: Slider (added via shadcn MCP pnpm dlx shadcn@4.7.0 add slider), useCancelBatch hook, BatchSubmitResponse.{max_parallel, effective_max_parallel}, Cancel batch button + AlertDialog, live running_items + parallel chips.

Why no Alembic migration

PRP-33 already shipped batch_job.max_parallel / running_items / cancelled_items as forward-compat columns. uv run alembic check reports "No new upgrade operations detected" on this branch.

Test plan

  • uv run ruff check . && uv run ruff format --check .: clean
  • uv run mypy app/: Success, no issues in 291 source files
  • uv run pyright app/: 0 errors (69 pre-existing warnings, unrelated)
  • uv run pytest -m "not integration": 1466 passed
  • uv run pytest -m integration app/features/batch/ tests/: 15 passed, 3 skipped (docker-stack fixtures, unrelated to PRP-34)
  • uv run alembic check: No new upgrade operations detected
  • cd frontend && pnpm tsc --noEmit && pnpm lint && pnpm test --run: 121 frontend tests passing, 0 lint errors

New tests (load-bearing)

  • app/features/batch/tests/test_runner.py — 9 unit tests including test_semaphore_caps_concurrency (catches unbounded fan-out), test_settings_global_cap_clamps_max_parallel, test_cancel_pending_child_marks_cancelled_without_running, test_cancel_running_child_propagates_cancellederror, plus 4 registry-hygiene tests
  • app/features/batch/tests/test_routes_cancel.py — 3 integration tests for DELETE /batch/{batch_id} (404 / 409 / 504)
  • app/features/batch/tests/test_runner_chaos.py — 2 integration tests asserting no orphaned RUNNING rows post-cancel + parent running_items=0 post-drain
  • frontend/src/hooks/use-batches.test.ts — 2 vitest cases for useCancelBatch (success path + 409 RFC 7807 surface)

Anti-pattern greps (all clean)

$ grep -rn "asyncio.gather" app/features/batch/   # none in production code
$ grep -rn "tg.cancel_scope" app/features/batch/  # none
$ grep -rn "asyncio.all_tasks" app/features/batch/runner.py  # none (only doc warning)
$ grep -rn "from app.features.jobs" app/features/batch/runner.py  # none — cross-slice imports stay in BatchService

Deviations from the PRP

  1. The PRP's pseudocode placed if handle.cancel_event.is_set(): before async with sem: but didn't catch the CancelledError raised inside async with sem: when a pending child is cancelled mid-acquire. Fixed by wrapping with an outer try/except CancelledError keyed off an acquired flag — pending children still route to _mark_cancelled_skipped. The PRP's load-bearing test_cancel_pending_child_marks_cancelled_without_running covers this.
  2. VALID_BATCH_ITEM_TRANSITIONS[RUNNING] extended to include CANCELLED (and test_valid_transitions_dict_item updated to match) — the doc dict was inconsistent with the new cooperative-cancel terminal state.
  3. DELETE-route tests are @pytest.mark.integration (need a live get_db); the PRP listed them under "Level 2: Unit Tests" but they exercise the FastAPI dependency chain.
  4. The shadcn slider install required the 4.7.0 fallback (pnpm dlx shadcn@latest 5.x silently failed) — explicitly anticipated by the PRP.
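
Deviation 1 is easier to follow with a small stand-alone reproduction. The sketch below assumes a simplified child that records its outcome in a dict instead of writing to the database; `child`, `demo`, and the outcome labels are illustrative names, not the ones in runner.py. It shows how the `acquired` flag separates a child cancelled while queued on the semaphore from one cancelled mid-run.

```python
import asyncio


async def child(
    item_id: str,
    sem: asyncio.Semaphore,
    cancel_event: asyncio.Event,
    outcomes: dict[str, str],
    work_seconds: float = 1.0,
) -> None:
    # Fast path: cancel was requested before this child tried to start.
    if cancel_event.is_set():
        outcomes[item_id] = "cancelled_skipped"
        return

    acquired = False
    try:
        async with sem:
            acquired = True
            # Re-check: cancel may have been signalled while we waited.
            if cancel_event.is_set():
                outcomes[item_id] = "cancelled_skipped"
                return
            try:
                await asyncio.sleep(work_seconds)  # stand-in for the real item
                outcomes[item_id] = "completed"
            except asyncio.CancelledError:
                outcomes[item_id] = "cancelled_running"
                raise
    except asyncio.CancelledError:
        # Cancelled while still queued on the semaphore: the item never ran.
        if not acquired:
            outcomes[item_id] = "cancelled_skipped"
        raise


async def demo() -> dict[str, str]:
    sem = asyncio.Semaphore(1)
    cancel_event = asyncio.Event()
    outcomes: dict[str, str] = {}
    tasks = [
        asyncio.create_task(child(f"item-{i}", sem, cancel_event, outcomes))
        for i in range(3)
    ]
    await asyncio.sleep(0.05)  # item-0 is running; item-1 and item-2 are queued
    cancel_event.set()
    for task in tasks:
        task.cancel()
    await asyncio.wait(tasks)  # cancelled tasks count as done
    return outcomes
```

Without the outer `except`, the two queued children would exit with a bare CancelledError and leave no record of having been skipped.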

Summary by Sourcery

Activate bounded parallel execution and cooperative cancellation for batch jobs across backend and frontend.

New Features:

  • Introduce an asyncio-based batch runner with global and per-batch parallelism caps, cooperative cancellation, and effective_max_parallel tracking.
  • Expose DELETE /batch/{batch_id} to cooperatively cancel in-flight batches with RFC 7807 404/409/504 responses and a new GatewayTimeoutError type.
  • Extend BatchSubmitResponse and API types with max_parallel and effective_max_parallel, persisting the latter in result_summary JSONB without schema changes.
  • Add frontend support for per-batch max_parallel via a slider, live running/parallel chips, and a cancel-batch flow wired through a new useCancelBatch hook.

Enhancements:

  • Adjust batch settlement logic and state machine to handle cancelled items and parent CANCELLED status consistently, including updated transition tests.
  • Introduce configuration for global batch concurrency caps and cancel-drain timeout, and wire batch service to the new runner while maintaining existing picker helpers.
  • Add backend chaos and integration tests plus frontend hook tests to validate concurrency caps, cancellation semantics, and drain behaviour.

Documentation:

  • Document asyncio TaskGroup cancellation semantics and design decisions for the batch runner in a new ai_docs note.

Activate the three forward-compat columns PRP-33 shipped on `batch_job`
(`max_parallel`, `running_items`, `cancelled_items`) by routing
`BatchService.submit` through a new `app/features/batch/runner.py` — a
single `asyncio.Semaphore(effective_parallel)` inside an
`asyncio.TaskGroup` with per-child `AsyncSession`s and cooperative
cancellation via a per-batch `asyncio.Event` + tracked `Task` refs. No
new Alembic migration (the three columns already exist).

`DELETE /batch/{batch_id}` cancels what hasn't started and bounds the
drain of what has — 200 settled / 404 missing / 409 terminal / 504
drain-timeout (RFC 7807 via the new `GatewayTimeoutError`). The frontend
`visualize/batch.tsx` gains a max-parallel `Slider`, live `running_items`
+ `parallel` chips, and a "Cancel batch" `AlertDialog`. The settings
defaults are `BATCH_GLOBAL_MAX_PARALLEL=4`,
`BATCH_CANCEL_DRAIN_TIMEOUT_SECONDS=30`.

`BatchSubmitResponse.effective_max_parallel` is a `@computed_field`
resolved from `result_summary["effective_max_parallel"]` (legacy rows
return 0) — JSONB-only, no schema migration.
@sourcery-ai

sourcery-ai Bot commented May 25, 2026

Reviewer's Guide

Implements PRP-34 by introducing an asyncio-based bounded-concurrency batch runner with cooperative cancellation, wiring BatchService.submit through it, exposing DELETE /batch/{batch_id} with RFC 7807 errors, and adding corresponding UI controls (max_parallel slider and cancel flow) plus settings and tests, all without schema changes.

Sequence diagram for cooperative batch cancellation via DELETE /batch/{batch_id}

sequenceDiagram
    actor User
    participant FrontendBatchPage as FrontendBatchPage
    participant UseCancelBatch as useCancelBatch
    participant BatchRoutes as cancel_batch_route
    participant BatchService as BatchService
    participant Runner as runner
    participant DB as Postgres

    User->>FrontendBatchPage: Click Cancel batch
    FrontendBatchPage->>UseCancelBatch: useCancelBatch.mutate(batch_id)
    UseCancelBatch->>BatchRoutes: DELETE /batch/{batch_id}
    BatchRoutes->>BatchService: get(db, batch_id)
    BatchService-->>BatchRoutes: BatchJob or None
    alt Batch not found
        BatchRoutes-->>UseCancelBatch: 404 NotFoundError
        UseCancelBatch-->>FrontendBatchPage: isError (404)
    else Batch terminal
        BatchRoutes-->>UseCancelBatch: 409 ConflictError
        UseCancelBatch-->>FrontendBatchPage: isError (409)
    else Batch running
        BatchRoutes->>Runner: cancel_batch(batch_id)
        alt cancel_batch returns False
            BatchRoutes-->>UseCancelBatch: 409 ConflictError
        else cancel_batch returns True
            BatchRoutes->>Runner: await_drain(batch_id, timeout_seconds)
            alt drain timeout
                Runner-->>BatchRoutes: drained = False
                BatchRoutes-->>UseCancelBatch: 504 GatewayTimeoutError
            else drain success
                Runner-->>BatchRoutes: drained = True
                BatchRoutes->>BatchService: get(db, batch_id)
                BatchService-->>BatchRoutes: settled BatchJob
                BatchRoutes-->>UseCancelBatch: 200 BatchSubmitResponse
                UseCancelBatch-->>FrontendBatchPage: data (status=cancelled or partial)
            end
        end
    end
    Note over BatchService,Runner: BatchService.submit calls runner.mark_completed(batch_id)
    Note over Runner,DB: cancel_event and completed_event control cooperative drain

File-Level Changes

Change Details Files
Introduce asyncio-based bounded-concurrency runner with cooperative cancellation and registry for active batches.
  • Add app/features/batch/runner.py implementing run_batch with asyncio.Semaphore and TaskGroup, per-batch CancelHandle registry, cooperative cancel via asyncio.Event, and helper functions to update batch_job and batch_job_item state.
  • Ensure each child task uses its own AsyncSession from a shared async_sessionmaker, correctly tracks running_items, and writes cancelled/failed statuses on cancellation or unexpected errors.
  • Expose cancel_batch, await_drain, and mark_completed utilities for use by routes and BatchService, avoiding asyncio.all_tasks scanning and handling ExceptionGroup via except* asyncio.CancelledError.
app/features/batch/runner.py
PRPs/ai_docs/asyncio-taskgroup-cancellation.md
app/features/batch/tests/test_runner.py
app/features/batch/tests/test_runner_chaos.py
Rewire BatchService.submit to use the new runner and persist effective_max_parallel and cancel-aware parent status.
  • Change submit to collect inserted BatchJobItem IDs, create a local session_maker via get_session_maker, and delegate execution to runner.run_batch with an inner _exec_one that opens its own AsyncSession and calls _execute_item.
  • Extend _settle to accept effective_max_parallel, compute CANCELLED as a parent terminal status when applicable, keep empty-batch semantics, and persist effective_max_parallel into result_summary JSONB.
  • Call runner.mark_completed after settling so DELETE waiters observe a settled parent, and enrich completion logging with cancelled_items and effective_max_parallel.
app/features/batch/service.py
Expose DELETE /batch/{batch_id} to cooperatively cancel in-flight batches with bounded drain and RFC 7807 errors.
  • Add DELETE /batch/{batch_id} route that validates batch existence and terminal states, triggers runner.cancel_batch, waits for drain via runner.await_drain with Settings.batch_cancel_drain_timeout_seconds, and reloads the parent.
  • Return 404 for missing batches, 409 when already terminal or when the batch settles between GET and cancel, and 504 via GatewayTimeoutError when drain times out.
  • Define a local set of terminal batch statuses to gate deletion and log cancellation events.
app/features/batch/routes.py
app/core/exceptions.py
app/core/problem_details.py
app/features/batch/tests/test_routes_cancel.py
Extend batch schemas, models, and settings to support parallelism metadata and cancel transitions.
  • Add Settings.batch_global_max_parallel and Settings.batch_cancel_drain_timeout_seconds with defaults and documentation, and expose them via .env.example.
  • Extend BatchSubmitResponse to include max_parallel and a computed_field effective_max_parallel that reads from result_summary.effective_max_parallel with robust defaulting and typing.
  • Allow RUNNING -> CANCELLED in VALID_BATCH_ITEM_TRANSITIONS and update tests to match so cooperative cancel can truthfully mark terminal state.
app/core/config.py
.env.example
app/features/batch/schemas.py
app/features/batch/models.py
app/features/batch/tests/test_models.py
Add frontend support for max_parallel control, cancel flow, and display of running/parallel metadata.
  • Introduce useCancelBatch hook that calls DELETE /batch/{id}, updates the specific batch cache entry, and invalidates related queries, with tests covering success and RFC 7807 error surfaces.
  • Update BatchRunnerPage to include TERMINAL_BATCH_STATES, wire useCancelBatch, manage maxParallel state sent in BatchSubmitRequest, and render a Slider to control max_parallel plus chips for running_items and effective_max_parallel.
  • Add a Cancel batch destructive button with AlertDialog confirmation and ErrorDisplay handling, and tighten WAPE rendering type check.
frontend/src/hooks/use-batches.ts
frontend/src/hooks/use-batches.test.ts
frontend/src/pages/visualize/batch.tsx
frontend/src/types/api.ts
Install and wire shadcn-based Slider UI component and Radix dependency for the frontend.
  • Add Slider component wrapper around Radix slider primitives with Tailwind classes and multi-thumb support under components/ui/slider.tsx.
  • Include radix-ui dependency in frontend/package.json and adjust pnpm.onlyBuiltDependencies formatting.
  • Ensure the slider is used by BatchRunnerPage for max_parallel control.
frontend/src/components/ui/slider.tsx
frontend/package.json
frontend/pnpm-lock.yaml


@coderabbitai

coderabbitai Bot commented May 25, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@socket-security

socket-security Bot commented May 25, 2026

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff    Package                            Supply Chain Security   Vulnerability   Quality   Maintenance   License
Added   npm/@radix-ui/react-slider@1.3.6   99                      100             72        91            100


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 4 issues, and left some high level feedback:

  • The new slider component is importing SliderPrimitive from "radix-ui", which differs from the standard shadcn pattern (@radix-ui/react-slider); consider aligning the import/package to the shadcn template to avoid relying on a nonstandard entry point and potential runtime issues.
  • Both the backend (_TERMINAL_BATCH_STATES in routes.py) and frontend (TERMINAL_BATCH_STATES in batch.tsx) now hard-code the set of terminal batch statuses; it may be worth centralizing this mapping (or deriving it from the shared BatchStatus enum) to avoid future drift between the API and UI.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new slider component is importing `SliderPrimitive` from `"radix-ui"`, which differs from the standard shadcn pattern (`@radix-ui/react-slider`); consider aligning the import/package to the shadcn template to avoid relying on a nonstandard entry point and potential runtime issues.
- Both the backend (`_TERMINAL_BATCH_STATES` in `routes.py`) and frontend (`TERMINAL_BATCH_STATES` in `batch.tsx`) now hard-code the set of terminal batch statuses; it may be worth centralizing this mapping (or deriving it from the shared `BatchStatus` enum) to avoid future drift between the API and UI.

## Individual Comments

### Comment 1
<location path="app/features/batch/runner.py" line_range="136-145" />
<code_context>
+    handle = _ACTIVE_BATCHES.get(batch_id)
+    if handle is None:
+        return True
+    try:
+        await asyncio.wait_for(handle.completed_event.wait(), timeout=timeout_seconds)
+        return True
+    except TimeoutError:
+        return False
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Use asyncio.TimeoutError instead of built-in TimeoutError when catching wait_for timeouts

`asyncio.wait_for` raises `asyncio.TimeoutError`, not the built-in `TimeoutError`. Catching `TimeoutError` here means the timeout will escape instead of returning `False`, breaking the intended 504 behavior in the DELETE handler. Change the except clause to `except asyncio.TimeoutError:` (or import and use it directly) so the drain timeout behaves as documented and matches `cancel_batch_route`'s handling.
</issue_to_address>

### Comment 2
<location path="app/features/batch/tests/test_routes_cancel.py" line_range="34-43" />
<code_context>
+async def test_delete_409_terminal_batch(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a happy-path DELETE /batch/{batch_id} test that exercises the 200 + drain-success case

Right now the tests only cover 404, 409, and 504, but not the documented 200 + successful drain path. Please add an integration test that:

- seeds an in-flight batch (similar to `_seed_synthetic_batch` in `test_runner_chaos.py`),
- registers a `CancelHandle` in the runner registry,
- calls `DELETE /batch/{batch_id}` such that `runner.cancel_batch` and `runner.await_drain` both return `True`,
- asserts a 200 and verifies the body shows a terminal status (`cancelled` or `partial`), `running_items == 0`, and `effective_max_parallel` in `result_summary`.

This will exercise the full success path for the cancel flow end-to-end.

Suggested implementation:

```python
    """A successfully-completed batch is terminal — DELETE returns RFC 7807 409.

    Submits a 3-pair naive backtest; the run completes synchronously inside
    ``POST /batch/forecasting``. The subsequent DELETE finds the parent in
    ``completed`` (terminal) and the runner registry empty.


async def test_delete_200_cancel_success(
    client: AsyncClient,
    sample_store: Store,
    sample_products_3: list[Product],
    sample_sales_120: list[Any],
    runner_registry,
) -> None:
    """Happy-path cancel: DELETE /batch/{batch_id} → 200 with successful drain.

    Seeds an in-flight batch, registers a CancelHandle in the runner registry
    whose ``cancel_batch`` and ``await_drain`` both succeed, and verifies that
    DELETE returns 200 with a terminal batch status and no running items.
    """
    # Seed an in-flight batch (same shape as the chaos tests use).
    batch = await _seed_synthetic_batch(
        store=sample_store,
        products=sample_products_3,
        sales_rows=sample_sales_120,
        start_inflight=True,
    )

    # Register a CancelHandle that reports both cancel and drain as successful.
    cancel_handle = AsyncMock()
    cancel_handle.cancel_batch.return_value = True
    cancel_handle.await_drain.return_value = True
    runner_registry.register(batch_id=batch.batch_id, handle=cancel_handle)

    # Exercise the cancel endpoint.
    resp = await client.delete(f"/batch/{batch.batch_id}")
    assert resp.status_code == 200
    body = resp.json()

    # Terminal batch status is either fully cancelled or partially completed.
    assert body["status"] in ("cancelled", "partial")

    # All items should have drained; none are still running.
    assert body["running_items"] == 0

    # The result summary should include the effective max parallelism used.
    summary = body.get("result_summary") or {}
    assert "effective_max_parallel" in summary
    assert isinstance(summary["effective_max_parallel"], int)
    assert summary["effective_max_parallel"] >= 1

```

To make this compile and pass, you will also need to:

1. **Import helpers and mocks at the top of this file (if not already present):**
   - `from httpx import AsyncClient` (already likely present).
   - `from unittest.mock import AsyncMock`.
   - `from app.features.batch.tests.test_runner_chaos import _seed_synthetic_batch` (or the correct module path for `_seed_synthetic_batch`).
   - Whatever provides `Store`, `Product`, and `runner_registry` if not already imported/declared in this file or `conftest.py`.

2. **Ensure `_seed_synthetic_batch` supports a `start_inflight=True` (or equivalent) flag:**
   - It should create a batch in the “in-flight” / “running” state instead of marking it completed.
   - It must return an object with a `batch_id` attribute that matches what `DELETE /batch/{batch_id}` expects.

3. **Ensure the runner registry API matches the usage in the test:**
   - Provide a `runner_registry` fixture that yields the registry used by the cancel route.
   - Implement `runner_registry.register(batch_id=..., handle=...)` or adapt the call in the test to your actual API (e.g., `runner_registry[batch.batch_id] = cancel_handle` or `runner_registry.add(batch.batch_id, cancel_handle)`).

4. **Align the response JSON fields with your actual schema:**
   - If your cancel endpoint uses different field names (e.g., `state` instead of `status`, or `summary` instead of `result_summary`), adjust the assertions in the test accordingly.
   - If terminal statuses use different strings (e.g., `"canceled"` vs `"cancelled"`, `"partially_completed"` vs `"partial"`), update the `status` assertion to use the exact values your API returns.
</issue_to_address>

### Comment 3
<location path="PRPs/PRP-34-batch-parallel-execution.md" line_range="554" />
<code_context>
+      test_cancel_pending_child_marks_cancelled_without_running
+        - max_parallel=1, 3 items. After first starts, cancel event fires.
+        - Assert items 2 and 3 transition pending → cancelled, never opened a session.
+      test_cancel_running_child_propagates_cancellederror
+        - One child sleeps 1s; cancel after 0.05s. Child observes CancelledError, finally block writes cancelled.
+
</code_context>
<issue_to_address>
**issue (typo):** The test name here likely has a typo in `cancellederror` and should match the usual `CancelledError` spelling.

For consistency with the rest of the document (where you use `asyncio.CancelledError`), please rename this to something like `test_cancel_running_child_propagates_cancelled_error` so the `CancelledError` portion is spelled correctly and easier to find via search.

```suggestion
      test_cancel_running_child_propagates_cancelled_error
```
</issue_to_address>

### Comment 4
<location path="app/features/batch/runner.py" line_range="122" />
<code_context>
+                total_items=len(item_ids), max_parallel=max_parallel,
+                effective_max_parallel=effective)
+
+    async def _child(item_id: str) -> None:
+        # FAST-CANCEL BEFORE acquire — skips not-yet-started work cleanly.
+        if handle.cancel_event.is_set():
</code_context>
<issue_to_address>
**issue (complexity):** Consider reusing a single per-child AsyncSession and passing it into the helper functions instead of having each helper create its own session and transaction.

You can reduce complexity and boilerplate without changing behavior by reusing a single `AsyncSession` per child and passing it into the helpers, instead of having each helper open its own session/transaction.

That gives you:

- fewer moving parts (no per-helper `async with session_maker()`)
- more obvious “per child, per DB session” semantics
- a simpler mental model for state transitions

Concretely:

1. Create a per-child session in `_child` and pass it to the helpers:

```python
async def _child(item_id: str) -> None:
    async with session_maker() as session:
        if handle.cancel_event.is_set():
            await _mark_cancelled_skipped(session, item_id)
            return

        acquired = False
        try:
            async with sem:
                acquired = True
                if handle.cancel_event.is_set():
                    await _mark_cancelled_skipped(session, item_id)
                    return
                await _bump_running(session, batch_id, +1)
                try:
                    await execute_item(item_id)
                except asyncio.CancelledError:
                    await _mark_cancelled_running(session, item_id)
                    raise
                except Exception:
                    logger.exception(
                        "batch.runner_unexpected_child_error",
                        batch_id=batch_id,
                        item_id=item_id,
                    )
                    await _mark_failed_unexpected(session, item_id)
                finally:
                    await _bump_running(session, batch_id, -1)
        except asyncio.CancelledError:
            if not acquired:
                await _mark_cancelled_skipped(session, item_id)
            raise
```

2. Change helpers to accept an `AsyncSession` instead of a `session_maker`, and drop their internal `async with` + `commit` boilerplate. For example:

```python
async def _bump_running(
    session: AsyncSession,
    batch_id: str,
    delta: int,
) -> None:
    await session.execute(
        update(BatchJob)
        .where(BatchJob.batch_id == batch_id)
        .values(running_items=BatchJob.running_items + delta)
    )
    await session.commit()
```

```python
async def _mark_cancelled_skipped(
    session: AsyncSession,
    item_id: str,
) -> None:
    now = datetime.now(UTC)
    await session.execute(
        update(BatchJobItem)
        .where(BatchJobItem.item_id == item_id)
        .values(
            status=BatchItemStatus.CANCELLED.value,
            completed_at=now,
        )
    )
    await session.commit()
```

```python
async def _mark_cancelled_running(
    session: AsyncSession,
    item_id: str,
) -> None:
    from sqlalchemy import select

    now = datetime.now(UTC)
    row = (
        await session.execute(
            select(BatchJobItem.started_at).where(BatchJobItem.item_id == item_id)
        )
    ).first()
    started_at = row[0] if row is not None else None
    duration_ms = (
        int((now - started_at).total_seconds() * 1000)
        if started_at is not None
        else None
    )
    await session.execute(
        update(BatchJobItem)
        .where(BatchJobItem.item_id == item_id)
        .values(
            status=BatchItemStatus.CANCELLED.value,
            completed_at=now,
            duration_ms=duration_ms,
        )
    )
    await session.commit()
```

```python
async def _mark_failed_unexpected(
    session: AsyncSession,
    item_id: str,
) -> None:
    now = datetime.now(UTC)
    await session.execute(
        update(BatchJobItem)
        .where(BatchJobItem.item_id == item_id)
        .values(
            status=BatchItemStatus.FAILED.value,
            completed_at=now,
            error_message="Runner caught unexpected exception (see structlog)",
            error_type="UnexpectedRunnerError",
        )
    )
    await session.commit()
```

This keeps all existing behavior (including per-item commits and the same cancellation semantics) while making the control flow and DB interaction substantially easier to follow and maintain.
</issue_to_address>


Comment thread app/features/batch/runner.py Outdated
Comment on lines +136 to +145
    try:
        async with sem:
            acquired = True
            # Re-check after acquire — a sibling may have signalled
            # cancel while we waited on the semaphore.
            if handle.cancel_event.is_set():
                await _mark_cancelled_skipped(session_maker, item_id)
                return
            await _bump_running(session_maker, batch_id, +1)
            try:

issue (bug_risk): Use asyncio.TimeoutError instead of built-in TimeoutError when catching wait_for timeouts

asyncio.wait_for raises asyncio.TimeoutError, not the built-in TimeoutError. Catching TimeoutError here means the timeout will escape instead of returning False, breaking the intended 504 behavior in the DELETE handler. Change the except clause to except asyncio.TimeoutError: (or import and use it directly) so the drain timeout behaves as documented and matches cancel_batch_route's handling.
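
One point worth checking before acting on this comment: since Python 3.11, asyncio.TimeoutError is documented as an alias of the built-in TimeoutError, and the runner already relies on 3.11+ features (asyncio.TaskGroup, except*). Under that assumption, `except TimeoutError` does catch a wait_for timeout and the 504 path works as written. The short check below is a sketch with a hypothetical `drain` helper that mirrors the await_drain snippet quoted above.

```python
import asyncio
import sys


async def drain(event: asyncio.Event, timeout_seconds: float) -> bool:
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout_seconds)
        return True
    except TimeoutError:  # built-in; same class as asyncio.TimeoutError on 3.11+
        return False


async def check() -> bool:
    # The event is never set, so wait_for must time out.
    return await drain(asyncio.Event(), 0.01)


# On 3.11+ the two names refer to one class; on older versions they differ.
SAME_CLASS = asyncio.TimeoutError is TimeoutError
```

On Python 3.10 and earlier the two classes are distinct and the reviewer's concern would apply, so the answer depends on the minimum supported interpreter.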

Comment on lines +34 to +43
async def test_delete_409_terminal_batch(
    client: AsyncClient,
    sample_store: Store,
    sample_products_3: list[Product],
    sample_sales_120: list[Any],
) -> None:
    """A successfully-completed batch is terminal — DELETE returns RFC 7807 409.

    Submits a 3-pair naive backtest; the run completes synchronously inside
    ``POST /batch/forecasting``. The subsequent DELETE finds the parent in

suggestion (testing): Add a happy-path DELETE /batch/{batch_id} test that exercises the 200 + drain-success case

Right now the tests only cover 404, 409, and 504, but not the documented 200 + successful drain path. Please add an integration test that:

  • seeds an in-flight batch (similar to _seed_synthetic_batch in test_runner_chaos.py),
  • registers a CancelHandle in the runner registry,
  • calls DELETE /batch/{batch_id} such that runner.cancel_batch and runner.await_drain both return True,
  • asserts a 200 and verifies the body shows a terminal status (cancelled or partial), running_items == 0, and effective_max_parallel in result_summary.

This will exercise the full success path for the cancel flow end-to-end.

Suggested implementation:

    """A successfully-completed batch is terminal — DELETE returns RFC 7807 409.

    Submits a 3-pair naive backtest; the run completes synchronously inside
    ``POST /batch/forecasting``. The subsequent DELETE finds the parent in
    ``completed`` (terminal) and the runner registry empty.


async def test_delete_200_cancel_success(
    client: AsyncClient,
    sample_store: Store,
    sample_products_3: list[Product],
    sample_sales_120: list[Any],
    runner_registry,
) -> None:
    """Happy-path cancel: DELETE /batch/{batch_id} → 200 with successful drain.

    Seeds an in-flight batch, registers a CancelHandle in the runner registry
    whose ``cancel_batch`` and ``await_drain`` both succeed, and verifies that
    DELETE returns 200 with a terminal batch status and no running items.
    """
    # Seed an in-flight batch (same shape as the chaos tests use).
    batch = await _seed_synthetic_batch(
        store=sample_store,
        products=sample_products_3,
        sales_rows=sample_sales_120,
        start_inflight=True,
    )

    # Register a CancelHandle that reports both cancel and drain as successful.
    cancel_handle = AsyncMock()
    cancel_handle.cancel_batch.return_value = True
    cancel_handle.await_drain.return_value = True
    runner_registry.register(batch_id=batch.batch_id, handle=cancel_handle)

    # Exercise the cancel endpoint.
    resp = await client.delete(f"/batch/{batch.batch_id}")
    assert resp.status_code == 200
    body = resp.json()

    # Terminal batch status is either fully cancelled or partially completed.
    assert body["status"] in ("cancelled", "partial")

    # All items should have drained; none are still running.
    assert body["running_items"] == 0

    # The result summary should include the effective max parallelism used.
    summary = body.get("result_summary") or {}
    assert "effective_max_parallel" in summary
    assert isinstance(summary["effective_max_parallel"], int)
    assert summary["effective_max_parallel"] >= 1

To make this compile and pass, you will also need to:

  1. Import helpers and mocks at the top of this file (if not already present):

    • from httpx import AsyncClient (likely already present).
    • from unittest.mock import AsyncMock.
    • from app.features.batch.tests.test_runner_chaos import _seed_synthetic_batch (or the correct module path for _seed_synthetic_batch).
    • Whatever provides Store, Product, and runner_registry if not already imported/declared in this file or conftest.py.
  2. Ensure _seed_synthetic_batch supports a start_inflight=True (or equivalent) flag:

    • It should create a batch in the “in-flight” / “running” state instead of marking it completed.
    • It must return an object with a batch_id attribute that matches what DELETE /batch/{batch_id} expects.
  3. Ensure the runner registry API matches the usage in the test:

    • Provide a runner_registry fixture that yields the registry used by the cancel route.
    • Implement runner_registry.register(batch_id=..., handle=...) or adapt the call in the test to your actual API (e.g., runner_registry[batch.batch_id] = cancel_handle or runner_registry.add(batch.batch_id, cancel_handle)).
  4. Align the response JSON fields with your actual schema:

    • If your cancel endpoint uses different field names (e.g., state instead of status, or summary instead of result_summary), adjust the assertions in the test accordingly.
    • If terminal statuses use different strings (e.g., "canceled" vs "cancelled", "partially_completed" vs "partial"), update the status assertion to use the exact values your API returns.

Comment thread PRPs/PRP-34-batch-parallel-execution.md Outdated
Comment thread app/features/batch/runner.py
Seven review-driven cleanups, behaviour preserved:

- runner.py: helpers now accept the per-child AsyncSession instead of the
  session_maker; one session opened at the top of _child, reused for every
  state-transition write (each helper still commits its own UPDATE so the
  running_items counter is observable to concurrent DELETE handlers).
- runner.py: clarifying comment on the `except TimeoutError:` branch in
  await_drain, noting that `asyncio.TimeoutError` has been an alias of the
  built-in since Python 3.11 (asyncio docs); the project pins >= 3.12.
- frontend/src/components/ui/slider.tsx: switched from the bundled
  `radix-ui` package to per-component `@radix-ui/react-slider` to match
  the project's existing shadcn primitives (alert-dialog, dialog, etc.).
- app/features/batch/models.py + routes.py: TERMINAL_BATCH_STATES is now
  derived from VALID_BATCH_TRANSITIONS (a status with no out-edges is
  terminal) and exported from the models module — routes.py imports it
  instead of redeclaring the set.
- frontend/src/types/api.ts: exports TERMINAL_BATCH_STATES so batch.tsx
  (and any future consumer) reads from the single source of truth.
- test_routes_cancel.py: added test_delete_200_clean_drain covering the
  documented 200 happy-path.
- Typo: test_cancel_running_child_propagates_cancellederror →
  test_cancel_running_child_propagates_cancelled_error (PRP doc updated).
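
The "no out-edges means terminal" derivation described for TERMINAL_BATCH_STATES can be sketched roughly as follows. The enum members and the transition map here are illustrative assumptions, not the project's actual definitions in models.py; only the derivation rule itself comes from the commit message.

```python
from enum import Enum


class BatchStatus(str, Enum):
    # Illustrative members; the project's real enum may differ.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    PARTIAL = "partial"
    CANCELLED = "cancelled"


# Hypothetical transition map: each status maps to the statuses it may move to.
VALID_BATCH_TRANSITIONS: dict[BatchStatus, frozenset[BatchStatus]] = {
    BatchStatus.PENDING: frozenset({BatchStatus.RUNNING, BatchStatus.CANCELLED}),
    BatchStatus.RUNNING: frozenset(
        {BatchStatus.COMPLETED, BatchStatus.PARTIAL, BatchStatus.CANCELLED}
    ),
    BatchStatus.COMPLETED: frozenset(),
    BatchStatus.PARTIAL: frozenset(),
    BatchStatus.CANCELLED: frozenset(),
}

# A status with no outgoing transitions is terminal by construction.
TERMINAL_BATCH_STATES: frozenset[BatchStatus] = frozenset(
    status for status, targets in VALID_BATCH_TRANSITIONS.items() if not targets
)
```

Deriving the set this way means adding a new status to the transition map updates the terminal set automatically, so the two cannot drift apart.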
@w7-mgfcode w7-mgfcode merged commit 36fff6e into dev May 25, 2026
10 checks passed
@w7-mgfcode w7-mgfcode deleted the feat/batch-parallel-execution branch May 25, 2026 13:37