From 82944bbfdfe1907e3cb5bf7cefb88ace2ea8b76f Mon Sep 17 00:00:00 2001 From: Chris Pyle Date: Mon, 30 Mar 2026 16:50:58 -0400 Subject: [PATCH 1/2] async tasks --- .../console-2026-03-30T12-44-16-928Z.log | 3 + .../console-2026-03-30T12-46-50-629Z.log | 3 + .../console-2026-03-30T18-45-35-182Z.log | 3 + .../console-2026-03-30T18-54-46-288Z.log | 2 + .../console-2026-03-30T18-56-02-715Z.log | 2 + .../console-2026-03-30T20-39-28-985Z.log | 3 + backend/Dockerfile | 2 +- backend/app/api/ingestion.py | 141 +++++------ backend/app/jobs/ingestion_job.py | 155 +++++++++++- backend/app/jobs/propose_job.py | 138 +++++++++++ backend/app/jobs/run_tracker.py | 60 +++++ backend/app/main.py | 23 ++ backend/app/services/ingestion_service.py | 68 ++++-- docker-compose.yml | 8 +- frontend/src/api/useIngestion.ts | 217 ++++++++++++++-- frontend/src/components/layout/Topbar.tsx | 55 +++-- frontend/src/pages/Ingestion.tsx | 55 ++--- plans/async-runs-improvements.md | 107 ++++++++ plans/async-runs.md | 231 ++++++++++++++++++ 19 files changed, 1087 insertions(+), 189 deletions(-) create mode 100644 .playwright-mcp/console-2026-03-30T12-44-16-928Z.log create mode 100644 .playwright-mcp/console-2026-03-30T12-46-50-629Z.log create mode 100644 .playwright-mcp/console-2026-03-30T18-45-35-182Z.log create mode 100644 .playwright-mcp/console-2026-03-30T18-54-46-288Z.log create mode 100644 .playwright-mcp/console-2026-03-30T18-56-02-715Z.log create mode 100644 .playwright-mcp/console-2026-03-30T20-39-28-985Z.log create mode 100644 backend/app/jobs/propose_job.py create mode 100644 backend/app/jobs/run_tracker.py create mode 100644 plans/async-runs-improvements.md create mode 100644 plans/async-runs.md diff --git a/.playwright-mcp/console-2026-03-30T12-44-16-928Z.log b/.playwright-mcp/console-2026-03-30T12-44-16-928Z.log new file mode 100644 index 0000000..0b6d308 --- /dev/null +++ b/.playwright-mcp/console-2026-03-30T12-44-16-928Z.log @@ -0,0 +1,3 @@ +[ 1021ms] [WARNING] ⚠️ React Router Future 
Flag Warning: React Router will begin wrapping state updates in `React.startTransition` in v7. You can use the `v7_startTransition` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_starttransition. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 1021ms] [WARNING] ⚠️ React Router Future Flag Warning: Relative route resolution within Splat routes is changing in v7. You can use the `v7_relativeSplatPath` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_relativesplatpath. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 1037ms] [ERROR] Failed to load resource: the server responded with a status of 404 (Not Found) @ http://tasky-async-runs.lvh.me/favicon.ico:0 diff --git a/.playwright-mcp/console-2026-03-30T12-46-50-629Z.log b/.playwright-mcp/console-2026-03-30T12-46-50-629Z.log new file mode 100644 index 0000000..db534b2 --- /dev/null +++ b/.playwright-mcp/console-2026-03-30T12-46-50-629Z.log @@ -0,0 +1,3 @@ +[ 446ms] [WARNING] ⚠️ React Router Future Flag Warning: React Router will begin wrapping state updates in `React.startTransition` in v7. You can use the `v7_startTransition` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_starttransition. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 447ms] [WARNING] ⚠️ React Router Future Flag Warning: Relative route resolution within Splat routes is changing in v7. You can use the `v7_relativeSplatPath` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_relativesplatpath. 
@ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 462ms] [ERROR] Failed to load resource: the server responded with a status of 404 (Not Found) @ http://tasky-async-runs.lvh.me/favicon.ico:0 diff --git a/.playwright-mcp/console-2026-03-30T18-45-35-182Z.log b/.playwright-mcp/console-2026-03-30T18-45-35-182Z.log new file mode 100644 index 0000000..57d273e --- /dev/null +++ b/.playwright-mcp/console-2026-03-30T18-45-35-182Z.log @@ -0,0 +1,3 @@ +[ 673ms] [WARNING] ⚠️ React Router Future Flag Warning: React Router will begin wrapping state updates in `React.startTransition` in v7. You can use the `v7_startTransition` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_starttransition. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 673ms] [WARNING] ⚠️ React Router Future Flag Warning: Relative route resolution within Splat routes is changing in v7. You can use the `v7_relativeSplatPath` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_relativesplatpath. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 702ms] [ERROR] Failed to load resource: the server responded with a status of 404 (Not Found) @ http://tasky-async-runs.lvh.me/favicon.ico:0 diff --git a/.playwright-mcp/console-2026-03-30T18-54-46-288Z.log b/.playwright-mcp/console-2026-03-30T18-54-46-288Z.log new file mode 100644 index 0000000..b9ec3dd --- /dev/null +++ b/.playwright-mcp/console-2026-03-30T18-54-46-288Z.log @@ -0,0 +1,2 @@ +[ 680ms] [WARNING] ⚠️ React Router Future Flag Warning: React Router will begin wrapping state updates in `React.startTransition` in v7. You can use the `v7_startTransition` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_starttransition. 
@ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 681ms] [WARNING] ⚠️ React Router Future Flag Warning: Relative route resolution within Splat routes is changing in v7. You can use the `v7_relativeSplatPath` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_relativesplatpath. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 diff --git a/.playwright-mcp/console-2026-03-30T18-56-02-715Z.log b/.playwright-mcp/console-2026-03-30T18-56-02-715Z.log new file mode 100644 index 0000000..99f47b4 --- /dev/null +++ b/.playwright-mcp/console-2026-03-30T18-56-02-715Z.log @@ -0,0 +1,2 @@ +[ 329ms] [WARNING] ⚠️ React Router Future Flag Warning: React Router will begin wrapping state updates in `React.startTransition` in v7. You can use the `v7_startTransition` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_starttransition. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 329ms] [WARNING] ⚠️ React Router Future Flag Warning: Relative route resolution within Splat routes is changing in v7. You can use the `v7_relativeSplatPath` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_relativesplatpath. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 diff --git a/.playwright-mcp/console-2026-03-30T20-39-28-985Z.log b/.playwright-mcp/console-2026-03-30T20-39-28-985Z.log new file mode 100644 index 0000000..1a82ecb --- /dev/null +++ b/.playwright-mcp/console-2026-03-30T20-39-28-985Z.log @@ -0,0 +1,3 @@ +[ 6486ms] [WARNING] ⚠️ React Router Future Flag Warning: React Router will begin wrapping state updates in `React.startTransition` in v7. You can use the `v7_startTransition` future flag to opt-in early. 
For more information, see https://reactrouter.com/v6/upgrading/future#v7_starttransition. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 6486ms] [WARNING] ⚠️ React Router Future Flag Warning: Relative route resolution within Splat routes is changing in v7. You can use the `v7_relativeSplatPath` future flag to opt-in early. For more information, see https://reactrouter.com/v6/upgrading/future#v7_relativesplatpath. @ http://tasky-async-runs.lvh.me/node_modules/.vite/deps/react-router-dom.js?v=68971fd7:4435 +[ 6506ms] [ERROR] Failed to load resource: the server responded with a status of 404 (Not Found) @ http://tasky-async-runs.lvh.me/favicon.ico:0 diff --git a/backend/Dockerfile b/backend/Dockerfile index 6d40100..ce5c422 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -15,4 +15,4 @@ RUN uv sync --frozen --no-dev COPY . . # Run migrations then start the server -CMD ["sh", "-c", "uv run alembic upgrade head && uv run uvicorn app.main:app --host 0.0.0.0 --port 7400 --reload"] +CMD ["sh", "-c", "uv run alembic upgrade head && uv run uvicorn app.main:app --host 0.0.0.0 --port 7400 --reload --reload-exclude .venv"] diff --git a/backend/app/api/ingestion.py b/backend/app/api/ingestion.py index 9101f83..819ee8a 100644 --- a/backend/app/api/ingestion.py +++ b/backend/app/api/ingestion.py @@ -12,6 +12,7 @@ POST /ingestion/runs/{id}/rerun Re-run an existing ingestion run """ +import asyncio import time from datetime import datetime, timedelta, timezone @@ -186,16 +187,26 @@ def _run_to_detail(run) -> dict: @router.post("/runs") -def create_run(body: CreateRunRequest, db: Session = Depends(get_db)): +async def create_run(body: CreateRunRequest, db: Session = Depends(get_db)): + from app.jobs.ingestion_job import _run_full_pipeline_bg + from app.jobs.run_tracker import claim_run, has_active_run + from app.models.ingestion_run import IngestionRun, RunStatus from app.services.ingestion_service import IngestionService + 
existing = db.query(IngestionRun).filter(IngestionRun.status == RunStatus.running).first() + if has_active_run() or existing: + raise HTTPException(status_code=409, detail="A run is already in progress") + range_start = datetime.fromisoformat(body.start_date) range_end = ( datetime.fromisoformat(body.end_date) if body.end_date else datetime.now(timezone.utc) ) svc = IngestionService(db) - run = svc.trigger_run(range_start, range_end) - return _run_to_detail(run) + run = svc.create_run(range_start, range_end) + if not claim_run(run.id): + raise HTTPException(status_code=409, detail="A run is already in progress") + asyncio.create_task(asyncio.to_thread(_run_full_pipeline_bg, run.id)) + return _run_to_summary(run) @router.get("/runs") @@ -217,26 +228,46 @@ def get_run(run_id: int, db: Session = Depends(get_db)): @router.post("/runs/{run_id}/rerun") -def rerun(run_id: int, db: Session = Depends(get_db)): +async def rerun(run_id: int, db: Session = Depends(get_db)): + from app.jobs.ingestion_job import _run_full_pipeline_bg + from app.jobs.run_tracker import claim_run, has_active_run + from app.models.ingestion_run import IngestionRun, RunStatus from app.services.ingestion_service import IngestionService - run = IngestionService(db).rerun(run_id) + existing = db.query(IngestionRun).filter(IngestionRun.status == RunStatus.running).first() + if has_active_run() or existing: + raise HTTPException(status_code=409, detail="A run is already in progress") + + run = IngestionService(db).reset_run_for_rerun(run_id) if not run: raise HTTPException(status_code=404, detail="Run not found") - return _run_to_detail(run) + if not claim_run(run.id): + raise HTTPException(status_code=409, detail="A run is already in progress") + asyncio.create_task(asyncio.to_thread(_run_full_pipeline_bg, run.id)) + return _run_to_summary(run) @router.post("/sync") -def sync(db: Session = Depends(get_db)): - """Run a full ingestion + LLM pipeline from the last run's end time to now. 
+async def sync(db: Session = Depends(get_db)): + """Kick off a full ingestion + LLM pipeline asynchronously. - Determines range_start automatically from the most recent run's range_end, - falling back to 24 hours ago if no prior runs exist. + Returns immediately with {run_id, status: "running"}. + Poll GET /ingestion/runs/{run_id} for live status. + Returns 409 if a run is already in progress. """ - from app.jobs.ingestion_job import run_full_pipeline - from app.models.ingestion_run import TriggeredBy + from app.jobs.ingestion_job import _run_full_pipeline_bg, run_full_pipeline_async + from app.jobs.run_tracker import claim_run, has_active_run + from app.models.ingestion_run import IngestionRun, RunStatus, TriggeredBy + + existing = db.query(IngestionRun).filter(IngestionRun.status == RunStatus.running).first() + if has_active_run() or existing: + raise HTTPException(status_code=409, detail="A run is already in progress") - return run_full_pipeline(db, triggered_by=TriggeredBy.manual) + run = run_full_pipeline_async(db, triggered_by=TriggeredBy.manual) + if not claim_run(run.id): + raise HTTPException(status_code=409, detail="A run is already in progress") + asyncio.create_task(asyncio.to_thread(_run_full_pipeline_bg, run.id)) + return {"run_id": run.id, "status": "running"} @router.delete("/runs/{run_id}") @@ -289,88 +320,26 @@ def get_run_proposals(run_id: int, db: Session = Depends(get_db)): @router.post("/runs/{run_id}/propose") -def propose_tasks(run_id: int, db: Session = Depends(get_db)): - """Run LLM proposal generation for a run. +async def propose_tasks(run_id: int, db: Session = Depends(get_db)): + """Kick off LLM proposal generation for a run asynchronously. - Deletes all existing proposals for this run's batches, then calls the LLM - and saves the new proposals with status=pending. + Returns immediately with {run_id, status: "running"}. + Poll GET /ingestion/runs/{run_id}/proposals for results. 
+ Returns 409 if a propose job is already running for this run. """ - from app.llm.client import LLMClient, LLMError - from app.llm.prompt_builder import build_proposal_prompt - from app.models.experience import Experience - from app.models.task import Task, TaskStatus - from app.models.task_proposal import ProposalCreatedBy, ProposalType, TaskProposal + from app.jobs.propose_job import run_propose_bg + from app.jobs.run_tracker import claim_propose from app.services.ingestion_service import IngestionService run = IngestionService(db).get_run(run_id) if not run: raise HTTPException(status_code=404, detail="Run not found") - # Delete existing proposals for this run's batches. - batch_ids = [b.id for b in run.batches] - if batch_ids: - db.query(TaskProposal).filter(TaskProposal.ingestion_batch_id.in_(batch_ids)).delete( - synchronize_session=False - ) - db.commit() - - active_tasks = ( - db.query(Task).filter(Task.status.notin_([TaskStatus.done, TaskStatus.cancelled])).all() - ) - active_experiences = db.query(Experience).filter(Experience.active.is_(True)).all() - experience_name_to_id = {exp.folder_path: exp.id for exp in active_experiences} - - system_prompt, user_prompt = build_proposal_prompt(run, db, active_tasks, active_experiences) + if not claim_propose(run_id): + raise HTTPException(status_code=409, detail="Proposal already in progress for this run") - try: - t0 = time.monotonic() - result = LLMClient().generate_proposals_with_meta(system_prompt, user_prompt) - duration_ms = round((time.monotonic() - t0) * 1000) - except LLMError as e: - raise HTTPException(status_code=500, detail=str(e)) from e - - batch_id = run.batches[0].id if run.batches else None - now = datetime.now(timezone.utc) - saved = [] - for p in result.batch.proposals: - due_at = None - if p.proposed_due_at: - try: - due_at = datetime.fromisoformat(p.proposed_due_at.replace("Z", "+00:00")) - except ValueError: - pass - experience_id = ( - experience_name_to_id.get(p.proposed_experience_name) 
- if p.proposed_experience_name - else None - ) - proposal = TaskProposal( - proposal_type=ProposalType(p.proposal_type), - status="pending", - task_id=p.task_id, - proposed_title=p.proposed_title, - proposed_description=p.proposed_description, - proposed_status=p.proposed_status, - proposed_experience_id=experience_id, - proposed_due_at=due_at, - proposed_external_ref=p.proposed_external_ref, - reason_summary=p.reason_summary, - created_at=now, - created_by=ProposalCreatedBy.ai, - ingestion_batch_id=batch_id, - ) - db.add(proposal) - saved.append(proposal) - db.commit() - - return { - "run_id": run.id, - "model": result.model, - "proposals_saved": len(saved), - "input_tokens": result.input_tokens, - "output_tokens": result.output_tokens, - "duration_ms": duration_ms, - } + asyncio.create_task(asyncio.to_thread(run_propose_bg, run_id)) + return {"run_id": run_id, "status": "running"} # --------------------------------------------------------------------------- diff --git a/backend/app/jobs/ingestion_job.py b/backend/app/jobs/ingestion_job.py index ee56d87..b0cd168 100644 --- a/backend/app/jobs/ingestion_job.py +++ b/backend/app/jobs/ingestion_job.py @@ -1,7 +1,11 @@ """Ingestion job: full pipeline (ingest + LLM proposals). -`run_full_pipeline` is the single entry point used by both the scheduled -background task (every 12 h) and the manual sync API endpoint. +`run_full_pipeline` is the single entry point used by the scheduled +background task (every 12 h). + +`run_full_pipeline_async` creates a run record and returns immediately; +the caller is responsible for dispatching `_run_full_pipeline_bg` in a +background thread via `asyncio.create_task(asyncio.to_thread(...))`. 
""" import logging @@ -10,7 +14,7 @@ from sqlalchemy.orm import Session -from app.models.ingestion_run import IngestionRun, TriggeredBy +from app.models.ingestion_run import IngestionRun, RunStatus, TriggeredBy from app.services.ingestion_service import IngestionService logger = logging.getLogger(__name__) @@ -146,3 +150,148 @@ def run_full_pipeline( "input_tokens": result.input_tokens, "output_tokens": result.output_tokens, } + + +def run_full_pipeline_async( + db: Session, + triggered_by: TriggeredBy = TriggeredBy.manual, +) -> IngestionRun: + """Create an IngestionRun with status=running and return it immediately. + + The caller must dispatch `_run_full_pipeline_bg(run.id)` in a background + thread and claim the run in the run tracker. Does NOT run connectors or + the LLM. + """ + range_start = _get_pipeline_range_start(db) + range_end = datetime.now(timezone.utc) + + svc = IngestionService(db) + run = svc.create_run(range_start, range_end, triggered_by=triggered_by) + return run + + +def _run_full_pipeline_bg(run_id: int) -> None: + """Background worker: run connectors + LLM for an existing IngestionRun. + + Opens its own DB session (the HTTP request session is already closed). + Updates run.status to completed or failed when done and releases the + run from the tracker. 
+ """ + from app.db.session import SessionLocal + from app.jobs.run_tracker import release_run + from app.llm.client import LLMClient, LLMError + from app.llm.prompt_builder import build_proposal_prompt + from app.models.experience import Experience + from app.models.ingestion_run import IngestionRun + from app.models.task import Task, TaskStatus + from app.models.task_proposal import ProposalCreatedBy, ProposalType, TaskProposal + + db = SessionLocal() + try: + run = db.query(IngestionRun).filter(IngestionRun.id == run_id).first() + if not run: + logger.error("Background pipeline: run %d not found", run_id) + return + + # --- Connector phase --- + svc = IngestionService(db) + try: + svc._run_connectors(run, set_final_status=False) + except Exception as exc: + logger.error("Background pipeline connector phase failed for run %d: %s", run_id, exc) + run.status = RunStatus.failed + run.finished_at = datetime.now(timezone.utc) + run.error_summary = str(exc) + db.commit() + return + + # Refresh batches after connector phase flush. 
+ db.refresh(run) + if not run.batches: + logger.warning("Background pipeline run %d produced no batches — skipping LLM", run_id) + run.status = RunStatus.completed + run.finished_at = datetime.now(timezone.utc) + db.commit() + return + + # --- LLM proposal phase --- + active_tasks = ( + db.query(Task).filter(Task.status.notin_([TaskStatus.done, TaskStatus.cancelled])).all() + ) + active_experiences = db.query(Experience).filter(Experience.active.is_(True)).all() + experience_name_to_id = {exp.folder_path: exp.id for exp in active_experiences} + + system_prompt, user_prompt = build_proposal_prompt( + run, db, active_tasks, active_experiences + ) + + try: + t0 = time.monotonic() + result = LLMClient().generate_proposals_with_meta(system_prompt, user_prompt) + duration_ms = round((time.monotonic() - t0) * 1000) + except LLMError as exc: + logger.error("Background pipeline LLM call failed for run %d: %s", run_id, exc) + run.status = RunStatus.failed + run.error_summary = str(exc) + db.commit() + return + + batch_id = run.batches[0].id if run.batches else None + now = datetime.now(timezone.utc) + saved_count = 0 + for p in result.batch.proposals: + due_at = None + if p.proposed_due_at: + try: + due_at = datetime.fromisoformat(p.proposed_due_at.replace("Z", "+00:00")) + except ValueError: + pass + experience_id = ( + experience_name_to_id.get(p.proposed_experience_name) + if p.proposed_experience_name + else None + ) + proposal = TaskProposal( + proposal_type=ProposalType(p.proposal_type), + status="pending", + task_id=p.task_id, + proposed_title=p.proposed_title, + proposed_description=p.proposed_description, + proposed_status=p.proposed_status, + proposed_experience_id=experience_id, + proposed_due_at=due_at, + proposed_external_ref=p.proposed_external_ref, + reason_summary=p.reason_summary, + created_at=now, + created_by=ProposalCreatedBy.ai, + ingestion_batch_id=batch_id, + ) + db.add(proposal) + saved_count += 1 + + run.status = RunStatus.completed + 
run.finished_at = datetime.now(timezone.utc) + db.commit() + + logger.info( + "Background pipeline complete: run=%d proposals=%d duration=%dms", + run_id, + saved_count, + duration_ms, + ) + except Exception as exc: + logger.error( + "Background pipeline unexpected error for run %d: %s", run_id, exc, exc_info=True + ) + try: + run = db.query(IngestionRun).filter(IngestionRun.id == run_id).first() + if run and run.status == RunStatus.running: + run.status = RunStatus.failed + run.error_summary = f"Unexpected error: {exc}" + db.commit() + except Exception: + pass + raise + finally: + db.close() + release_run(run_id) diff --git a/backend/app/jobs/propose_job.py b/backend/app/jobs/propose_job.py new file mode 100644 index 0000000..13e94ab --- /dev/null +++ b/backend/app/jobs/propose_job.py @@ -0,0 +1,138 @@ +"""Background job: LLM proposal generation for an existing IngestionRun. + +`run_propose_bg(run_id)` is dispatched via `asyncio.create_task(asyncio.to_thread(...))` +from the `POST /ingestion/runs/{id}/propose` endpoint. It opens its own DB +session, calls the LLM, persists proposals, and releases the propose tracker +slot when done. +""" + +import logging +import time +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + + +def run_propose_bg(run_id: int) -> None: + """Run LLM proposal generation for an existing IngestionRun in the background. + + Opens its own DB session. Updates run.status to failed on error. + Always calls release_propose(run_id) in a finally block. 
+ """ + from app.db.session import SessionLocal + from app.jobs.run_tracker import release_propose + from app.llm.client import LLMClient, LLMError + from app.llm.prompt_builder import build_proposal_prompt + from app.models.experience import Experience + from app.models.ingestion_run import IngestionRun, RunStatus + from app.models.task import Task, TaskStatus + from app.models.task_proposal import ProposalCreatedBy, ProposalType, TaskProposal + + db = SessionLocal() + try: + from sqlalchemy.orm import joinedload + + run = ( + db.query(IngestionRun) + .options(joinedload(IngestionRun.batches)) + .filter(IngestionRun.id == run_id) + .first() + ) + if not run: + logger.error("Propose job: run %d not found", run_id) + return + + # Mark run as running so the frontend poller sees the transition. + # Preserve the original finished_at from the connector phase so the + # duration display remains accurate after re-proposing. + original_finished_at = run.finished_at + run.status = RunStatus.running + run.finished_at = None + db.commit() + + batch_ids = [b.id for b in run.batches] + + active_tasks = ( + db.query(Task).filter(Task.status.notin_([TaskStatus.done, TaskStatus.cancelled])).all() + ) + active_experiences = db.query(Experience).filter(Experience.active.is_(True)).all() + experience_name_to_id = {exp.folder_path: exp.id for exp in active_experiences} + + system_prompt, user_prompt = build_proposal_prompt( + run, db, active_tasks, active_experiences + ) + + try: + t0 = time.monotonic() + result = LLMClient().generate_proposals_with_meta(system_prompt, user_prompt) + duration_ms = round((time.monotonic() - t0) * 1000) + except LLMError as exc: + logger.error("Propose job LLM call failed for run %d: %s", run_id, exc) + run.status = RunStatus.failed + run.error_summary = str(exc) + db.commit() + return + + # Delete old proposals only after LLM succeeds, so they're preserved on failure. 
+ if batch_ids: + db.query(TaskProposal).filter(TaskProposal.ingestion_batch_id.in_(batch_ids)).delete( + synchronize_session=False + ) + + batch_id = run.batches[0].id if run.batches else None + now = datetime.now(timezone.utc) + saved_count = 0 + for p in result.batch.proposals: + due_at = None + if p.proposed_due_at: + try: + due_at = datetime.fromisoformat(p.proposed_due_at.replace("Z", "+00:00")) + except ValueError: + pass + experience_id = ( + experience_name_to_id.get(p.proposed_experience_name) + if p.proposed_experience_name + else None + ) + proposal = TaskProposal( + proposal_type=ProposalType(p.proposal_type), + status="pending", + task_id=p.task_id, + proposed_title=p.proposed_title, + proposed_description=p.proposed_description, + proposed_status=p.proposed_status, + proposed_experience_id=experience_id, + proposed_due_at=due_at, + proposed_external_ref=p.proposed_external_ref, + reason_summary=p.reason_summary, + created_at=now, + created_by=ProposalCreatedBy.ai, + ingestion_batch_id=batch_id, + ) + db.add(proposal) + saved_count += 1 + + run.status = RunStatus.completed + run.finished_at = original_finished_at or datetime.now(timezone.utc) + db.commit() + + logger.info( + "Propose job complete: run=%d proposals=%d duration=%dms", + run_id, + saved_count, + duration_ms, + ) + except Exception as exc: + logger.error("Propose job unexpected error for run %d: %s", run_id, exc, exc_info=True) + try: + run = db.query(IngestionRun).filter(IngestionRun.id == run_id).first() + if run and run.status == RunStatus.running: + run.status = RunStatus.failed + run.error_summary = f"Unexpected error: {exc}" + db.commit() + except Exception: + pass + raise + finally: + db.close() + release_propose(run_id) diff --git a/backend/app/jobs/run_tracker.py b/backend/app/jobs/run_tracker.py new file mode 100644 index 0000000..4e09e8f --- /dev/null +++ b/backend/app/jobs/run_tracker.py @@ -0,0 +1,60 @@ +"""In-memory concurrency guard for background ingestion jobs. 
+ +Uses two module-level sets to track which run IDs have in-flight +operations. A threading.Lock protects the check-then-add pattern +against TOCTOU races when multiple requests arrive concurrently. + +Sets: + _active_runs — full pipeline / connector-only runs in flight + _active_proposes — per-run LLM propose calls in flight +""" + +import threading + +_lock = threading.Lock() +_active_runs: set[int] = set() +_active_proposes: set[int] = set() + + +def claim_run(run_id: int) -> bool: + """Attempt to claim a run ID as active. + + Returns True if successfully claimed (not already active). + Returns False if the run ID is already in the active set. + """ + with _lock: + if run_id in _active_runs: + return False + _active_runs.add(run_id) + return True + + +def release_run(run_id: int) -> None: + """Release a previously claimed run ID.""" + with _lock: + _active_runs.discard(run_id) + + +def claim_propose(run_id: int) -> bool: + """Attempt to claim a run ID for an LLM propose call. + + Returns True if successfully claimed (not already proposing). + Returns False if already in the active proposes set. + """ + with _lock: + if run_id in _active_proposes: + return False + _active_proposes.add(run_id) + return True + + +def release_propose(run_id: int) -> None: + """Release a previously claimed propose run ID.""" + with _lock: + _active_proposes.discard(run_id) + + +def has_active_run() -> bool: + """Check if any run is currently active.""" + with _lock: + return len(_active_runs) > 0 diff --git a/backend/app/main.py b/backend/app/main.py index 08b585a..6014f78 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -48,8 +48,31 @@ async def _scheduler_loop() -> None: logger.error("Scheduler loop error: %s", exc, exc_info=True) +async def _reset_stale_runs() -> None: + """Reset any IngestionRuns stuck in 'running' to 'failed' on startup. + + Prevents the concurrency guard from permanently blocking after a server + crash or restart while a run was in progress. 
+ """ + from app.db.session import SessionLocal + from app.models.ingestion_run import IngestionRun, RunStatus + + db = SessionLocal() + try: + stale = db.query(IngestionRun).filter(IngestionRun.status == RunStatus.running).all() + for run in stale: + run.status = RunStatus.failed + run.error_summary = "server restarted while run was in progress" + if stale: + db.commit() + logger.warning("Reset %d stale running runs to failed", len(stale)) + finally: + db.close() + + @asynccontextmanager async def lifespan(app: FastAPI): + await _reset_stale_runs() task = asyncio.create_task(_scheduler_loop()) logger.info("Background pipeline scheduler started (interval=%dh)", _SCHEDULE_INTERVAL_HOURS) try: diff --git a/backend/app/services/ingestion_service.py b/backend/app/services/ingestion_service.py index eb500b5..ab446a6 100644 --- a/backend/app/services/ingestion_service.py +++ b/backend/app/services/ingestion_service.py @@ -31,12 +31,17 @@ class IngestionService: def __init__(self, db: Session) -> None: self.db = db - def trigger_run( + def create_run( self, range_start: datetime, range_end: datetime, triggered_by: TriggeredBy = TriggeredBy.manual, ) -> IngestionRun: + """Create an IngestionRun record with status=running and flush (no connectors). + + Returns the run object so the caller can use run.id to dispatch a + background job. + """ run = IngestionRun( status=RunStatus.running, range_start=range_start, @@ -46,11 +51,24 @@ def trigger_run( ) self.db.add(run) self.db.flush() + self.db.commit() + return run - self._run_connectors(run) + def trigger_run( + self, + range_start: datetime, + range_end: datetime, + triggered_by: TriggeredBy = TriggeredBy.manual, + ) -> IngestionRun: + """Create a run record and immediately run connectors (synchronous path). + + Used by the scheduler's synchronous background thread. 
+ """ + run = self.create_run(range_start, range_end, triggered_by=triggered_by) + self._run_connectors(run, set_final_status=True) return run - def _run_connectors(self, run: IngestionRun) -> None: + def _run_connectors(self, run: IngestionRun, *, set_final_status: bool = True) -> None: connector_factories = [ ( "github", @@ -120,27 +138,33 @@ def _run_connectors(self, run: IngestionRun) -> None: errors.append(error_msg) continue - run.finished_at = datetime.now(timezone.utc) + self.db.flush() - if errors and not run.batches: - self.db.flush() - # Re-check after flush - batch_count = ( - self.db.query(IngestionBatch) - .filter(IngestionBatch.ingestion_run_id == run.id) - .count() - ) - if batch_count == 0: - run.status = RunStatus.failed - run.error_summary = "; ".join(errors) + if set_final_status: + run.finished_at = datetime.now(timezone.utc) + + if errors: + batch_count = ( + self.db.query(IngestionBatch) + .filter(IngestionBatch.ingestion_run_id == run.id) + .count() + ) + if batch_count == 0: + run.status = RunStatus.failed + run.error_summary = "; ".join(errors) + else: + run.status = RunStatus.completed else: run.status = RunStatus.completed - else: - run.status = RunStatus.completed self.db.commit() - def rerun(self, run_id: int) -> IngestionRun | None: + def reset_run_for_rerun(self, run_id: int) -> IngestionRun | None: + """Delete existing batches and reset the run record to status=running. + + Returns the reset run object (no connectors run yet). Returns None if + the run does not exist. 
+ """ run = self.db.get(IngestionRun, run_id) if not run: return None @@ -162,8 +186,14 @@ def rerun(self, run_id: int) -> IngestionRun | None: run.started_at = datetime.now(timezone.utc) run.finished_at = None run.error_summary = None - self.db.flush() + self.db.commit() + return run + def rerun(self, run_id: int) -> IngestionRun | None: + """Reset a run and immediately run connectors (synchronous path).""" + run = self.reset_run_for_rerun(run_id) + if not run: + return None self._run_connectors(run) return run diff --git a/docker-compose.yml b/docker-compose.yml index 4864424..5ab1c9d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,6 +7,7 @@ services: - "7400:7400" volumes: - ./backend:/app + - /app/.venv - ./data:/data - ./vault:/vault environment: @@ -16,9 +17,10 @@ services: - backend/.env healthcheck: test: ["CMD", "curl", "-f", "http://localhost:7400/health"] - interval: 5s - timeout: 3s - retries: 5 + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s frontend: build: diff --git a/frontend/src/api/useIngestion.ts b/frontend/src/api/useIngestion.ts index 8192ad8..f95e738 100644 --- a/frontend/src/api/useIngestion.ts +++ b/frontend/src/api/useIngestion.ts @@ -1,5 +1,6 @@ // src/api/useIngestion.ts import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { useEffect, useRef, useState } from "react"; export type SourceId = "github" | "gmail" | "slack" | "canvas"; @@ -75,6 +76,12 @@ export interface IngestionRunDetail extends IngestionRunSummary { batches: IngestionBatchDetail[]; } +async function fetchRun(runId: number): Promise { + const res = await fetch(`/api/ingestion/runs/${runId}`); + if (!res.ok) throw new Error("Failed to fetch run"); + return res.json(); +} + export function useIngestionRuns() { return useQuery({ queryKey: ["ingestion-runs"], @@ -98,8 +105,41 @@ export function useIngestionRun(id: number | null) { }); } +// ── Run status poller 
────────────────────────────────────────────────────────── + +/** + * Polls GET /ingestion/runs/{runId} every 2.5 s while the run is in "running" + * status. Stops polling automatically once the status transitions. + */ +function useRunStatusPoller(runId: number | null) { + return useQuery({ + queryKey: ["ingestion-run", runId], + queryFn: () => fetchRun(runId!), + enabled: runId !== null, + refetchInterval: (query) => (query.state.data?.status === "running" ? 2500 : false), + }); +} + export function useCreateIngestionRun() { const qc = useQueryClient(); + const [activeRunId, setActiveRunId] = useState(null); + const invalidatedForRef = useRef(null); + + const polled = useRunStatusPoller(activeRunId); + + // When the polled run transitions out of "running", invalidate caches (once per run). + useEffect(() => { + if ( + activeRunId !== null && + polled.data && + polled.data.status !== "running" && + invalidatedForRef.current !== activeRunId + ) { + invalidatedForRef.current = activeRunId; + qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); + } + }, [activeRunId, polled.data, qc]); + return useMutation({ mutationFn: async (params: { start_date: string; end_date?: string }) => { const res = await fetch("/api/ingestion/runs", { @@ -111,7 +151,7 @@ export function useCreateIngestionRun() { const body = await res.text().catch(() => res.statusText); throw new Error(`Create run failed (${res.status}): ${body}`); } - return res.json() as Promise; + return res.json() as Promise; }, onMutate: async (params) => { await qc.cancelQueries({ queryKey: ["ingestion-runs"] }); @@ -140,7 +180,8 @@ export function useCreateIngestionRun() { qc.setQueryData(["ingestion-runs"], context.previous); } }, - onSettled: () => { + onSuccess: (data) => { + setActiveRunId(data.id); qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); }, }); @@ -148,6 +189,24 @@ export function useCreateIngestionRun() { export function useRerunIngestionRun() { const qc = useQueryClient(); + const 
[activeRunId, setActiveRunId] = useState(null); + const invalidatedForRef = useRef(null); + + const polled = useRunStatusPoller(activeRunId); + + // When the polled run transitions out of "running", invalidate caches (once per run). + useEffect(() => { + if ( + activeRunId !== null && + polled.data && + polled.data.status !== "running" && + invalidatedForRef.current !== activeRunId + ) { + invalidatedForRef.current = activeRunId; + qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); + } + }, [activeRunId, polled.data, qc]); + return useMutation({ mutationFn: async (runId: number) => { const res = await fetch(`/api/ingestion/runs/${runId}/rerun`, { @@ -157,9 +216,10 @@ export function useRerunIngestionRun() { const body = await res.text().catch(() => res.statusText); throw new Error(`Rerun failed (${res.status}): ${body}`); } - return res.json() as Promise; + return res.json() as Promise; }, - onSuccess: (_data, runId) => { + onSuccess: (data, runId) => { + setActiveRunId(data.id); qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); qc.invalidateQueries({ queryKey: ["ingestion-run", runId] }); }, @@ -203,14 +263,10 @@ export interface RunProposal { export interface ProposeResult { run_id: number; - model: string; - proposals_saved: number; - input_tokens: number; - output_tokens: number; - duration_ms: number; + status: "running"; } -export function useRunProposals(runId: number | null) { +export function useRunProposals(runId: number | null, refetchWhileProposing = false) { return useQuery({ queryKey: ["ingestion-run-proposals", runId], queryFn: async () => { @@ -219,6 +275,7 @@ export function useRunProposals(runId: number | null) { return res.json(); }, enabled: runId !== null, + refetchInterval: refetchWhileProposing ? 
2500 : false, }); } @@ -239,7 +296,49 @@ export function useRunPrompt(runId: number | null) { export function useProposeTasksForRun() { const qc = useQueryClient(); - return useMutation({ + const [proposingRunId, setProposingRunId] = useState(null); + const invalidatedForRef = useRef(null); + + // Also poll the run itself so we can detect completion / failure. + const polledRun = useQuery({ + queryKey: ["ingestion-run", proposingRunId], + queryFn: () => fetchRun(proposingRunId!), + enabled: proposingRunId !== null, + refetchInterval: (query) => (query.state.data?.status === "running" ? 2500 : false), + }); + + // Derive proposing state from run data. + const isProposing = + proposingRunId !== null && (!polledRun.data || polledRun.data.status === "running"); + + // Poll the proposals list while a propose job is in flight. + useQuery({ + queryKey: ["ingestion-run-proposals", proposingRunId], + queryFn: async () => { + const res = await fetch(`/api/ingestion/runs/${proposingRunId}/proposals`); + if (!res.ok) throw new Error("Failed to fetch proposals"); + return res.json(); + }, + enabled: proposingRunId !== null && isProposing, + refetchInterval: isProposing ? 2500 : false, + }); + + // When the run is no longer running, invalidate caches (once per run). 
+ useEffect(() => { + if ( + proposingRunId !== null && + polledRun.data && + polledRun.data.status !== "running" && + invalidatedForRef.current !== proposingRunId + ) { + invalidatedForRef.current = proposingRunId; + qc.invalidateQueries({ queryKey: ["ingestion-run-proposals", proposingRunId] }); + qc.invalidateQueries({ queryKey: ["proposals"] }); + qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); + } + }, [proposingRunId, polledRun.data, qc]); + + const mutation = useMutation({ mutationFn: async (runId: number): Promise => { const res = await fetch(`/api/ingestion/runs/${runId}/propose`, { method: "POST" }); if (!res.ok) { @@ -249,10 +348,17 @@ export function useProposeTasksForRun() { return res.json(); }, onSuccess: (_data, runId) => { - qc.invalidateQueries({ queryKey: ["ingestion-run-proposals", runId] }); - qc.invalidateQueries({ queryKey: ["proposals"] }); + invalidatedForRef.current = null; + setProposingRunId(runId); + // Immediately invalidate so the run list shows "running" status. + qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); + }, + onError: () => { + setProposingRunId(null); }, }); + + return { mutation, isProposing }; } // ── Prompt & LLM preview types ─────────────────────────────────────────────── @@ -306,12 +412,7 @@ export function useLLMPreview() { export interface SyncResult { run_id: number; - status: string; - proposals_saved: number; - duration_ms?: number; - input_tokens?: number; - output_tokens?: number; - error?: string; + status: "running"; } export function useSyncPipeline() { @@ -331,3 +432,81 @@ export function useSyncPipeline() { }, }); } + +/** + * Full-pipeline sync with async polling support. + * + * Fires POST /sync → receives {run_id, status: "running"} → polls + * GET /ingestion/runs/{run_id} every 2.5 s until the run is no longer + * "running", then invalidates ingestion-runs and proposals caches. + * + * Returns { trigger, isRunning, run, error }. 
+ */ +export function useSyncPipelineWithPolling() { + const qc = useQueryClient(); + const [runId, setRunId] = useState(null); + const [error, setError] = useState(null); + const invalidatedForRef = useRef(null); + + // On mount, check if there's already a running run (survives navigation/refresh). + const runsQuery = useQuery({ + queryKey: ["ingestion-runs"], + queryFn: async () => { + const res = await fetch("/api/ingestion/runs"); + if (!res.ok) return []; + return res.json(); + }, + }); + + useEffect(() => { + if (runId === null && runsQuery.data) { + const running = runsQuery.data.find((r) => r.status === "running"); + if (running) { + setRunId(running.id); + } + } + }, [runId, runsQuery.data]); + + const run = useQuery({ + queryKey: ["ingestion-run", runId], + queryFn: () => fetchRun(runId!), + enabled: runId !== null, + refetchInterval: (query) => (query.state.data?.status === "running" ? 2500 : false), + }); + + // Derive running state from query data. + const isRunning = runId !== null && (!run.data || run.data.status === "running"); + + // When status transitions out of "running", refresh caches (once per run). + useEffect(() => { + if ( + runId !== null && + run.data && + run.data.status !== "running" && + invalidatedForRef.current !== runId + ) { + invalidatedForRef.current = runId; + qc.invalidateQueries({ queryKey: ["ingestion-runs"] }); + qc.invalidateQueries({ queryKey: ["proposals"] }); + } + }, [runId, run.data, qc]); + + const trigger = async () => { + setError(null); + setRunId(null); + invalidatedForRef.current = null; + try { + const res = await fetch("/api/ingestion/sync", { method: "POST" }); + if (!res.ok) { + const body = await res.text().catch(() => res.statusText); + throw new Error(`Sync failed (${res.status}): ${body}`); + } + const data: SyncResult = await res.json(); + setRunId(data.run_id); + } catch (err) { + setError(err instanceof Error ? err : new Error(String(err))); + } + }; + + return { trigger, isRunning, run: run.data ?? 
null, error }; +} diff --git a/frontend/src/components/layout/Topbar.tsx b/frontend/src/components/layout/Topbar.tsx index bfe4243..b166c57 100644 --- a/frontend/src/components/layout/Topbar.tsx +++ b/frontend/src/components/layout/Topbar.tsx @@ -1,9 +1,9 @@ // src/components/layout/Topbar.tsx -import { useState } from "react"; +import { useEffect, useState } from "react"; import { useNavigate } from "react-router-dom"; import type { Task } from "../../types"; import { usePendingProposalCount } from "../../api/useProposals"; -import { useSyncPipeline } from "../../api/useIngestion"; +import { useSyncPipelineWithPolling } from "../../api/useIngestion"; import styles from "./Topbar.module.css"; interface TopbarProps { @@ -38,21 +38,42 @@ function LogoCup() { export default function Topbar({ tasks, onNewTask }: TopbarProps) { const navigate = useNavigate(); const pendingProposals = usePendingProposalCount(); - const sync = useSyncPipeline(); + const { trigger, isRunning, run, error } = useSyncPipelineWithPolling(); const [syncMessage, setSyncMessage] = useState(null); - function handleSync() { + // Detect run status transitions during render (React recommended pattern + // for adjusting state when props/derived data change). + const [prevRunStatus, setPrevRunStatus] = useState(null); + const currentRunStatus = run?.status ?? null; + if (currentRunStatus !== prevRunStatus) { + setPrevRunStatus(currentRunStatus); + if (currentRunStatus === "failed") { + setSyncMessage("sync failed"); + } else if (currentRunStatus === "completed") { + const count = run!.proposal_count; + setSyncMessage(`+${count} proposal${count !== 1 ? "s" : ""}`); + } + } + + // Detect trigger errors during render. + const [prevError, setPrevError] = useState(null); + if (error !== prevError) { + setPrevError(error); + if (error) { + setSyncMessage("sync failed"); + } + } + + // Auto-dismiss sync message after 10 seconds. 
+ useEffect(() => { + if (!syncMessage) return; + const t = setTimeout(() => setSyncMessage(null), 10000); + return () => clearTimeout(t); + }, [syncMessage]); + + async function handleSync() { setSyncMessage(null); - sync.mutate(undefined, { - onSuccess: (data) => { - setSyncMessage(`+${data.proposals_saved} proposal${data.proposals_saved !== 1 ? "s" : ""}`); - setTimeout(() => setSyncMessage(null), 4000); - }, - onError: () => { - setSyncMessage("sync failed"); - setTimeout(() => setSyncMessage(null), 4000); - }, - }); + await trigger(); } const openCount = tasks.filter((t) => @@ -87,15 +108,15 @@ export default function Topbar({ tasks, onNewTask }: TopbarProps) { @@ -596,9 +570,9 @@ function RunRow({ run }: { run: IngestionRunSummary }) {