diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 18e5cd9..6df6ce3 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -7,7 +7,7 @@ on: description: 'Pull request number' required: true push: - branches: [ main ] + branches: [ main, develop ] pull_request: types: [opened, synchronize, reopened, ready_for_review] @@ -37,10 +37,11 @@ jobs: run: mkdir -p trivy-reports - name: Run Trivy FS Scan - uses: aquasecurity/trivy-action@0.24.0 + uses: aquasecurity/trivy-action@v0.36.0 with: scan-type: 'fs' scan-ref: '.' + version: 'v0.70.0' scanners: 'vuln,misconfig,secret,license' ignore-unfixed: true format: 'table' @@ -95,4 +96,4 @@ jobs: with: name: bandit-report path: bandit-report.html - retention-days: 30 \ No newline at end of file + retention-days: 30 diff --git a/docker-compose.yml b/docker-compose.yml index 3f5e733..55356af 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,25 +1,4 @@ -version: '3.8' - services: - # PostgreSQL (local dev only - production uses Supabase) - postgres: - image: pgvector/pgvector:pg16 - container_name: rag-postgres - environment: - POSTGRES_DB: enterprise_rag - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - volumes: - - postgres_data:/var/lib/postgresql/data - - ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql - ports: - - "5432:5432" - healthcheck: - test: ["CMD-SHELL", "pg_isready -U postgres"] - interval: 5s - timeout: 5s - retries: 5 - # Redis (queue backend + cache) redis: image: redis:7-alpine @@ -60,8 +39,6 @@ services: ports: - "8000:8000" depends_on: - postgres: - condition: service_healthy redis: condition: service_healthy command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload @@ -89,7 +66,6 @@ services: - ./server/app:/app/shared # Share code depends_on: - redis - - postgres deploy: replicas: 5 command: python worker.py ingest_extract @@ -117,7 +93,6 @@ services: - ./server/app:/app/shared depends_on: - redis - - postgres deploy: replicas: 3 command: python worker.py ingest_index @@ -152,6 +127,5 @@ services: command: npm run dev -- --host 0.0.0.0 volumes: - postgres_data: redis_data: client_node_modules: diff --git a/docs/task/ingestion-follow-up-hardening-and-validation.md b/docs/task/ingestion-follow-up-hardening-and-validation.md new file mode 100644 index 0000000..bcfa46f --- /dev/null +++ b/docs/task/ingestion-follow-up-hardening-and-validation.md @@ -0,0 +1,294 @@ +# Task: Ingestion Follow-Up Hardening and Validation + +## Status + +Derived from the current ingestion workflow audit and the existing evaluation report. + +## Goal + +Capture the important ingestion gaps that are still not implemented or still weak, then define: + +1. how each gap should be implemented +2. what should be tested after implementation + +This file is meant to turn the audit into concrete follow-up work. + +## Why This Task Exists + +The repository already has a working ingestion pipeline for small PDFs, including: + +- batch upload prepare and complete +- ingestion runs +- extraction and indexing workers +- structured failure messages +- retry and reindex flows +- queue visibility +- token-budget reservation and cleanup + +But the system is still missing several important pieces before the ingestion path can be considered robust, measurable, and ready for higher-volume usage. + +## Scope + +This task covers the remaining weak points found in the audit: + +1. no repeatable load and throughput validation +2. weak ingestion observability +3. incomplete stale-state and run lifecycle recovery +4. schema and runtime status mismatch +5. query path still single-document +6. thin automated ingestion test coverage + +## Recommended Order + +1. Fix schema and status-model mismatch +2. Add stale document and run reconciliation +3. Add ingestion observability and operator health visibility +4. Add automated ingestion integration tests +5. Add repeatable load-test tooling +6. Extend retrieval and querying toward multi-document support + +The first three items are backend safety and operability work. They should happen before calling the ingestion system stable at larger scale. + +## Follow-Up Items + +### 1. Repeatable Load and Throughput Validation Is Missing + +#### Current Weakness + +The code supports batch ingestion, but there is no repeatable load-test harness for: + +- 10 PDFs +- 25 PDFs +- 50 PDFs +- 100 PDFs + +There is also no durable way to compare one ingestion revision against another. + +#### How To Implement + +- Add a dedicated load-test task doc and script set under `scripts/` for ingestion benchmarking. +- Create a repeatable test dataset of small text PDFs with known page counts and known expected outcomes. +- Add a driver that can: + - create or reuse a workspace + - prepare and complete uploads in bulk + - poll ingestion runs until terminal + - capture final document states + - measure total wall-clock run time + - measure per-document completion time +- Persist results to a simple JSON or markdown artifact so runs can be compared over time. +- Include scenarios with: + - all-valid batches + - mixed valid and invalid documents + - queue backlog conditions + +#### What To Test After Implementation + +- 10 valid small PDFs complete successfully. +- 25 valid small PDFs complete successfully. +- 50 valid small PDFs complete successfully. +- Mixed valid and invalid batches still complete with isolated failures. +- The load-test artifact records: + - total duration + - per-document duration + - success count + - failure count + - queue depth snapshots if available +- Re-running the same scenario produces comparable output structure. + +### 2. Ingestion Observability Is Still Weak + +#### Current Weakness + +The system exposes queue counts and document/run status, but it still lacks: + +- stage timing +- queue-depth history +- worker utilization visibility +- operator-facing ingestion health summary +- clear separation between expected document rejection and infrastructure failure + +#### How To Implement + +- Add stage timestamps or timing fields for key transitions: + - upload completed + - extract job enqueued + - extraction started + - extraction finished + - index job enqueued + - indexing started + - indexing finished +- Add an operator endpoint such as `GET /documents/ingestion-health` that returns: + - current queue counts + - failed registry counts + - oldest active document age + - stale active document count + - active ingestion run count + - stale reservation count if practical +- Add structured worker logs for: + - queue wait start/end + - extraction duration + - indexing duration + - embedding batch counts + - embedding API latency if practical +- Track expected validation failures separately from infrastructure failures in logs or API summaries. + +#### What To Test After Implementation + +- A successful document exposes complete stage timing. +- A failed validation document is visible as a document failure but not mislabeled as infrastructure failure. +- The health endpoint reports queue and stale-state summaries correctly during active ingestion. +- Worker logs contain enough information to distinguish: + - queue delay + - extraction time + - indexing time + - embedding time +- The UI or operator flow can identify whether a slowdown is caused by queue wait or processing. + +### 3. Stale-State and Run Lifecycle Recovery Is Incomplete + +#### Current Weakness + +There is failure callback coverage for killed jobs, but there is still no general reconciliation process for: + +- documents stuck in `uploaded` +- documents stuck in `extracting` +- documents stuck in `indexing` +- ingestion runs left in `preparing` or `processing` after work is effectively done + +#### How To Implement + +- Add a scheduled reconciliation job for ingestion state. +- Define configurable stale thresholds for: + - `uploaded` + - `extracting` + - `indexing` +- Reconciliation should: + - mark stale active documents as `failed` + - attach a structured transient-infrastructure error message + - refresh or recompute affected ingestion run statuses +- Consider deriving ingestion run status dynamically from current document states instead of trusting persisted run status alone. +- Ensure reconciliation is idempotent and safe to run frequently. + +#### What To Test After Implementation + +- A document intentionally left in `uploaded` beyond threshold is marked `failed`. +- A document intentionally left in `extracting` beyond threshold is marked `failed`. +- A document intentionally left in `indexing` beyond threshold is marked `failed`. +- A run with only terminal document states is reconciled to `completed`, `partial`, or `failed` correctly. +- Re-running reconciliation does not corrupt already-terminal rows. +- Retry still works correctly after reconciliation marks a document failed. + +### 4. Schema and Runtime Status Model Do Not Fully Match + +#### Current Weakness + +The code uses a richer runtime model including statuses like: + +- `uploading` +- `extracting` +- `indexed` + +But the SQL schema files still define a narrower `documents.status` check constraint. The code currently works around this mismatch defensively, which is fragile. + +#### How To Implement + +- Update `scripts/schema.local.sql` and `scripts/schema.supabase.sql` so the document status constraint matches the actual runtime statuses used by the application and workers. +- Review migrations or schema upgrade steps so existing environments can be brought to the same status model safely. +- Remove compatibility branches only after the schema is consistently aligned across environments. +- Ensure frontend status types, backend schemas, workers, and DB constraints all use the same status vocabulary. + +#### What To Test After Implementation + +- Fresh schema setup accepts every runtime status the code uses. +- Existing environments can migrate without data loss. +- Worker transitions no longer rely on fallback behavior for missing statuses. +- Upload, extract, index, retry, and reindex transitions all succeed against the aligned schema. +- Status values shown in the UI match the persisted DB states exactly. + +### 5. Retrieval and Querying Are Still Single-Document + +#### Current Weakness + +The ingestion side now supports many documents per workspace, but the query side still accepts one `document_id` and retrieves chunks from only one document at a time. + +This is not an ingestion breakage, but it is a real readiness gap for the intended multi-document system. + +#### How To Implement + +- Add a new query contract that accepts multiple selected document IDs. +- Enforce explicit query-side limits for: + - max selected documents + - max total pages across selected documents +- Update retrieval SQL to search across a selected document set instead of one document. +- Preserve citation quality by returning document metadata together with page and chunk references. +- Update query logs and observability so they record all searched documents, not one. + +#### What To Test After Implementation + +- A query across two ready documents returns citations from both when relevant. +- Query limits reject oversized document selections safely. +- Workspace isolation still holds for multi-document queries. +- Query logs record the selected document set accurately. +- Existing single-document flows continue to work. + +### 6. Automated Ingestion Test Coverage Is Too Thin + +#### Current Weakness + +The repository has helper-level tests for ingestion policy, ingestion run status derivation, and token-budget primitives, but it does not have strong automated coverage for: + +- batch upload orchestration +- partial success batches +- retry flows +- reindex flows +- stuck-job recovery +- low-budget ingestion failure +- queue failure callback behavior + +#### How To Implement + +- Add integration-style tests around the documents API and worker jobs. +- Use fixtures or mocks for: + - Supabase storage interactions + - OpenAI embeddings + - Redis/RQ enqueue behavior where full worker execution is not needed +- Add job-level tests for: + - extraction success + - page-limit failure + - unsupported-content failure + - indexing success + - budget failure + - cleanup after failure +- Add API tests for: + - batch prepare + - batch complete + - retry allowed vs retry denied + - reindex with and without existing pages + +#### What To Test After Implementation + +- Batch prepare returns mixed accepted and rejected results correctly. +- Batch complete enqueues extraction correctly and fails safely on missing objects. +- Retry only works for retryable failures. +- Reindex chooses extract or index path correctly based on existing page rows. +- Index budget exhaustion leaves no stranded reservations. +- Failure callback marks timed-out or killed jobs as failed. +- Cleanup removes stale chunks, embeddings, and pages when expected. + +## Acceptance Criteria + +This task should be considered complete only when: + +- the remaining ingestion weak points are each turned into explicit implementation work +- every item has a clear post-implementation validation plan +- the schema matches the runtime status model +- stale-state recovery exists for documents and runs +- ingestion observability is materially better +- automated ingestion coverage is broader than helper-only unit tests +- repeatable load validation exists for larger batches + +## Summary + +The ingestion pipeline is no longer missing its basic architecture. The important remaining work is hardening, observability, lifecycle recovery, automated validation, and true multi-document readiness. + +This task captures that follow-up work in a way that is concrete enough to implement and test. diff --git a/server/app/core/token_budget.py b/server/app/core/token_budget.py index e75d4b9..8822b6c 100644 --- a/server/app/core/token_budget.py +++ b/server/app/core/token_budget.py @@ -2,17 +2,20 @@ from contextlib import nullcontext from datetime import UTC, date, datetime, time, timedelta +from typing import Callable, TypeVar import uuid from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as sqlite_insert -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, sessionmaker from app.config import settings from app.core.errors import BudgetExceededError, InvalidReservationError from app.db.models import WorkspaceDailyUsage +T = TypeVar("T") + def _as_utc_date(usage_date_utc: date | datetime) -> date: if isinstance(usage_date_utc, datetime): @@ -71,6 +74,20 @@ def _transaction_context(db: Session): return db.begin() +def _run_in_isolated_session(db: Session, operation: Callable[[Session], T]) -> T: + isolated_factory = sessionmaker(bind=db.get_bind(), autocommit=False, autoflush=False) + isolated_db = isolated_factory() + try: + result = operation(isolated_db) + isolated_db.commit() + return result + except Exception: + isolated_db.rollback() + raise + finally: + isolated_db.close() + + def get_or_create_daily_row(db: Session, workspace_id: uuid.UUID, usage_date_utc: date | datetime) -> WorkspaceDailyUsage: usage_date = _as_utc_date(usage_date_utc) _insert_usage_row_if_missing(db, workspace_id, usage_date) @@ -85,15 +102,12 @@ def get_or_create_daily_row(db: Session, workspace_id: uuid.UUID, usage_date_utc return db.execute(stmt).scalar_one() -def reserve_tokens( +def _reserve_tokens_in_session( db: Session, workspace_id: uuid.UUID, amount: int, usage_date_utc: date | datetime, - reservation_ttl_seconds: int = 600, ) -> dict[str, int]: - reservation_amount = _ensure_non_negative_amount(amount) - _ensure_non_negative_amount(reservation_ttl_seconds, "reservation_ttl_seconds") token_limit = int(settings.DAILY_TOKEN_LIMIT) with _transaction_context(db): @@ -101,7 +115,7 @@ def reserve_tokens( used = int(row.tokens_used) reserved = int(row.tokens_reserved) remaining = token_limit - (used + reserved) - if reservation_amount > remaining: + if amount > remaining: raise BudgetExceededError( "Daily token limit reached for this workspace", used=used, @@ -109,7 +123,7 @@ def reserve_tokens( limit=token_limit, ) - row.tokens_reserved = reserved + reservation_amount + row.tokens_reserved = reserved + amount db.flush() reserved_now = int(row.tokens_reserved) remaining_now = max(0, token_limit - (used + reserved_now)) @@ -121,30 +135,38 @@ def reserve_tokens( } -def release_tokens(db: Session, workspace_id: uuid.UUID, amount: int, usage_date_utc: date | datetime) -> dict[str, int]: - release_amount = _ensure_non_negative_amount(amount) +def _release_tokens_in_session( + db: Session, + workspace_id: uuid.UUID, + amount: int, + usage_date_utc: date | datetime, +) -> dict[str, int]: with _transaction_context(db): row = get_or_create_daily_row(db, workspace_id, usage_date_utc) reserved = int(row.tokens_reserved) - if release_amount > reserved: + if amount > reserved: raise InvalidReservationError("Cannot release more tokens than currently reserved") - row.tokens_reserved = reserved - release_amount + row.tokens_reserved = reserved - amount db.flush() reserved_now = int(row.tokens_reserved) return {"reserved_now": reserved_now} -def commit_usage(db: Session, workspace_id: uuid.UUID, amount: int, usage_date_utc: date | datetime) -> dict[str, int]: - usage_amount = _ensure_non_negative_amount(amount) +def _commit_usage_in_session( + db: Session, + workspace_id: uuid.UUID, + amount: int, + usage_date_utc: date | datetime, +) -> dict[str, int]: with _transaction_context(db): row = get_or_create_daily_row(db, workspace_id, usage_date_utc) reserved = int(row.tokens_reserved) used = int(row.tokens_used) - if usage_amount > reserved: + if amount > reserved: raise InvalidReservationError("Cannot commit more tokens than currently reserved") - row.tokens_reserved = reserved - usage_amount - row.tokens_used = used + usage_amount + row.tokens_reserved = reserved - amount + row.tokens_used = used + amount db.flush() used_now = int(row.tokens_used) reserved_now = int(row.tokens_reserved) @@ -152,6 +174,73 @@ def commit_usage(db: Session, workspace_id: uuid.UUID, amount: int, usage_date_u return {"used_now": used_now, "reserved_now": reserved_now} +def reserve_tokens( + db: Session, + workspace_id: uuid.UUID, + amount: int, + usage_date_utc: date | datetime, + reservation_ttl_seconds: int = 600, +) -> dict[str, int]: + reservation_amount = _ensure_non_negative_amount(amount) + _ensure_non_negative_amount(reservation_ttl_seconds, "reservation_ttl_seconds") + if db.in_transaction(): + return _run_in_isolated_session( + db, + lambda isolated_db: _reserve_tokens_in_session( + isolated_db, + workspace_id=workspace_id, + amount=reservation_amount, + usage_date_utc=usage_date_utc, + ), + ) + return _reserve_tokens_in_session( + db, + workspace_id=workspace_id, + amount=reservation_amount, + usage_date_utc=usage_date_utc, + ) + + +def release_tokens(db: Session, workspace_id: uuid.UUID, amount: int, usage_date_utc: date | datetime) -> dict[str, int]: + release_amount = _ensure_non_negative_amount(amount) + if db.in_transaction(): + return _run_in_isolated_session( + db, + lambda isolated_db: _release_tokens_in_session( + isolated_db, + workspace_id=workspace_id, + amount=release_amount, + usage_date_utc=usage_date_utc, + ), + ) + return _release_tokens_in_session( + db, + workspace_id=workspace_id, + amount=release_amount, + usage_date_utc=usage_date_utc, + ) + + +def commit_usage(db: Session, workspace_id: uuid.UUID, amount: int, usage_date_utc: date | datetime) -> dict[str, int]: + usage_amount = _ensure_non_negative_amount(amount) + if db.in_transaction(): + return _run_in_isolated_session( + db, + lambda isolated_db: _commit_usage_in_session( + isolated_db, + workspace_id=workspace_id, + amount=usage_amount, + usage_date_utc=usage_date_utc, + ), + ) + return _commit_usage_in_session( + db, + workspace_id=workspace_id, + amount=usage_amount, + usage_date_utc=usage_date_utc, + ) + + def get_budget_status( db: Session, workspace_id: uuid.UUID, diff --git a/server/scripts/bootstrap_schema.py b/server/scripts/bootstrap_schema.py new file mode 100644 index 0000000..56d2409 --- /dev/null +++ b/server/scripts/bootstrap_schema.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +from sqlalchemy import text + +from app.db import models # noqa: F401 +from app.db.session import Base, engine + + +DDL_STATEMENTS = [ + """ + CREATE EXTENSION IF NOT EXISTS vector + """, + """ + CREATE EXTENSION IF NOT EXISTS pgcrypto + """, + """ + CREATE TABLE IF NOT EXISTS document_pages ( + id BIGSERIAL PRIMARY KEY, + workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + page_number INTEGER NOT NULL CHECK (page_number > 0), + content TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (document_id, page_number) + ) + """, + """ + CREATE INDEX IF NOT EXISTS idx_document_pages_workspace + ON document_pages(workspace_id) + """, + """ + CREATE INDEX IF NOT EXISTS idx_document_pages_document + ON document_pages(document_id) + """, + """ + CREATE TABLE IF NOT EXISTS chunks ( + id UUID PRIMARY KEY, + workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + page_start INTEGER NOT NULL CHECK (page_start > 0), + page_end INTEGER NOT NULL CHECK (page_end >= page_start), + chunk_index INTEGER NOT NULL CHECK (chunk_index >= 0), + content TEXT NOT NULL, + content_hash TEXT NOT NULL, + token_count INTEGER NOT NULL DEFAULT 0 CHECK (token_count >= 0), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """, + """ + CREATE UNIQUE INDEX IF NOT EXISTS idx_chunks_document_chunk_index + ON chunks(document_id, chunk_index) + """, + """ + CREATE INDEX IF NOT EXISTS idx_chunks_workspace_document + ON chunks(workspace_id, document_id) + """, + """ + CREATE TABLE IF NOT EXISTS chunk_embeddings ( + chunk_id UUID PRIMARY KEY REFERENCES chunks(id) ON DELETE CASCADE, + workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + embedding vector(1536) NOT NULL, + embedding_model TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """, + """ + CREATE INDEX IF NOT EXISTS idx_chunk_embeddings_workspace_document + ON chunk_embeddings(workspace_id, document_id) + """, + """ + CREATE INDEX IF NOT EXISTS idx_chunk_embeddings_vector + ON chunk_embeddings + USING hnsw (embedding vector_cosine_ops) + """, + """ + CREATE TABLE IF NOT EXISTS query_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + user_id TEXT NOT NULL, + query_text TEXT NOT NULL, + documents_searched UUID[] NOT NULL DEFAULT '{}', + retrieved_chunk_ids UUID[] NOT NULL DEFAULT '{}', + chunk_scores DOUBLE PRECISION[] NOT NULL DEFAULT '{}', + answer_text TEXT NULL, + error_message TEXT NULL, + retrieval_latency_ms INTEGER NOT NULL DEFAULT 0, + llm_latency_ms INTEGER NULL, + total_latency_ms INTEGER NOT NULL DEFAULT 0, + embedding_tokens_used BIGINT NOT NULL DEFAULT 0, + llm_input_tokens INTEGER NULL, + llm_output_tokens INTEGER NULL, + total_tokens_used BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """, + """ + CREATE INDEX IF NOT EXISTS idx_query_logs_workspace_created_at + ON query_logs(workspace_id, created_at DESC) + """, +] + + +def main() -> None: + Base.metadata.create_all(bind=engine) + with engine.begin() as conn: + for statement in DDL_STATEMENTS: + conn.execute(text(statement)) + print("Schema bootstrap completed.") + + +if __name__ == "__main__": + main() diff --git a/server/scripts/inspect_schema.py b/server/scripts/inspect_schema.py new file mode 100644 index 0000000..59073b2 --- /dev/null +++ b/server/scripts/inspect_schema.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from sqlalchemy import text + +from app.db.session import engine + + +def main() -> None: + with engine.connect() as conn: + table_rows = conn.execute( + text( + """ + SELECT table_schema, table_name + FROM information_schema.tables + WHERE table_schema NOT IN ('pg_catalog', 'information_schema') + ORDER BY table_schema, table_name + """ + ) + ) + extension_rows = conn.execute( + text( + """ + SELECT extname + FROM pg_extension + WHERE extname IN ('pgcrypto', 'vector') + ORDER BY extname + """ + ) + ) + + print("Tables:") + for row in table_rows: + print(f"{row.table_schema}.{row.table_name}") + + print("Extensions:") + for row in extension_rows: + print(row.extname) + + +if __name__ == "__main__": + main() diff --git a/server/tests/test_token_budget.py b/server/tests/test_token_budget.py index 921c16a..44e44fa 100644 --- a/server/tests/test_token_budget.py +++ b/server/tests/test_token_budget.py @@ -71,6 +71,47 @@ def test_release_reduces_reserved(db_session: Session, workspace_id: uuid.UUID, assert status["used"] == 0 +def test_budget_operations_use_short_transaction_when_caller_session_is_active( + sqlite_session_factory: sessionmaker, + limit_guard, +) -> None: + settings.DAILY_TOKEN_LIMIT = 1_000 + today = date(2026, 2, 15) + + setup_session = sqlite_session_factory() + workspace = Workspace(name="Active Transaction", owner_id=uuid.uuid4()) + setup_session.add(workspace) + setup_session.commit() + setup_session.close() + + caller_session = sqlite_session_factory() + observer_session = sqlite_session_factory() + try: + # Trigger SQLAlchemy autobegin on the caller session before reserving tokens. + caller_session.get(Workspace, workspace.id) + + reserve_result = reserve_tokens(caller_session, workspace.id, 200, today) + status_after_reserve = get_budget_status(observer_session, workspace.id, today) + + commit_result = commit_usage(caller_session, workspace.id, 125, today) + status_after_commit = get_budget_status(observer_session, workspace.id, today) + + release_result = release_tokens(caller_session, workspace.id, 75, today) + status_after_release = get_budget_status(observer_session, workspace.id, today) + + assert reserve_result["reserved"] == 200 + assert status_after_reserve["reserved"] == 200 + assert commit_result["used_now"] == 125 + assert status_after_commit["used"] == 125 + assert status_after_commit["reserved"] == 75 + assert release_result["reserved_now"] == 0 + assert status_after_release["used"] == 125 + assert status_after_release["reserved"] == 0 + finally: + caller_session.close() + observer_session.close() + + def test_resets_at_is_next_midnight_utc(db_session: Session, workspace_id: uuid.UUID, limit_guard) -> None: settings.DAILY_TOKEN_LIMIT = 1_000 usage_day = date(2026, 2, 15)