diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 83d1e77..18e5cd9 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -1,34 +1,98 @@ -name: Deploy +name: SDLE Scans on: + workflow_dispatch: + inputs: + PR_number: + description: 'Pull request number' + required: true push: - branches: [main] + branches: [ main ] + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + +concurrency: + group: sdle-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true jobs: - test: + +# ----------------------------- +# 1) Trivy Scan (fixed) +# ----------------------------- + trivy_scan: + name: Trivy Vulnerability Scan runs-on: ubuntu-latest + env: + TRIVY_REPORT_FORMAT: table + TRIVY_SCAN_TYPE: fs + TRIVY_SCAN_PATH: . + TRIVY_EXIT_CODE: '1' + TRIVY_VULN_TYPE: os,library + TRIVY_SEVERITY: CRITICAL,HIGH steps: - - uses: actions/checkout@v3 - - - name: Run server tests - run: | - cd server - docker build -t server-test . - docker run server-test pytest + - uses: actions/checkout@v4 + + - name: Create report directory + run: mkdir -p trivy-reports + + - name: Run Trivy FS Scan + uses: aquasecurity/trivy-action@0.24.0 + with: + scan-type: 'fs' + scan-ref: '.' + scanners: 'vuln,misconfig,secret,license' + ignore-unfixed: true + format: 'table' + exit-code: '1' + output: 'trivy-reports/trivy_scan_report.txt' + vuln-type: 'os,library' + severity: 'CRITICAL,HIGH' - - name: Run client tests + - name: Upload Trivy Report + uses: actions/upload-artifact@v4 + with: + name: trivy-report + path: trivy-reports/trivy_scan_report.txt + - name: Show Trivy Report in Logs + if: failure() run: | - cd client - npm ci - npm test + echo "========= TRIVY FINDINGS =========" + cat trivy-reports/trivy_scan_report.txt + echo "=================================" - deploy: - needs: test +# ----------------------------- +# 2) Bandit Scan +# ----------------------------- + bandit_scan: + name: Bandit security scan runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - - name: Deploy to production - run: | - docker-compose -f docker-compose.prod.yml build - docker-compose -f docker-compose.prod.yml up -d + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: 'recursive' + fetch-depth: 0 + - uses: actions/setup-python@v5 + with: + python-version: "3.x" + - name: Install Bandit + run: pip install bandit + - name: Create Bandit configuration + run: | + cat > .bandit << 'EOF' + [bandit] + exclude_dirs = tests,test,venv,.venv,node_modules + skips = B101 + EOF + shell: bash + - name: Run Bandit scan + run: | + bandit -r . -ll -iii -f screen + bandit -r . -ll -iii -f html -o bandit-report.html + - name: Upload Bandit Report + uses: actions/upload-artifact@v4 + with: + name: bandit-report + path: bandit-report.html + retention-days: 30 \ No newline at end of file diff --git a/README.md b/README.md index 2356203..a24dd45 100644 --- a/README.md +++ b/README.md @@ -1,155 +1,441 @@ -# Enterprise RAG System (v1) +# Enterprise RAG + +Enterprise RAG is a workspace-scoped document intelligence platform for uploading PDFs, extracting text, indexing chunks with embeddings, and answering questions with grounded citations. + +The repository is organized as a monorepo with three application surfaces: +- `server`: FastAPI API and core RAG orchestration +- `worker`: Redis/RQ background jobs for extraction, indexing, and maintenance +- `client`: React + Vite frontend with Supabase authentication + +This README is written for the current codebase. It explains what the system does today, how the services interact, and how the architecture is intended to scale like an enterprise application. + +## Table of Contents + +- [Why This Exists](#why-this-exists) +- [What The System Does](#what-the-system-does) +- [System Architecture](#system-architecture) +- [How It Works End to End](#how-it-works-end-to-end) +- [Repository Layout](#repository-layout) +- [API Surface](#api-surface) +- [Data Model](#data-model) +- [Limits and Controls](#limits-and-controls) +- [Local Development](#local-development) +- [Environment Variables](#environment-variables) +- [Operational Notes](#operational-notes) +- [Current Status](#current-status) +- [Roadmap](#roadmap) + +## Why This Exists + +Typical RAG demos stop at a single script that embeds documents and sends a prompt to an LLM. That is not enough for a production system. + +This project is built around the concerns that matter in enterprise environments: +- strict workspace isolation +- authenticated access with bearer tokens +- asynchronous ingestion so uploads do not block API requests +- token budget enforcement with reservation and commit semantics +- query logging and observability +- repeatable local development with Docker Compose +- clean separation between API, workers, storage, and UI + +## What The System Does + +At a high level, the platform supports this flow: +1. A user signs in with Supabase Auth. +2. The user creates a workspace. +3. The client requests a signed upload URL from the API. +4. A PDF is uploaded to Supabase Storage. +5. The API confirms the upload and enqueues background jobs. +6. Workers extract page text, split it into chunks, and generate embeddings. +7. The document becomes queryable. +8. The user asks a question against a document. +9. The API embeds the question, retrieves the most relevant chunks, calls the LLM with grounded context, and returns an answer with citations. +10. Usage, latency, and errors are recorded for observability. + +## System Architecture + +### Logical View -Production-oriented monorepo for a workspace-scoped Retrieval-Augmented Generation (RAG) platform using Supabase Auth, FastAPI, Redis/RQ workers, PostgreSQL + pgvector, and a React client. +```mermaid +flowchart LR + User[User] --> Client[React Client] + Client --> Auth[Supabase Auth] + Client -->|JWT| API[FastAPI API] + + API --> DB[(PostgreSQL + pgvector)] + API --> Redis[(Redis)] + API --> Storage[Supabase Storage] + API --> OpenAI[OpenAI API] + + API -->|enqueue| ExtractQ[RQ ingest_extract] + API -->|enqueue| IndexQ[RQ ingest_index] + + ExtractQ --> ExtractWorker[Extraction Worker] + IndexQ --> IndexWorker[Indexing Worker] + + ExtractWorker --> Storage + ExtractWorker --> DB + ExtractWorker --> Redis + + IndexWorker --> DB + IndexWorker --> OpenAI + + API --> Client +``` + +### Runtime Topology + +```text ++--------------------+ +---------------------+ +| React Client | | Supabase | +| Vite app | | Auth + Storage | ++---------+----------+ +----------+----------+ + | ^ + | JWT / signed upload flow | + v | ++---------+-------------------------------------------+ +| FastAPI Server | +| - auth validation | +| - workspace-scoped APIs | +| - query orchestration | +| - token budget checks | ++---------+----------------------+----------------------+ + | | + | SQL | enqueue jobs + v v ++---------+----------+ +-------+----------------------+ +| PostgreSQL | | Redis / RQ | +| pgvector | | rate limiting + job queues | ++---------+----------+ +-------+----------------------+ + ^ | + | v + | +-------+----------------------+ + | | Worker Processes | + | | - extract PDF text | + | | - chunk pages | + | | - create embeddings | + | | - cleanup reservations | + | +------------------------------+ + | + +------------ OpenAI embeddings / chat model +``` + +### Enterprise Design Characteristics -This repository follows `AGENTS.md` as the locked architecture contract. Some modules are scaffolded with TODOs; this README distinguishes current implementation from planned flow. +- API and worker responsibilities are separated. +- Document ingestion is asynchronous and queue-backed. +- Every major operation is scoped by `workspace_id`. +- Token usage is tracked centrally by day. +- Redis is used both for rate limiting and job execution. +- Vector retrieval stays in Postgres with `pgvector` instead of introducing another data store. +- The frontend is a separate deployable artifact. -## Project Overview +## How It Works End to End -Enterprise RAG enables a user to: -- authenticate with Supabase -- create a single workspace (v1 constraint) -- upload and index PDFs (pipeline scaffolded) -- run grounded queries over selected documents (query pipeline scaffolded) -- enforce a strict daily token budget per workspace +### 1. Authentication and Workspace Resolution -## Architecture Diagram +The client authenticates with Supabase and sends the bearer token to the API. The server validates the token and derives the current user. Workspace-scoped endpoints then resolve the user's workspace before accessing documents or usage records. + +Primary files: +- `client/src/lib/supabase.ts` +- `server/app/api/deps.py` +- `server/app/core/auth.py` +- `server/app/api/workspaces.py` + +### 2. Upload and Ingestion Pipeline + +The upload pipeline is designed so the API never has to receive the PDF bytes directly. ```mermaid -flowchart LR - U[User] --> C[React Client\nVite] - C -->|Supabase Auth| SA[Supabase Auth] - C -->|Bearer JWT| API[FastAPI Server] +sequenceDiagram + participant C as Client + participant A as API + participant S as Supabase Storage + participant R as Redis/RQ + participant W1 as Extract Worker + participant W2 as Index Worker + participant D as PostgreSQL + + C->>A: POST /documents/upload-prepare + A->>D: create placeholder document row + A-->>C: signed upload URL + storage path + C->>S: upload PDF directly + C->>A: POST /documents/upload-complete + A->>R: enqueue extract job + R->>W1: ingest_extract + W1->>S: download PDF + W1->>D: write document_pages + W1->>R: enqueue ingest_index + R->>W2: ingest_index + W2->>D: write chunks + W2->>D: write chunk_embeddings + W2->>D: mark document ready/indexed +``` - API --> DB[(PostgreSQL + pgvector)] - API --> R[(Redis)] - API --> ST[(Supabase Storage)] - - API -->|enqueue jobs| Q[RQ Queues\ningest_extract / ingest_index] - Q --> W1[Worker Extract] - Q --> W2[Worker Index] - - W1 --> DB - W2 --> DB - W1 --> ST - API --> OAI[OpenAI\nEmbeddings + LLM] - W2 --> OAI +What happens in practice: +- `upload-prepare` validates file size, content type, workspace limits, and idempotency. +- The API stores a placeholder document record and returns a signed storage URL. +- `upload-complete` confirms the object exists in storage and enqueues extraction. +- `ingest_extract` downloads the PDF and writes extracted page text into `document_pages`. +- `ingest_index` chunks page text, generates embeddings, stores vectors, and marks the document ready. + +Primary files: +- `server/app/api/documents.py` +- `server/app/core/storage.py` +- `worker/jobs/ingest_extract.py` +- `worker/jobs/ingest_index.py` + +### 3. Query Pipeline + +The query flow is grounded retrieval, not free-form generation. + +```mermaid +sequenceDiagram + participant C as Client + participant A as API + participant D as PostgreSQL/pgvector + participant O as OpenAI + + C->>A: POST /query or POST /query/stream + A->>A: validate workspace, document, limits + A->>O: embed question + A->>D: retrieve top-k chunks by vector similarity + A->>A: reserve token budget + A->>O: generate grounded answer + A->>A: commit actual usage and release remainder + A->>D: write query log + A-->>C: answer + citations + usage ``` -ASCII view: +What the server does: +- embeds the question with `text-embedding-3-small` +- retrieves top chunks from `chunk_embeddings` and `chunks` +- builds a grounded prompt using retrieved content +- reserves the estimated token budget before the LLM call +- commits actual usage after the response returns +- logs citations, latency, and token usage + +Primary files: +- `server/app/api/query.py` +- `server/app/api/query_stream.py` +- `server/app/core/retrieval.py` +- `server/app/core/embeddings.py` +- `server/app/core/llm.py` +- `server/app/core/token_budget.py` + +### 4. Usage and Observability + +The system exposes both real-time usage and an observability summary. + +Current observability coverage includes: +- daily token usage and remaining budget +- total query count +- 24-hour query volume and error rate +- latency statistics +- document status summary +- top queried documents +- recent query failures + +Primary files: +- `server/app/api/usage.py` +- `server/app/db/models.py` +- `worker/jobs/maintenance.py` + +## Repository Layout ```text -Client (React) -> FastAPI -> Postgres(pgvector) - | | ^ - v v | - Supabase Auth Redis/RQ -> Worker(s) - | | - +---------------------------+ - Supabase Storage / OpenAI +enterprise-rag/ +├── client/ # React + Vite frontend +├── server/ # FastAPI API, core logic, DB layer +├── worker/ # Redis/RQ workers and maintenance jobs +├── scripts/ # DB bootstrap and utility scripts +├── infrastructure/ # Infrastructure placeholders +├── docker-compose.yml # Full local stack +├── docker-compose.prod.yml # Production-style compose file +├── AGENTS.md # Architecture contract and implementation notes +└── README.md ``` -## Tech Stack - -- Backend: FastAPI, SQLAlchemy, Pydantic Settings -- Database: PostgreSQL + pgvector -- Queue: Redis + RQ -- Auth/Storage: Supabase Auth + Storage -- AI: OpenAI (`text-embedding-3-small`, `gpt-4o-mini` per locked architecture) -- Frontend: React + TypeScript + Vite + Supabase JS -- Infra/Dev: Docker Compose, Nginx (production client image) - -## High-Level Flow - -### 1) Supabase Auth -- Client signs in via Supabase (`client/src/lib/supabase.ts`). -- Client gets `access_token` from session. -- Backend validates bearer token in `server/app/core/auth.py` using Supabase SDK (with REST fallback). - -### 2) Workspace Creation -- `POST /workspaces` creates one workspace per user. -- Enforced uniqueness: if existing workspace owned by user is found, returns `409`. -- A daily usage row (`workspace_daily_usage`) is initialized at creation. - -### 3) Token Budget Engine -- Budget tracked per workspace/day in `workspace_daily_usage`. -- Implemented operations: - - reserve (`reserve_tokens`) - - release (`release_tokens`) - - commit actual usage (`commit_usage`) - - read status (`get_budget_status`) -- `GET /usage/today` returns `{used,reserved,limit,remaining,resets_at}`. - -### 4) Document Ingestion -- Locked architecture defines `upload-prepare -> upload-complete -> extract -> chunk -> embed -> ready`. -- Current repo status: - - document/query endpoints are scaffolded placeholders - - worker jobs `ingest_extract` and `ingest_index` are TODO stubs - - DB schema and queue wiring are present - -### 5) RAG Query Flow -- Locked architecture requires strict grounded retrieval over workspace-scoped chunks. -- Current repo status: - - `/query` route exists but returns `Not implemented` - - retrieval/chunking/embeddings modules are scaffolded +### Important Directories -## Environment Variables +- `server/app/api`: REST and streaming endpoints +- `server/app/core`: auth, retrieval, embeddings, prompts, token budget +- `server/app/db`: SQLAlchemy models and DB session setup +- `server/app/schemas`: request and response models +- `worker/jobs`: extraction, indexing, maintenance jobs +- `client/src/pages`: authenticated and public application pages +- `client/src/components`: UI modules for upload, chat, usage, and layout + +## API Surface + +### Health and Auth + +- `GET /health` +- `GET /auth/me` + +### Workspace + +- `POST /workspaces` +- `GET /workspaces/me` + +### Documents + +- `GET /documents` +- `GET /documents/{document_id}` +- `GET /documents/{document_id}/pages/{page_number}` +- `POST /documents/upload-prepare` +- `POST /documents/upload-complete` +- `POST /documents/{document_id}/retry` +- `POST /documents/{document_id}/reindex` +- `DELETE /documents/{document_id}` + +### Query and Retrieval + +- `POST /query` +- `POST /query/stream` +- `GET /citations/{chunk_id}` +- `GET /queries` +- `GET /queries/{query_id}` + +### Chat Sessions + +- `POST /chats/sessions` +- `PATCH /chats/sessions/{session_id}` +- `GET /chats/sessions` +- `GET /chats/sessions/{session_id}` + +### Usage and Observability + +- `GET /usage/today` +- `GET /usage/observability` + +## Data Model + +Core tables in the current implementation: + +- `workspaces`: tenant root for all user content +- `documents`: uploaded PDF metadata and pipeline status +- `document_pages`: extracted page text +- `chunks`: page-bounded text chunks used for retrieval +- `chunk_embeddings`: vector embeddings stored in `pgvector` +- `workspace_daily_usage`: daily token accounting with reserved and used buckets +- `query_logs`: query history, citations, latency, and token metrics +- `chat_sessions`: persisted chat metadata and messages + +### Status Lifecycle + +Document lifecycle in the current codebase: + +```text +pending_upload/uploading -> uploaded -> extracting -> indexing -> ready/indexed + \-> failed +``` + +## Limits and Controls + +Current enforced limits from the application config and rate limiter: + +- `1` workspace per user +- up to `100` documents per workspace +- maximum file size: `20 MB` +- supported upload type: `application/pdf` +- maximum query length: `500` characters +- retrieval depth: `top_k = 5` +- LLM max output tokens: `2000` +- daily token limit: `100000` tokens per workspace +- upload prepare rate limit: `10` requests per minute per workspace +- upload complete rate limit: `20` requests per minute per workspace +- query rate limit: `100` requests per minute per workspace + +### Token Budget Model + +The token budget is managed with reservation semantics so concurrent requests do not overspend the daily allowance. + +Flow: +1. Estimate query embedding + prompt + max output cost. +2. Reserve the estimated tokens. +3. Execute the LLM call. +4. Commit actual tokens used. +5. Release any unused reservation. +6. Periodically clean stale reservations. + +This logic is implemented in `server/app/core/token_budget.py` and `worker/jobs/maintenance.py`. + +## Local Development -Core env is defined in `.env.example`. +### Prerequisites + +- Docker and Docker Compose +- Node.js 20+ if running the client outside Docker +- Python 3.11 if running the API or worker outside Docker +- A Supabase project +- An OpenAI API key for embeddings and answer generation + +### Quick Start With Docker Compose + +1. Create your environment file. + +```bash +cp .env.example .env +``` + +2. Fill in at least these values: ```bash -# Supabase SUPABASE_URL= SUPABASE_SERVICE_ROLE_KEY= SUPABASE_ANON_KEY= SUPABASE_JWT_SECRET= -SUPABASE_KEY= # compatibility alias - -# AI OPENAI_API_KEY= +DATABASE_URL=postgresql://postgres:postgres@localhost:5432/enterprise_rag +REDIS_URL=redis://localhost:6379/0 +``` -# Data/queue -DATABASE_URL= -REDIS_URL= - -# App -ENVIRONMENT=development -API_HOST=0.0.0.0 -API_PORT=8000 -DAILY_TOKEN_LIMIT=100000 +3. Start the stack. -# Client -VITE_API_URL=http://localhost:8000 -VITE_SUPABASE_URL= -VITE_SUPABASE_ANON_KEY= +```bash +docker-compose up --build ``` -What matters most right now: -- `SUPABASE_URL` + service role key for backend token validation -- `VITE_SUPABASE_URL` + anon key for client auth -- `DATABASE_URL` for server + workers -- `REDIS_URL` for workers +4. Open the services: +- client: `http://localhost:5173` +- api: `http://localhost:8000` +- rq dashboard: `http://localhost:9181` -## Run Locally (Backend + Frontend) - -### Option A: Docker Compose (recommended) +### Useful Commands ```bash -cp .env.example .env +# start everything +docker-compose up + +# rebuild and start docker-compose up --build -``` -Services: -- API: `http://localhost:8000` -- Client: `http://localhost:5173` -- RQ Dashboard: `http://localhost:9181` +# stop services +docker-compose down + +# stop and remove volumes +docker-compose down -v -### Option B: Run modules directly +# run DB migrations from the server container +docker-compose exec server alembic upgrade head + +# view server logs +docker-compose logs -f server + +# view worker logs +docker-compose logs -f worker-extract +docker-compose logs -f worker-index +``` + +### Run Services Individually Server: ```bash cd server -python -m venv .venv && source .venv/bin/activate +python -m venv .venv +source .venv/bin/activate pip install -r requirements.txt uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 ``` @@ -162,70 +448,120 @@ npm install npm run dev -- --host 0.0.0.0 ``` -Worker (example queue): +Worker: ```bash cd worker -python -m venv .venv && source .venv/bin/activate +python -m venv .venv +source .venv/bin/activate pip install -r requirements.txt -QUEUE_NAME=ingest_extract REDIS_URL=redis://localhost:6379/0 python worker.py +QUEUE_NAME=ingest_extract python worker.py ``` -## Run With Supabase +## Environment Variables + +Root `.env.example` is the primary template for local development. -1. Create Supabase project. -2. Fill `.env` with Supabase URL, service role key, anon key. -3. Apply schema: +### Required Core Variables ```bash -psql "$DATABASE_URL" -f scripts/schema.supabase.sql -``` +SUPABASE_URL=https://your-project.supabase.co +SUPABASE_SERVICE_ROLE_KEY=your-service-role-key +SUPABASE_ANON_KEY=your-anon-key +SUPABASE_JWT_SECRET=your-jwt-secret +SUPABASE_STORAGE_BUCKET=documents -4. Start stack (`docker-compose up --build`). -5. Open client and sign in. +OPENAI_API_KEY=sk-... -Basic API check with JWT: - -```bash -curl -H "Authorization: Bearer " \ - http://localhost:8000/auth/me +DATABASE_URL=postgresql://postgres:postgres@localhost:5432/enterprise_rag +REDIS_URL=redis://localhost:6379/0 ``` -Create workspace: +### Useful Application Variables ```bash -curl -X POST http://localhost:8000/workspaces \ - -H "Authorization: Bearer " \ - -H "Content-Type: application/json" \ - -d '{"name":"My Workspace"}' +ENVIRONMENT=development +API_HOST=0.0.0.0 +API_PORT=8000 +DAILY_TOKEN_LIMIT=100000 +RESERVATION_TTL_SECONDS=600 +LOG_EACH_QUERY=false +EMBEDDING_MODEL=text-embedding-3-small +VITE_API_URL=http://localhost:8000 +VITE_SUPABASE_URL=https://your-project.supabase.co +VITE_SUPABASE_ANON_KEY=your-anon-key ``` -Get usage today: +## Operational Notes -```bash -curl -H "Authorization: Bearer " \ - http://localhost:8000/usage/today -``` +### Workspace Isolation -## Folder Structure Summary +The system is designed around `workspace_id` as the isolation boundary. Document access, usage tracking, retrieval, and query logs are all scoped to a workspace. -```text -enterprise-rag/ -├── server/ # FastAPI API + token budget + DB models -├── client/ # React/Vite frontend with Supabase auth -├── worker/ # RQ workers (extract/index + maintenance) -├── scripts/ # DB schema/bootstrap scripts -├── infrastructure/ # Terraform/K8s placeholders -├── docker-compose.yml -└── AGENTS.md # Locked architecture contract -``` +### Storage Strategy + +PDF binaries live in Supabase Storage. Extracted text, chunks, metadata, and embeddings live in Postgres. + +### Vector Search Strategy + +Embeddings are stored in `chunk_embeddings.embedding` using `pgvector`. Retrieval uses cosine distance and returns the most relevant chunk candidates for a single document. + +### Failure Recovery + +- failed documents can be retried with `POST /documents/{document_id}/retry` +- already processed documents can be reindexed with `POST /documents/{document_id}/reindex` +- stale token reservations can be cleared by the maintenance job +- document deletion removes metadata first, then attempts storage cleanup + +### Frontend Application Areas + +Current routed pages in the client: +- `/login` +- `/signup` +- `/workspace` +- `/app/upload` +- `/app/chat` +- `/app/observability` +- `/app/workspace` + +## Current Status + +The repository is beyond a scaffold. These capabilities are already present in code: + +- JWT-backed auth integration with Supabase +- one-workspace-per-user model +- signed upload preparation and upload completion +- background extraction and indexing jobs +- page text persistence +- chunk persistence and vector embedding storage +- grounded query endpoint +- streaming query endpoint using SSE +- citation source retrieval +- query history APIs +- chat session APIs +- usage and observability endpoints +- Docker-based local runtime + +Known gaps or areas still being hardened: +- not every table in the architecture contract is represented yet in SQLAlchemy models +- `server/app/core/chunking.py` remains a placeholder while worker-side chunking is active +- production deployment still needs full operational hardening, secrets handling, and CI maturity +- test coverage is still light for end-to-end ingestion and retrieval + +## Roadmap + +Near-term improvements that fit the current architecture: + +1. Move chunking into a shared core module so API and workers use one implementation. +2. Expand integration tests around upload, extraction, indexing, and query behavior. +3. Add stronger metrics, worker lifecycle hooks, and scheduled maintenance execution. +4. Harden migration coverage for all current tables and status transitions. +5. Expand query scope from single-document search to selected multi-document search where needed. +6. Improve production deployment docs and CI/CD validation. -## Development Order Roadmap +## Related Docs -1. Complete document API contracts (`upload-prepare`, `upload-complete`, list/status). -2. Implement extraction worker (`worker/jobs/ingest_extract.py`) and page persistence. -3. Implement chunking + embeddings pipeline (`server/app/core/chunking.py`, `embeddings.py`, `worker/jobs/ingest_index.py`). -4. Implement retrieval + grounded query endpoint (`server/app/api/query.py`, `core/retrieval.py`). -5. Add stale reservation scheduled maintenance integration and observability. -6. Expand client from test harness to full app pages (`Documents`, `Query`, `Usage`, `Dashboard`). -7. Harden with integration tests (auth, workspace isolation, ingestion, retrieval, budget edge cases). +- `AGENTS.md`: architecture contract and implementation guidance +- `server/README.md`: server-specific notes +- `worker/README.md`: worker-specific notes +- `client/README.md`: client-specific notes diff --git a/client/README.md b/client/README.md index a78c1ad..edb8e19 100644 --- a/client/README.md +++ b/client/README.md @@ -1,96 +1,212 @@ -# Client (`client/`) +# Client + +`client/` is the React frontend for Enterprise RAG. It handles authentication with Supabase, workspace onboarding, document upload UX, document-aware chat, and observability views. + +## Responsibilities + +- sign in and sign up users with Supabase Auth +- keep the current session and bearer token in client state +- route users through workspace creation before entering the app shell +- upload PDFs through the API + Supabase signed URL flow +- display ingestion progress and document status +- run streaming grounded chat against the selected document +- show citations, sources, usage, and observability metrics + +## Tech Stack + +- React 18 +- TypeScript +- Vite +- React Router +- Supabase JS +- Tailwind CSS utilities +- Axios and fetch-based API calls + +## App Structure + +```text +client/ +├── src/ +│ ├── App.tsx # top-level routes +│ ├── context/AuthContext.tsx # session lifecycle and auth actions +│ ├── lib/api.ts # typed API client +│ ├── lib/supabase.ts # Supabase client +│ ├── pages/ +│ │ ├── Login.tsx +│ │ ├── Signup.tsx +│ │ ├── WorkspaceGate.tsx +│ │ ├── UploadPage.tsx +│ │ ├── ChatPage.tsx +│ │ ├── ObservabilityPage.tsx +│ │ └── WorkspaceInfoPage.tsx +│ ├── components/ +│ │ ├── layout/ +│ │ ├── upload/ +│ │ ├── chat/ +│ │ └── documents/ +│ └── styles/ +├── package.json +└── vite.config.ts +``` -Frontend for Enterprise RAG v1 using React + Vite + TypeScript + Tailwind. +## Route Map -## Implemented UI/UX Scope +Public routes: +- `/login` +- `/signup` -- Auth: `/login`, `/signup` -- Workspace creation gate: `/workspace` -- Protected app shell with persistent sidebar + top bar: `/app/*` -- Upload and ingestion tracking: `/app/upload` - - Includes per-row `Delete` action (calls `DELETE /documents/{id}`) -- Document-context chat scaffold (stubbed reply): `/app/chat` -- Separate workspace info page: `/app/workspace` +Authenticated routes: +- `/workspace` +- `/app/upload` +- `/app/chat` +- `/app/observability` +- `/app/workspace` -## Theme +Top-level flow in `src/App.tsx`: +1. unauthenticated users are redirected to `/login` +2. authenticated users are redirected to `/workspace` +3. workspace setup routes the user into `/app/*` +4. protected app routes render inside the shared app shell -- Background: `#FFFFFF` -- Accent: `#F97316` -- Primary text: `#111827` -- Borders/surfaces: `#E5E7EB` and `#F9FAFB` +## UX Flows -## Environment Variables +### Authentication -Create `client/.env`: +`AuthContext` is the session source of truth. -```bash -VITE_API_URL=http://localhost:8000 -VITE_SUPABASE_URL=https://your-project.supabase.co -VITE_SUPABASE_ANON_KEY=your-public-anon-key -``` +Current behavior: +- reads the initial Supabase session on load +- subscribes to auth state changes +- exposes `signIn`, `signUp`, and `signOut` +- registers a global unauthorized handler so API `401` responses log the user out and redirect to `/login` -## Run +Primary files: +- `src/context/AuthContext.tsx` +- `src/lib/supabase.ts` +- `src/lib/api.ts` -```bash -cd client -npm install -npm run dev -``` +### Document Upload -Open: `http://localhost:5173` +Upload UX is centered in `UploadPage` and `components/upload/UploadPanel`. -## Routing Flow +Flow: +1. request `POST /documents/upload-prepare` +2. upload the PDF directly to the signed URL +3. notify the backend with `POST /documents/upload-complete` +4. poll the document list while processing is active +5. let the user delete documents from the table -1. Unauthenticated users land on `/login` or `/signup`. -2. After login/signup, app redirects to `/workspace`. -3. `/workspace`: - - If workspace exists: redirect to `/app/upload` - - If workspace is missing: show create workspace card -4. `/app/*` is protected and uses a shared shell: - - `/app/upload` - - `/app/chat` - - `/app/workspace` +Current behavior: +- shows live document status in a table +- refreshes every 4 seconds while documents are processing +- treats `ready` and `indexed` as successful ingest states + +### Chat and Streaming Query + +`ChatPage` drives the main RAG interaction. + +Current behavior: +- binds the active document from the app shell context +- streams answer deltas from `POST /query/stream` +- loads citations and source text on demand +- persists and restores draft transcript state locally +- stores chat sessions with `/chats/sessions` +- updates usage metrics in the app shell after responses + +Primary files: +- `src/pages/ChatPage.tsx` +- `src/components/chat/*` +- `src/lib/api.ts` + +### Observability + +`ObservabilityPage` renders a workspace-level operational summary using `GET /usage/observability`. + +Displayed metrics: +- total queries +- 24-hour queries and errors +- error rate +- average latency +- p95 latency +- tokens used and remaining +- document pipeline health +- top queried documents +- recent query errors ## API Contracts Used -- `GET /workspaces/me` +The client currently uses these backend APIs: + +- `GET /auth/me` - `POST /workspaces` +- `GET /workspaces/me` - `GET /documents` +- `GET /documents/{document_id}` +- `GET /documents/{document_id}/pages/{page_number}` - `POST /documents/upload-prepare` - `POST /documents/upload-complete` -- `GET /documents/{id}` -- Optional: `GET /usage/today` (fallbacks to workspace usage if unavailable) - -All API requests include: +- `DELETE /documents/{document_id}` +- `POST /query/stream` +- `GET /citations/{chunk_id}` +- `GET /queries` +- `GET /queries/{query_id}` +- `POST /chats/sessions` +- `PATCH /chats/sessions/{session_id}` +- `GET /chats/sessions` +- `GET /chats/sessions/{session_id}` +- `GET /usage/today` +- `GET /usage/observability` + +All authenticated requests send: ```http -Authorization: Bearer +Authorization: Bearer ``` -Behavior: -- Global `401` handling signs out and redirects to `/login` -- `/app/upload` polls documents every 4 seconds while docs are still processing +## Environment -## Upload Pipeline UX +Create `client/.env` with: -Per file upload task states: +```bash +VITE_API_URL=http://localhost:8000 +VITE_SUPABASE_URL=https://your-project.supabase.co +VITE_SUPABASE_ANON_KEY=your-public-anon-key +``` + +## Run -- `queued` -- `preparing` -- `uploading` -- `completing` -- backend-driven: `extracting`, `indexing`, `indexed`, `failed` +```bash +cd client +npm install +npm run dev -- --host 0.0.0.0 +``` + +Build for production: -The upload queue supports multiple files (up to 100) with client-side concurrency limit of 4. +```bash +npm run build +``` + +Preview the built app: + +```bash +npm run preview +``` + +## Development Commands + +```bash +cd client +npm run dev +npm run build +npm run lint +npm run test +``` -## Manual Test Plan +## Notes for Contributors -1. Sign up or log in. -2. Confirm redirect to `/workspace`. -3. If prompted, create workspace. -4. Confirm redirect to `/app/upload`. -5. Upload multiple PDFs and watch per-file state changes. -6. Wait until at least one document reaches `Indexed`. -7. Select the indexed document from the left sidebar and verify navigation to `/app/chat`. -8. Send a message in chat and verify stub response appears. -9. Open `/app/workspace`, verify usage metrics, document counts, and refresh behavior. +- `src/lib/api.ts` is the contract layer; keep request and response types aligned with the backend +- auth redirects are centralized through the unauthorized handler, so avoid duplicating logout-on-401 logic elsewhere +- chat is document-scoped in the current UI model +- upload and polling behavior assume asynchronous backend processing states +- preserve the existing visual language unless the task is explicitly a redesign diff --git a/server/README.md b/server/README.md index 1ca88b8..be98c11 100644 --- a/server/README.md +++ b/server/README.md @@ -1,145 +1,207 @@ -# Server Module (`server/`) +# Server -## Purpose +`server/` is the FastAPI backend for Enterprise RAG. It owns authentication, workspace resolution, document APIs, grounded query orchestration, token budget enforcement, and observability endpoints. -`server/` is the FastAPI backend for Enterprise RAG. It is responsible for: -- Supabase JWT validation -- workspace ownership and isolation checks -- token budget accounting and reservation safety -- API surface for auth/workspace/documents/query/usage -- persistence through SQLAlchemy models over PostgreSQL +## Responsibilities -Current implementation includes auth/workspace/usage endpoints and token-budget core logic; documents/query/retrieval/chunking/embeddings are scaffolded for next implementation stages. +- validate Supabase JWTs +- resolve the caller's workspace +- manage document upload preparation and upload completion +- enqueue extraction and indexing jobs +- execute grounded RAG queries and SSE streaming queries +- track daily token usage with reserve/commit/release semantics +- expose query history, citations, chat sessions, and observability data -## FastAPI Structure Breakdown +## Module Layout ```text server/ ├── app/ -│ ├── main.py # FastAPI app, CORS, router mounting -│ ├── config.py # env settings + UTC helpers -│ ├── api/ -│ │ ├── auth.py # GET /auth/me -│ │ ├── workspaces.py # POST /workspaces, GET /workspaces/me -│ │ ├── usage.py # GET /usage/today -│ │ ├── documents.py # scaffold -│ │ └── query.py # scaffold -│ ├── core/ -│ │ ├── auth.py # JWT validation via Supabase -│ │ ├── token_budget.py # reserve/release/commit/status -│ │ ├── chunking.py # scaffold -│ │ ├── embeddings.py # scaffold -│ │ └── retrieval.py # scaffold -│ ├── db/ -│ │ ├── session.py # engine/session/base -│ │ └── models.py # ORM models -│ ├── schemas/ # request/response models -│ └── storage/client.py # storage scaffold -├── migrations/ -└── tests/ +│ ├── main.py # FastAPI app and router registration +│ ├── config.py # environment settings and UTC helpers +│ ├── api/ # route handlers +│ ├── core/ # auth, retrieval, llm, embeddings, token budget +│ ├── db/ # SQLAlchemy session, models, repositories +│ ├── schemas/ # request and response schemas +│ ├── storage/ # Supabase storage integration +│ └── utils/ # rate limiting and logging helpers +├── migrations/ # Alembic environment and revisions +├── tests/ +├── requirements.txt +└── pyproject.toml ``` -## Core Modules - -### `auth` -- `server/app/core/auth.py` -- Validates bearer JWT against Supabase. -- Primary path: `supabase.auth.get_user(jwt)`. -- Fallback path: direct REST call to `/auth/v1/user` when SDK/httpx compatibility issues occur. - -### `token_budget` -- `server/app/core/token_budget.py` -- Implements row-safe daily accounting in `workspace_daily_usage` with lock semantics. -- Handles: - - `reserve_tokens` - - `release_tokens` - - `commit_usage` - - `get_budget_status` -- Uses upsert-style row initialization (`get_or_create` behavior) for both PostgreSQL and SQLite. - -### `database` -- `server/app/db/session.py`: SQLAlchemy engine/session lifecycle. -- `server/app/db/models.py`: ORM for `workspaces`, `documents`, `workspace_daily_usage`. -- Full target schema exists in `scripts/schema.local.sql` and `scripts/schema.supabase.sql`. - -### `api routers` -Mounted in `server/app/main.py`: -- `/auth` -- `/workspaces` -- `/documents` (scaffold) -- `/query` (scaffold) -- `/usage` - -## How JWT Validation Works - -Request path: +## API Surface + +Routes registered in `app/main.py`: + +- `GET /health` +- `GET /auth/me` +- `POST /workspaces` +- `GET /workspaces/me` +- `GET /documents` +- `GET /documents/{document_id}` +- `GET /documents/{document_id}/pages/{page_number}` +- `POST /documents/upload-prepare` +- `POST /documents/upload-complete` +- `POST /documents/{document_id}/retry` +- `POST /documents/{document_id}/reindex` +- `DELETE /documents/{document_id}` +- `POST /query` +- `POST /query/stream` +- `GET /citations/{chunk_id}` +- `GET /queries` +- `GET /queries/{query_id}` +- `POST /chats/sessions` +- `PATCH /chats/sessions/{session_id}` +- `GET /chats/sessions` +- `GET /chats/sessions/{session_id}` +- `GET /usage/today` +- `GET /usage/observability` + +## Request Flow + +### Auth and Workspace Resolution + 1. Client sends `Authorization: Bearer `. -2. `get_current_user` (`app/api/deps.py`) enforces bearer format. -3. `validate_jwt_and_get_user` (`app/core/auth.py`) verifies token with Supabase. -4. On success, request receives `AuthenticatedUser` with `user_id/email/role`. +2. `app/api/deps.py` validates bearer presence. +3. `app/core/auth.py` validates the JWT with Supabase. +4. `get_workspace_id()` resolves the current user's workspace. +5. Workspace-scoped routes operate only within that resolved `workspace_id`. -Minimal check: +Primary files: +- `app/api/deps.py` +- `app/core/auth.py` +- `app/api/workspaces.py` -```bash -curl -H "Authorization: Bearer " \ - http://localhost:8000/auth/me +### Document Upload Lifecycle + +```text +upload-prepare -> upload-complete -> enqueue extract -> extract pages + -> enqueue index -> create chunks/embeddings -> ready ``` -## How Workspace Isolation Works +What the API does: +- validates file size and content type +- enforces document count limits +- creates placeholder document records +- issues signed upload URLs through Supabase Storage +- verifies uploaded object existence +- enqueues RQ jobs for extraction and indexing +- supports retry, reindex, and delete flows -Isolation model in current implementation: -- `GET /workspaces/me` resolves workspace by `owner_id == current_user.id`. -- `get_workspace_id` dependency returns only the caller-owned workspace id. -- `GET /usage/today` is scoped via that dependency. +Primary files: +- `app/api/documents.py` +- `app/core/storage.py` -Architecture contract requires every data query to include `workspace_id` filters. The current workspace and usage flows follow this; document/query endpoints are pending implementation. +### Query Lifecycle -## How Token Reservation Model Works +```text +question -> embed question -> retrieve top-k chunks -> reserve tokens + -> call LLM -> commit actual usage -> return answer + citations +``` -Core behavior (`app/core/token_budget.py`): +What happens in code: +- question embedding comes from `app/core/embeddings.py` +- vector retrieval comes from `app/core/retrieval.py` +- grounded prompting and LLM calls come from `app/core/llm.py` and `app/core/prompts.py` +- token reservation and usage accounting come from `app/core/token_budget.py` +- query metadata is logged into `query_logs` when enabled -```python -reserve -> tokens_reserved += amount (if within DAILY_TOKEN_LIMIT) -release -> tokens_reserved -= amount -commit -> tokens_reserved -= amount; tokens_used += amount -status -> remaining = limit - (used + reserved) -``` +Primary files: +- `app/api/query.py` +- `app/api/query_stream.py` +- `app/core/retrieval.py` +- `app/core/llm.py` +- `app/core/token_budget.py` -Concurrency safety: -- uses transactional row locks (`SELECT ... FOR UPDATE` where supported) -- tested for reservation race behavior in `server/tests/test_token_budget.py` +## Core Subsystems -Example usage endpoint: +### Token Budget -```bash -curl -H "Authorization: Bearer " \ - http://localhost:8000/usage/today -``` +The server enforces a daily workspace token limit using `workspace_daily_usage`. + +Operations: +- `reserve_tokens()` +- `release_tokens()` +- `commit_usage()` +- `get_budget_status()` + +Behavior: +- reservations are acquired before LLM work +- actual usage is committed after the model returns +- unused reserved tokens are released +- stale reservations are cleaned separately by the worker maintenance job + +Primary file: +- `app/core/token_budget.py` + +### Retrieval + +The retrieval layer reads vectors from `chunk_embeddings` and chunk/page text from Postgres. + +Current behavior: +- query embedding is turned into a `pgvector` literal +- cosine distance query returns top-k chunks for one document +- each result includes chunk text, page text, score, and token count + +Primary file: +- `app/core/retrieval.py` -## Important Environment Variables +### Rate Limiting -From `server/.env.example` and `app/config.py`: +Redis-backed per-workspace rate limiting is enforced in the API. + +Current limits: +- queries: `100` requests / `60s` +- upload prepare: `10` requests / `60s` +- upload complete and retry/reindex mutations: `20` requests / `60s` + +Primary files: +- `app/core/rate_limit.py` +- `app/utils/rate_limit.py` + +## Data Model + +The server depends on these core tables: + +- `workspaces` +- `documents` +- `document_pages` +- `chunks` +- `chunk_embeddings` +- `workspace_daily_usage` +- `query_logs` +- `chat_sessions` + +Current SQLAlchemy models are defined in `app/db/models.py`. Full schema scripts live under `../scripts/`. + +## Environment + +Minimal server environment: ```bash -SUPABASE_URL= -SUPABASE_SERVICE_ROLE_KEY= -SUPABASE_KEY= # optional alias -DATABASE_URL= -REDIS_URL= +SUPABASE_URL=https://your-project.supabase.co +SUPABASE_SERVICE_ROLE_KEY=your-service-role-key +SUPABASE_KEY=your-service-role-key +SUPABASE_STORAGE_BUCKET=documents + +DATABASE_URL=postgresql://postgres:postgres@localhost:5432/enterprise_rag +REDIS_URL=redis://localhost:6379/0 +OPENAI_API_KEY=sk-... + ENVIRONMENT=development API_HOST=0.0.0.0 API_PORT=8000 DAILY_TOKEN_LIMIT=100000 RESERVATION_TTL_SECONDS=600 +LLM_MODEL=gpt-4o-mini +EMBEDDING_MODEL=text-embedding-3-small ``` -Notes: -- `SUPABASE_SERVICE_ROLE_KEY` is preferred; `SUPABASE_KEY` is accepted as fallback. -- `RESERVATION_TTL_SECONDS` is used by maintenance cleanup logic (currently in worker). +## Run -## How To Run Server - -### Local Python +### Local ```bash cd server @@ -161,22 +223,20 @@ Health check: curl http://localhost:8000/health ``` -## Future Expansion Notes - -Planned next backend milestones (aligned with locked architecture): -- implement document upload lifecycle and metadata validation -- add extraction/chunking/embedding orchestration endpoints -- implement pgvector retrieval and grounded `/query` -- add workspace-scoped rate limiting and structured logging -- complete repository layer under `app/db/repositories/` -- expand Alembic migrations and integration tests beyond health/token-budget +## Development Commands -## Rate Limiting +```bash +cd server +pytest +ruff check . +black --check . +mypy app +``` -Rate limiting is implemented with Redis counters: +## Notes for Contributors -- `POST /query`: enforced in `server/app/api/query.py` (`_enforce_query_rate_limit`, 100 requests / 60s / workspace). -- `POST /documents/upload-prepare`: enforced in `server/app/api/documents.py` via `app.utils.rate_limit.enforce_workspace_rate_limit` (10 requests / 60s / workspace). -- `POST /documents/upload-complete`: enforced in `server/app/api/documents.py` via the same limiter (20 requests / 60s / workspace). -- `POST /documents/{document_id}/retry`: enforced in `server/app/api/documents.py` using the document mutation limiter (20 requests / 60s / workspace). -- `POST /documents/{document_id}/reindex`: enforced in `server/app/api/documents.py` using the document mutation limiter (20 requests / 60s / workspace). +- keep every data access path workspace-scoped +- do not bypass the token budget layer for query-time model usage +- document status transitions matter because workers and UI depend on them +- `app/core/chunking.py` is still a placeholder; worker-side chunking is currently the active path +- if you add endpoints, register them in `app/main.py` and add corresponding schemas diff --git a/worker/README.md b/worker/README.md index 80c069e..45135fc 100644 --- a/worker/README.md +++ b/worker/README.md @@ -1,138 +1,171 @@ -# Worker Module (`worker/`) +# Worker -## Purpose of Worker Service +`worker/` runs asynchronous background jobs for Enterprise RAG. It is responsible for the long-running work that should not happen in the request path: PDF extraction, chunk indexing, embedding generation, and maintenance cleanup. -`worker/` runs asynchronous background jobs for ingestion and operational maintenance. It is designed to decouple long-running PDF processing/indexing tasks from synchronous API requests. +## Responsibilities -Current state: -- queue runner is implemented (`worker.py`) -- ingestion jobs are scaffolded stubs -- stale token reservation cleanup logic is implemented (`jobs/maintenance.py`) +- consume RQ jobs from Redis +- extract page text from uploaded PDFs +- persist `document_pages` +- split page text into chunks +- generate embeddings for chunks +- mark documents as `ready` or `failed` +- clean stale token reservations + +## Module Layout + +```text +worker/ +├── jobs/ +│ ├── ingest_extract.py # PDF download and page extraction +│ ├── ingest_index.py # chunking and embedding generation +│ └── maintenance.py # stale reservation cleanup +├── shared/ # shared package placeholder +├── worker.py # RQ worker entrypoint +├── requirements.txt +└── tests/ +``` -## RQ Queue Architecture +## Queue Model -`worker/worker.py`: -- reads `QUEUE_NAME` (or CLI arg) and `REDIS_URL` -- creates Redis connection -- creates RQ `Queue` -- starts `Worker([queue])` +The worker process reads one or more queue names from `QUEUE_NAME` or the first CLI argument. -Queue names used by compose: +Current queue names: - `ingest_extract` - `ingest_index` -Example startup: +Entrypoint behavior in `worker.py`: +- connect to Redis +- create `Queue` objects for the configured names +- start an RQ `Worker` + +Example: ```bash cd worker QUEUE_NAME=ingest_extract REDIS_URL=redis://localhost:6379/0 python worker.py ``` -## `ingest_extract` vs `ingest_index` +## Job Flows ### `jobs/ingest_extract.py` -Intended responsibility (locked architecture): -- fetch uploaded PDF metadata/blob -- extract page text -- persist `document_pages` -- update document state (`uploaded` -> `indexing` or `failed`) -Current status: -- placeholder `run(document_id: str)` function +Purpose: +- download the uploaded PDF from Supabase Storage +- read it with `pypdf` +- extract per-page text +- replace any previous `document_pages` rows for idempotency +- update the document status to `indexing` +- enqueue `ingest_index` + +Current behavior: +- extraction is page-based +- temporary PDF file is written under `/tmp` +- failures mark the document as `failed` +- if the schema does not include `extracting`, the code falls back to `indexing` ### `jobs/ingest_index.py` -Intended responsibility (locked architecture): -- chunk extracted page text -- generate embeddings -- persist `chunks` and `chunk_embeddings` -- mark document `ready` when complete -Current status: -- placeholder `run(document_id: str)` function +Purpose: +- load extracted page text +- split each page into chunks +- insert chunk rows +- reserve token budget per chunk embedding +- call OpenAI embeddings +- insert vectors into `chunk_embeddings` +- commit token usage +- mark the document as `ready` or `indexed` + +Current chunking behavior: +- page-bounded chunks +- target chunk size: `500` tokens +- overlap: `100` tokens +- `tiktoken` when available, character approximation fallback otherwise + +Failure behavior: +- budget exhaustion marks the document `failed` +- any outstanding reservations are released on failure +- chunk rows are rebuilt idempotently for the document during reindex -## Maintenance Jobs (Token Cleanup) +### `jobs/maintenance.py` -`jobs/maintenance.py` implements `cleanup_stale_reservations()`: -- connects to DB via `DATABASE_URL` -- reads `RESERVATION_TTL_SECONDS` (default `600`) -- sets `tokens_reserved=0` for stale rows in `workspace_daily_usage` -- supports PostgreSQL and SQLite SQL variants +Purpose: +- clear stale rows in `workspace_daily_usage` where tokens remain reserved beyond TTL -Example invocation: +Current behavior: +- supports PostgreSQL and SQLite variants +- reads `DATABASE_URL` and `RESERVATION_TTL_SECONDS` +- returns affected row count + +Example: ```bash cd worker -DATABASE_URL=postgresql://postgres:postgres@localhost:5432/enterprise_rag \ -RESERVATION_TTL_SECONDS=600 \ python -c "from jobs.maintenance import cleanup_stale_reservations; print(cleanup_stale_reservations())" ``` -## How Redis Integrates +## Runtime Flow + +```text +API upload-complete + -> enqueue ingest_extract + -> extract pages from PDF + -> enqueue ingest_index + -> chunk pages + -> embed chunks + -> mark document ready +``` + +## Environment -Redis is both: -- transport for queued jobs -- worker coordination backend for RQ +Minimal worker environment: -In `docker-compose.yml`: -- Redis service runs at `redis://redis:6379/0` inside containers -- workers and server share that connection target for enqueue/consume workflows +```bash +DATABASE_URL=postgresql://postgres:postgres@localhost:5432/enterprise_rag +REDIS_URL=redis://localhost:6379/0 +SUPABASE_URL=https://your-project.supabase.co +SUPABASE_SERVICE_ROLE_KEY=your-service-role-key +SUPABASE_KEY=your-service-role-key +OPENAI_API_KEY=sk-... +QUEUE_NAME=ingest_extract +RESERVATION_TTL_SECONDS=600 +EMBEDDING_MODEL=text-embedding-3-small +``` -## How To Start Worker +## Run -### Local Python +### Local ```bash cd worker python -m venv .venv source .venv/bin/activate pip install -r requirements.txt -QUEUE_NAME=ingest_extract REDIS_URL=redis://localhost:6379/0 python worker.py +QUEUE_NAME=ingest_extract python worker.py ``` -Run index worker: +Run the index queue: ```bash -QUEUE_NAME=ingest_index REDIS_URL=redis://localhost:6379/0 python worker.py +QUEUE_NAME=ingest_index python worker.py ``` -### Docker Compose +Run both queues in one process: ```bash -docker-compose up worker-extract worker-index +QUEUE_NAME=ingest_extract,ingest_index python worker.py ``` -## Scaling Workers - -Current compose defaults: -- `worker-extract` replicas: `5` -- `worker-index` replicas: `3` - -Scaling options: -- increase replicas in compose/orchestrator -- split queue responsibilities by workload -- tune queue depth monitoring via RQ Dashboard (`:9181`) - -Recommended operational pattern: -- keep extract workers higher than index if extraction is I/O-bound -- keep index workers sized for embedding throughput and API rate limits - -## Failure Handling - -Expected model per locked architecture: -- job exceptions move document to `failed` -- retry transient failures with bounded retries/backoff -- keep idempotency by document/chunk hashes -- log structured context for postmortems - -Current implemented behavior: -- RQ worker loop is active -- ingestion retry/error transitions are not yet implemented in job stubs -- maintenance cleanup function returns affected row count for observability +### Docker Compose -## Shared Code Integration +```bash +docker-compose up worker-extract worker-index +``` -`worker/shared/` is intended to mirror/reuse server logic (models/config/core) to avoid divergence. +## Development Notes -Current repo state: -- shared package is present as placeholder -- compose mounts `./server/app` into `/app/shared` for practical code sharing during development +- worker jobs are stateful with respect to document status transitions; keep them explicit +- release reserved tokens on every failure path +- chunking currently lives in `jobs/ingest_index.py`, not in shared core yet +- `worker/shared/` exists for future code consolidation, but the active shared path in local compose is the mounted server app code +- if you add new jobs, ensure the API enqueues them by import path and that the queue name is wired in Compose