Commit a4a8b32
docs: add in-depth worker documentation
Cover architecture, BERTopic pipeline, quality metrics, multilingual support, error handling, deployment, development workflow, and API contract. MDX format matching docs.dev.faculytics for future integration.
1 parent 0258124 commit a4a8b32

9 files changed

Lines changed: 886 additions & 0 deletions

File tree

docs/architecture.mdx

Lines changed: 90 additions & 0 deletions
---
title: "Architecture"
description: "File structure, module responsibilities, and data flow through the topic modeling worker."
---

## File Structure

```
src/
├── handler.py      # RunPod entry point — validation, orchestration, error handling
├── config.py       # Constants: model name, device, version, default hyperparameters
├── models.py       # Pydantic request/response schemas (mirrors Zod DTOs in API)
├── topic_model.py  # BERTopic pipeline: UMAP → HDBSCAN → c-TF-IDF → KeyBERTInspired
└── evaluate.py     # Quality metrics: NPMI, diversity, silhouette, embedding coherence
```

## Module Responsibilities

### `handler.py` — Entry Point

The RunPod serverless handler. Performs:

1. **Input parsing** — extracts `input` from the RunPod event envelope, validates with Pydantic
2. **Parameter merging** — overlays request params onto RUN 012 defaults
3. **Auto-scaling** — adjusts `min_topic_size` and `umap_n_neighbors` for small datasets
4. **Validation** — checks minimum item count, embedding dimensionality, zero vectors
5. **Orchestration** — calls `run_bertopic()`, `extract_topic_info()`, `get_assignments()`, `compute_metrics()`
6. **Error routing** — domain errors return `status: "failed"` (no BullMQ retry); unexpected exceptions propagate to RunPod (triggers retry)

### `config.py` — Configuration

Static configuration, no environment variables:

```python
LABSE_MODEL = "sentence-transformers/LaBSE"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
WORKER_VERSION = "1.0.0"

# RUN 012 defaults — proven optimal from experimentation
DEFAULT_PARAMS = {
    "min_topic_size": 15,
    "nr_topics": 20,
    "umap_n_neighbors": 20,
    "umap_n_components": 10,
}
```

### `models.py` — Schemas

Pydantic models that mirror the Zod schemas in `api.faculytics/src/modules/analysis/dto/topic-model-worker.dto.ts`. All models use `ConfigDict(extra="ignore")` to tolerate envelope fields (`jobId`, `version`, `type`, `metadata`, `publishedAt`) without validation errors.

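A minimal sketch of that pattern. The field names follow the documented request schema, but this is an illustrative model, not the worker's full `models.py`:

```python
from pydantic import BaseModel, ConfigDict


class TopicModelItem(BaseModel):
    # extra="ignore" drops unknown envelope fields instead of raising
    model_config = ConfigDict(extra="ignore")

    submissionId: str
    text: str
    embedding: list[float]


# Envelope fields such as jobId are silently discarded during validation
item = TopicModelItem.model_validate({
    "submissionId": "abc",
    "text": "too fast",
    "embedding": [0.1, 0.2],
    "jobId": "ignored-envelope-field",
})
```

With the default `extra="ignore"` behavior shown here, `item` carries only the declared fields; the same payload would raise a `ValidationError` under `extra="forbid"`.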
### `topic_model.py` — BERTopic Pipeline

The core ML pipeline. See [Pipeline](/docs/pipeline) for details.

### `evaluate.py` — Quality Metrics

Computes five quality metrics on the fitted model. See [Metrics](/docs/metrics) for details.

## Data Flow

```mermaid
flowchart TD
    A[RunPod event] --> B[handler.py]
    B --> C{Validate input}
    C -- Invalid --> D[Return status: failed]
    C -- Valid --> E[Auto-scale params for small datasets]
    E --> F[Extract texts, embeddings, submission IDs]
    F --> G{Zero vector check}
    G -- All zero --> D
    G -- Some zero --> H[Filter zero vectors]
    G -- None zero --> I[run_bertopic]
    H --> I
    I --> J[extract_topic_info]
    J --> K{0 topics?}
    K -- Yes --> D
    K -- No --> L[get_assignments]
    L --> M[compute_metrics]
    M --> N[Return TopicModelResponse]
```

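The zero-vector branch of the flow can be illustrated with a small numpy sketch. The function name and signature are assumptions for illustration, not the handler's actual API:

```python
import numpy as np


def filter_zero_vectors(embeddings, texts, submission_ids):
    """Drop items whose embedding is the all-zero vector; fail the run
    (a domain error, no retry) if nothing survives the filter."""
    emb = np.asarray(embeddings, dtype=np.float32)
    keep = ~np.all(emb == 0.0, axis=1)
    if not keep.any():
        raise ValueError("All embeddings are zero vectors")
    kept_texts = [t for t, k in zip(texts, keep) if k]
    kept_ids = [s for s, k in zip(submission_ids, keep) if k]
    return emb[keep], kept_texts, kept_ids
```

The three parallel sequences (embeddings, texts, submission IDs) must be filtered together so assignments can still be joined back to submissions by index.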
## Global State

The LaBSE model is loaded once at module import time (container start) and shared across all handler invocations:

```python
embed_model = SentenceTransformer(LABSE_MODEL, device=DEVICE)
```

This avoids cold-start latency on subsequent requests. The model is ~1.8 GB and is baked into the Docker image during build.

docs/contract.mdx

Lines changed: 139 additions & 0 deletions
---
title: "API Contract"
description: "Request and response schemas for the topic modeling worker — field definitions, types, and examples."
---

**Source of truth:** `api.faculytics/src/modules/analysis/dto/topic-model-worker.dto.ts` (Zod schemas)

**Worker schemas:** `src/models.py` (Pydantic, must stay in sync with Zod)

## Endpoint

`POST {TOPIC_MODEL_WORKER_URL}`

When deployed on RunPod, the actual endpoint is:

```
POST https://api.runpod.ai/v2/<endpoint-id>/runsync
Headers: { Authorization: Bearer <RUNPOD_API_KEY> }
Body: { input: <request payload> }
```

The RunPod envelope (`input` wrapper, `output` unwrapping) is handled by the API's `RunPodBatchProcessor`.

## Request

```json
{
  "items": [
    {
      "submissionId": "uuid-string",
      "text": "The pace was too fast, couldn't follow along.",
      "embedding": [0.123, -0.456, 0.789, "... (768 floats)"]
    }
  ],
  "params": {
    "min_topic_size": 15,
    "nr_topics": 20,
    "umap_n_neighbors": 20,
    "umap_n_components": 10
  }
}
```

### Request Fields

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| `items` | array | Yes | — | Submissions that passed the sentiment gate |
| `items[].submissionId` | string | Yes | — | Unique submission identifier |
| `items[].text` | string | Yes | — | Pre-cleaned qualitative comment (`cleanedComment`) |
| `items[].embedding` | number[768] | Yes | — | Pre-computed LaBSE 768-dim embedding |
| `params` | object | No | RUN 012 defaults | BERTopic hyperparameters |
| `params.min_topic_size` | int | No | 15 | Minimum documents per topic cluster |
| `params.nr_topics` | int | No | 20 | Target topic count (merges until reached) |
| `params.umap_n_neighbors` | int | No | 20 | UMAP local neighborhood size |
| `params.umap_n_components` | int | No | 10 | UMAP output dimensions |

The worker uses `ConfigDict(extra="ignore")` on all Pydantic models, so additional envelope fields sent by the API (`jobId`, `version`, `type`, `metadata`, `publishedAt`) are silently ignored during validation.

## Response — Success

```json
{
  "version": "1.0.0",
  "status": "completed",
  "topics": [
    {
      "topicIndex": 0,
      "rawLabel": "0_fast_rushed_pace",
      "keywords": ["fast", "rushed", "pace", "speed", "hurry", "quick", "follow", "slow", "behind", "catch"],
      "docCount": 45
    }
  ],
  "assignments": [
    {
      "submissionId": "uuid-string",
      "topicIndex": 0,
      "probability": 0.7234
    }
  ],
  "metrics": {
    "npmi_coherence": 0.1523,
    "topic_diversity": 0.8200,
    "outlier_ratio": 0.1150,
    "silhouette_score": 0.2341,
    "embedding_coherence": 0.6102
  },
  "outlierCount": 12,
  "completedAt": "2026-03-21T10:35:00.000Z"
}
```

## Response — Failure

```json
{
  "version": "1.0.0",
  "status": "failed",
  "error": "Received 8 items, need at least 15 (min_topic_size) for topic modeling",
  "completedAt": "2026-03-21T10:35:00.000Z"
}
```

### Response Fields

| Field | Type | Present | Description |
| --- | --- | --- | --- |
| `version` | string | Always | Worker version (from `config.WORKER_VERSION`) |
| `status` | `"completed"` \| `"failed"` | Always | Outcome status |
| `topics` | array | On success | Discovered topic clusters |
| `topics[].topicIndex` | int | — | BERTopic topic ID (0, 1, 2, ...) |
| `topics[].rawLabel` | string | — | Auto-generated label (e.g., `"0_fast_rushed_pace"`) |
| `topics[].keywords` | string[] | — | Top 10 keywords from KeyBERTInspired |
| `topics[].docCount` | int | — | Documents in this cluster |
| `assignments` | array | On success | Per-document topic assignments |
| `assignments[].submissionId` | string | — | Matches input `submissionId` |
| `assignments[].topicIndex` | int | — | Assigned topic index |
| `assignments[].probability` | number (0-1) | — | Assignment confidence (4 decimal places) |
| `metrics` | object | On success | Model quality metrics (see [Metrics](/docs/metrics)) |
| `outlierCount` | int | On success | Documents assigned to topic -1 |
| `error` | string | On failure | Human-readable error message |
| `completedAt` | ISO datetime | Always | Processing completion timestamp |

## API-Side Processing

After receiving the response, the `TopicModelProcessor` in the API:

1. Validates the response against `topicModelWorkerResponseSchema` (Zod)
2. Creates `Topic` entities for each topic (with `rawLabel`, `keywords`, `docCount`)
3. Creates `TopicAssignment` entities — filters out assignments with probability ≤ 0.01
4. Marks the highest-probability assignment per submission as `isDominant`
5. Persists metrics on the `TopicModelRun` entity
6. Calls the orchestrator to advance the pipeline to topic labeling

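The assignment-filtering and dominant-marking steps amount to a threshold filter plus a per-submission argmax. The API implements this in TypeScript; the following is an equivalent Python sketch with an illustrative function name:

```python
def select_assignments(assignments: list[dict]) -> list[dict]:
    """Drop assignments with probability <= 0.01, then mark the
    highest-probability assignment per submission as dominant."""
    kept = [a for a in assignments if a["probability"] > 0.01]

    # Find the best assignment per submissionId
    best: dict[str, dict] = {}
    for a in kept:
        sid = a["submissionId"]
        if sid not in best or a["probability"] > best[sid]["probability"]:
            best[sid] = a

    for a in kept:
        a["isDominant"] = a is best[a["submissionId"]]
    return kept
```

Because outliers never appear in `assignments`, every surviving submission ends up with exactly one dominant assignment.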
## Notes

- Outlier documents (topic -1) are **not** included in the `assignments` array
- The `rawLabel` is later enriched with a human-readable `label` by the topic labeling stage (LLM)
- Embeddings must be 768-dim LaBSE vectors — the same model used by the embedding worker

docs/deployment.mdx

Lines changed: 118 additions & 0 deletions
---
title: "Deployment"
description: "Docker image build, RunPod serverless configuration, and production deployment."
---

The worker is deployed as a RunPod serverless endpoint running on GPU instances.

## Docker Image

The Dockerfile uses RunPod's PyTorch base image with CUDA support:

```dockerfile
FROM runpod/pytorch:2.4.0-py3.11-cuda12.4.1-devel-ubuntu22.04

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app

COPY pyproject.toml .python-version ./
COPY uv.loc[k] ./

RUN uv sync --frozen --no-dev --no-install-project || uv sync --no-dev --no-install-project

# Bake LaBSE into image (~1.8 GB) to avoid cold-start download
RUN uv run python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('sentence-transformers/LaBSE')"

COPY src/ src/

CMD ["uv", "run", "python", "-m", "src.handler"]
```

### Key Build Decisions

- **LaBSE baked in** — the model is downloaded during build (`~1.8 GB`). This eliminates cold-start latency from model downloads on fresh containers.
- **uv for dependency management** — faster than pip, with lockfile support. Falls back to a non-frozen install if no lockfile exists.
- **No dev dependencies** — `--no-dev` keeps the image lean (no pytest, ruff).
- **Source copied last** — Docker layer caching means dependency installation only reruns when `pyproject.toml` or `uv.lock` change.

### Building

```bash
docker build -t topic-worker .
```

The image is ~8-10 GB due to CUDA runtime + PyTorch + LaBSE model.

### Pushing to Registry

```bash
docker tag topic-worker <registry>/topic-worker:latest
docker push <registry>/topic-worker:latest
```

## RunPod Configuration

### Serverless Endpoint Setup

1. Create a serverless endpoint on [RunPod](https://www.runpod.io/)
2. Point it to the Docker image in your registry
3. Configure GPU type (any CUDA-capable GPU works; 16GB+ VRAM recommended)
4. Set the endpoint URL in the API's `.env`:

```bash
TOPIC_MODEL_WORKER_URL=https://api.runpod.ai/v2/<endpoint-id>/runsync
RUNPOD_API_KEY=<your-key>
```

### Request Flow

```
API    → POST /v2/<endpoint-id>/runsync
         Body: { input: { items: [...], params: {...} } }
         Headers: { Authorization: Bearer <RUNPOD_API_KEY> }

RunPod → Starts container (or uses warm instance)
       → Calls handler({ input: { items: [...], params: {...} } })

Worker → Returns result dict

RunPod → Wraps in { id, status: "COMPLETED", output: <result> }
       → Returns to API
```

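The API side of this flow can be sketched as a small helper that assembles the `/runsync` call. The helper itself is illustrative (the real logic lives in the API's `RunPodBatchProcessor`, in TypeScript):

```python
def build_runsync_request(endpoint_id: str, api_key: str, items: list, params: dict):
    """Assemble the RunPod /runsync call: URL, auth header, and the
    request payload wrapped in RunPod's `input` envelope."""
    url = f"https://api.runpod.ai/v2/{endpoint_id}/runsync"
    headers = {"Authorization": f"Bearer {api_key}"}
    body = {"input": {"items": items, "params": params}}
    return url, headers, body
```

Sending the request (e.g. `requests.post(url, headers=headers, json=body)`) and unwrapping the `output` field of the response are then the only remaining steps.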
### Scaling

| Setting | Recommended |
| --- | --- |
| Min workers | 0 (scale to zero when idle) |
| Max workers | 1-2 (topic modeling is a batch operation, not high-throughput) |
| Idle timeout | 30s (keep warm for short periods between pipeline stages) |
| Execution timeout | 300s (matches `BULLMQ_TOPIC_MODEL_HTTP_TIMEOUT_MS`) |

## Configuration

The worker has no environment variables — all configuration is in `src/config.py`:

| Config | Value | Purpose |
| --- | --- | --- |
| `LABSE_MODEL` | `sentence-transformers/LaBSE` | Embedding model for KeyBERTInspired |
| `DEVICE` | `cuda` or `cpu` (auto-detected) | PyTorch device |
| `WORKER_VERSION` | `1.0.0` | Returned in responses, stored on `TopicModelRun.workerVersion` |
| `DEFAULT_PARAMS` | RUN 012 values | Hyperparameter defaults |

## Dependencies

Core runtime dependencies (`pyproject.toml`):

| Package | Version | Purpose |
| --- | --- | --- |
| `runpod` | ≥ 1.7.0 | RunPod serverless handler framework |
| `pydantic` | ≥ 2.0 | Request/response validation |
| `sentence-transformers` | ≥ 3.0 | LaBSE model loading |
| `bertopic` | ≥ 0.16.0 | Topic modeling pipeline |
| `umap-learn` | ≥ 0.5.6 | Dimensionality reduction |
| `hdbscan` | ≥ 0.8.33 | Density-based clustering |
| `scikit-learn` | ≥ 1.4.0 | Silhouette score, CountVectorizer |
| `gensim` | ≥ 4.3.0 | NPMI coherence computation |
| `numpy` | ≥ 1.26.0 | Array operations |