Draft

Commits (20)
63ad1e4
refactor: remove unused notebook
mali-git Nov 18, 2025
23dfa98
docs: add tutorial for generating embeddings
mali-git Nov 18, 2025
32ea27d
chore: improve figure
mali-git Nov 18, 2025
cbfa05d
refactor: remove unused notebook
mali-git Nov 18, 2025
bbedaee
chore: add notebooks
mali-git Nov 19, 2025
2009cad
feat: Implement paired threshold filtering for JSONL files
ajude2s Jan 20, 2026
7cc62aa
feat: Enhance threshold filter pipeline for paired JSONL processing
ajude2s Jan 20, 2026
2531ee8
feat: Add per-folder threshold overrides for score filtering in thres…
ajude2s Jan 21, 2026
6cf62c5
feat: implement quantile pipeline for JSONL filtering with Slurm support
AbasKhan Jan 21, 2026
f940ee0
feat: Remove deprecated configurations and classes; add new threshold…
ajude2s Jan 21, 2026
a95742f
feat: Remove paths_file parameter from threshold filter pipeline and …
ajude2s Jan 21, 2026
503af94
feat: Remove threshold filter pipeline configuration file
ajude2s Jan 22, 2026
15aa33f
Merge branch 'jsonl_filtering' into quantile_calculation
ajude2s Jan 22, 2026
c2b055c
refactor: move datatrove pipelines into data_pipelines and centralize…
AbasKhan Jan 23, 2026
afaa85d
feat: Add quantile data and tests for quantile pipeline functionality
AbasKhan Jan 24, 2026
6315355
Merge branch 'quantile_cal_refactor' into quantile_calculation
ajude2s Jan 26, 2026
23ad444
fix: Update selection quantile input logic to be consistent and not 1…
AbasKhan Jan 26, 2026
c063e92
refactor: Refactored threshold filter pipeline with configuration and…
ajude2s Jan 26, 2026
e11440d
chore: Renamed config folder for data ablations and added proper sub …
AbasKhan Jan 26, 2026
b16d08d
refactor: renamed the pipeline for calculating quantiles to properly …
AbasKhan Jan 26, 2026
@@ -15,7 +15,7 @@ slurm_settings:
sbatch_args:
account: "p_gptx"
nodes: 1
ntasks: 1
ntasks: 1 # I think it's better to use nodes_per_task directly under slurm_settings and remove this line
gres: gpu:1
partition: "capella"
time: "04:00:00"
96 changes: 96 additions & 0 deletions configs/data_mixes/filtering/threshold_filter_pipeline.yaml
@@ -0,0 +1,96 @@
# Threshold filter pipeline config (builder-style)
# Used by: ml_filter.data_processing.jsonl_filtering.threshold_filter_pipeline.run_threshold_filter_pipeline

running_on_slurm: false

params:
# Input JSONL locations (paired mode)
text_input_dir: /raid/s3/opengptx/jude/repos/ml_filter/ml_filter/src/ml_filter/data/text_jsonl
scores_input_dir: /raid/s3/opengptx/jude/repos/ml_filter/ml_filter/src/ml_filter/data/score_jsonl
domains_input_dir: /raid/s3/opengptx/jude/repos/ml_filter/ml_filter/src/ml_filter/data/domains_jsonl
paths_file: /raid/s3/opengptx/jude/repos/ml_filter/ml_filter/src/ml_filter/data/score_jsonl/dummy_paths.txt
glob_pattern: "**/*.jsonl" # optional
recursive: true
compression: infer # infer | gzip | zstd | null

# Filtering
# - score_keys: keys we read + require to exist on each line
# - thresholds_by_score_key: only these keys are actually used for filtering
score_keys:
- score_Gemma_Snowflake
- score_Llama_Snowflake
thresholds_by_score_key:
score_Gemma_Snowflake: 0.0
score_Llama_Snowflake: 0.0
# Optional per-folder overrides (top-level folder names)
thresholds_by_folder:
Deu_Latn:
score_Gemma_Snowflake: 1.5
score_Llama_Snowflake: 1.5
Fra_Latn:
score_Gemma_Snowflake: 1.6
score_Llama_Snowflake: 1.6
Ita_Latn:
score_Gemma_Snowflake: 1.4
score_Llama_Snowflake: 1.4
Spa_Latn:
score_Gemma_Snowflake: 1.7
score_Llama_Snowflake: 1.7

# Document field names in the JSONL
text_jsonl_id_key: document_id
score_jsonl_id_key: document_id
text_jsonl_text_key: text
domain_jsonl_id_key: document_id
domain_jsonl_domain_key: domain
accepted_domains:
- wikipedia.org
- stackexchange.com

# Paired alignment error handling
on_mismatch: raise # raise | skip_line | skip_file
max_mismatches_per_file: 0

# Optional word-count filter
min_num_words: null
num_words_column: text

# Output
output_dir: /raid/s3/opengptx/jude/repos/ml_filter/ml_filter/outputs/threshold_filter_pipeline_local_dummy
output_filename: "${file_relpath}"

# ------------------------------------------------------------
# Execution settings: choose ONE block depending on running_on_slurm
# ------------------------------------------------------------

local_settings:
tasks: 1
local_tasks: 1
local_rank_offset: 0
workers: -1
logging_dir: null

# slurm_settings:
# tasks: 1
# time: "00:30:00"
# partition: "default"
# cpus_per_task: 4
# mem_per_cpu_gb: 8
# workers: -1
# job_name: "threshold_filter_pipeline"
# qos: "normal"
# env_command: null
# condaenv: null
# venv_path: null
# sbatch_args: null
# max_array_size: 1001
# depends_job_id: null
# job_id_position: -1
# logging_dir: null
# skip_completed: true
# slurm_logs_folder: null
# mail_type: "ALL"
# mail_user: null
# requeue: true
# srun_args: null
# tasks_per_job: 1
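
As an aside, here is a minimal sketch of the acceptance logic this config describes, assuming per-folder thresholds override the global ones and the domain/word-count filters apply on top. Function names are hypothetical, not the pipeline's API; the actual entry point, per the config header, is `ml_filter.data_processing.jsonl_filtering.threshold_filter_pipeline.run_threshold_filter_pipeline`.

```python
# Illustrative sketch only; names below are hypothetical.

def resolve_thresholds(folder: str, defaults: dict, by_folder: dict) -> dict:
    """Per-folder overrides (e.g. Deu_Latn) take precedence over global thresholds."""
    merged = dict(defaults)
    merged.update(by_folder.get(folder, {}))
    return merged


def accept(doc: dict, folder: str, cfg: dict) -> bool:
    """Return True if a document passes all configured filters."""
    thresholds = resolve_thresholds(
        folder, cfg["thresholds_by_score_key"], cfg.get("thresholds_by_folder", {})
    )
    # Every key in thresholds_by_score_key must meet or exceed its threshold.
    if any(doc[key] < thr for key, thr in thresholds.items()):
        return False
    # Optional domain whitelist (accepted_domains).
    accepted = cfg.get("accepted_domains")
    if accepted and doc.get("domain") not in accepted:
        return False
    # Optional word-count floor (min_num_words over num_words_column).
    min_words = cfg.get("min_num_words")
    if min_words is not None and len(doc["text"].split()) < min_words:
        return False
    return True
```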
@@ -0,0 +1,58 @@
running_on_slurm: false

params:
text_input_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_filtering/filters/data/text_jsonl
scores_input_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_filtering/filters/data/score_jsonl
domains_input_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_filtering/filters/data/domain_jsonl
glob_pattern: null
recursive: true
compression: null

# --- Filtering ---
score_keys:
- score_Gemma_Snowflake
- score_Llama_Snowflake
thresholds_by_score_key:
score_Gemma_Snowflake: 0.0
score_Llama_Snowflake: 0.0
thresholds_by_folder:
Deu_Latn:
score_Gemma_Snowflake: 1.5
score_Llama_Snowflake: 1.5
Fra_Latn:
score_Gemma_Snowflake: 1.6
score_Llama_Snowflake: 1.6
Ita_Latn:
score_Gemma_Snowflake: 1.4
score_Llama_Snowflake: 1.4
Spa_Latn:
score_Gemma_Snowflake: 1.7
score_Llama_Snowflake: 1.7

text_jsonl_id_key: id
score_jsonl_id_key: id
text_jsonl_text_key: text
domain_jsonl_id_key: id
domain_jsonl_domain_key: domain
accepted_domains:
- wikipedia.org
- stackexchange.com

# --- Optional: word-count filter ---
min_num_words: null
num_words_column: text

# --- Paired alignment error handling ---
on_mismatch: raise
max_mismatches_per_file: 0

# --- Output ---
output_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_filtering/filters/output/threshold_filter_pipeline_local_dummy
output_filename: "${file_relpath}"

local_settings:
tasks: 1
local_tasks: 1
local_rank_offset: 0
workers: -1
logging_dir: null
@@ -0,0 +1,20 @@
params:
input_dir: /raid/s3/opengptx/akhan/ml_filter/data/dummy_quantile_data
glob_pattern: "**/*.jsonl"
output_dir: /raid/s3/opengptx/akhan/ml_filter/outputs
compression: null
output_compression: null
score_fields: ["score_llama", "score_mistral", "score_gemma"]
selection_quantile: 0.80
report_filename: quantile_report.yaml
quantile_data_dir: quantile_data

running_on_slurm: false

local_settings:
tasks: 1 # world_size (number of ranks / shards)
local_tasks: 1
local_rank_offset: 0
workers: 1

slurm_settings: null
@@ -0,0 +1,30 @@
params:
input_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/annotations/annotated_data
glob_pattern: "**/*.jsonl"
output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/quantiles
compression: null
output_compression: null
score_fields: ["score_llama", "score_mistral", "score_gemma"]
selection_quantile: 0.9
report_filename: quantile_report.yaml
quantile_data_dir: quantile_data

running_on_slurm: true

local_settings: null

slurm_settings:
sbatch_args:
account: "p_gptx"
nodes: 1
ntasks: 1 # I think it's better to use nodes_per_task directly under slurm_settings and remove this line
gres: gpu:1
partition: "capella"
time: "00:30:00"
cpus_per_task: 4
mem_per_cpu_gb: 8
job_name: "quantile_pipeline"
qos: "normal"
venv_path: /data/cat/ws/alju972f-regression_heads/repos/env/jql_pipeline/bin/activate
tasks: 1
workers: 1
18 changes: 0 additions & 18 deletions configs/data_processing/xlm_roberta_tokenize.yaml

This file was deleted.

49 changes: 45 additions & 4 deletions documentation/pipelines.md
@@ -1,4 +1,4 @@
# Embedding & Annotation Pipelines
# Embedding, Annotation, and Ablation Pipelines

This document explains how to generate model embeddings for large JSONL corpora and then run regression / classification heads to obtain annotation scores at scale using MLFilter's Datatrove-based pipelines.

@@ -37,7 +37,7 @@ Notes:

## Overview

The workflow consists of two sequential pipelines:
The workflow consists of two sequential pipelines plus optional ablation runs:

1. Embedding Pipeline (`run_embedding_pipeline`)
Reads raw JSONL documents, tokenizes & feeds them through an embedding model, and stores embeddings (optionally with labels) into per-source HDF5 files.
@@ -198,6 +198,47 @@ Per embedding source file: `${source_filename}.jsonl` written to:
```
Each line contains original metadata (from `output_keys`) plus head outputs (scores / predictions).
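
For illustration only, an annotated output line might look like the following; the values and exact key set are hypothetical, with score keys mirroring the configs in this PR:

```json
{"document_id": "doc-000123", "text": "…", "score_Gemma_Snowflake": 1.62, "score_Llama_Snowflake": 1.48}
```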

---
## Quantile Ablation Pipeline

Computes per-language score quantiles over JSONL files, averaging the configured score fields per document, and emits a YAML report.

### YAML Schema (`QuantilePipelineParameters`)

| Field | Type | Description |
|-------|------|-------------|
| `input_dir` | str | Directory containing JSONL files. |
| `glob_pattern` | str | Glob selecting which JSONL files to process (e.g. `*.jsonl`). |
| `output_dir` | path | Base output directory. |
| `compression` | str/None | Compression for input JSONL files (`infer`, `gzip`, `zstd`, `None`). |
| `score_fields` | list[str] | Score fields to average per document (e.g. `["score_llama", "score_mistral"]`). |
| `selection_quantile` | float | Top fraction to keep (e.g. `0.2` keeps top 20%). |
| `report_filename` | str | Filename for the YAML report (default `quantile_report.yaml`). |
| `output_compression` | str/None | Compression for the written outputs (`gzip`, `zstd`, `None`). |
| `quantile_data_dir` | str | Subdirectory under `output_dir` for intermediate quantile data. |

Execution mode fields mirror other pipelines: `running_on_slurm`, `local_settings` or `slurm_settings`.

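To make the averaging and selection step concrete, here is a minimal local sketch, not the pipeline's actual code, of how the per-document means and the resulting threshold could be computed; the file path is a placeholder:

```python
import json
import numpy as np

# Standalone sketch; the actual pipeline shards this work across Datatrove ranks.
score_fields = ["score_llama", "score_mistral", "score_gemma"]
selection_quantile = 0.2  # keep the top 20% by mean score

means = []
with open("data/jsonl/example.jsonl") as f:
    for line in f:
        doc = json.loads(line)
        # Average the configured score fields per document.
        means.append(np.mean([doc[k] for k in score_fields]))

# Documents at or above this threshold fall in the selected top fraction.
threshold = float(np.quantile(means, 1.0 - selection_quantile))
print(f"keep documents with mean score >= {threshold:.4f}")
```
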
### Minimal Local Example
```yaml
running_on_slurm: false
params:
input_dir: data/jsonl
glob_pattern: "*.jsonl"
output_dir: outputs
compression: infer
output_compression: gzip
score_fields: ["score_llama", "score_mistral", "score_gemma"]
selection_quantile: 0.2
local_settings: {}
```

### Running
```bash
ml_filter run_quantile_pipeline --config_file_path configs/quantile_job.yaml
```

### Outputs
- YAML report at `<output_dir>/<report_filename>` (or per-rank when running on Slurm).

---
## Chaining the Pipelines

@@ -226,8 +267,8 @@ Each line contains original metadata (from `output_keys`) plus head outputs (sco
## Programmatic Usage Sketch
```python
from pathlib import Path
from ml_filter.annotation.embedding_pipeline import run_embedding_pipeline
from ml_filter.annotation.annotation_pipeline import run_annotation_pipeline
from ml_filter.data_pipelines.annotation.embedding_pipeline import run_embedding_pipeline
from ml_filter.data_pipelines.annotation.annotation_pipeline import run_annotation_pipeline

run_embedding_pipeline(Path("configs/embedding_job.yaml"))
run_annotation_pipeline(Path("configs/annotation_job.yaml"))
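
# Hypothetical extension of the sketch: the threshold filter pipeline appears
# to use the same config-file convention, with the entry point taken from its
# config header above; the config path below is a placeholder.
from ml_filter.data_processing.jsonl_filtering.threshold_filter_pipeline import (
    run_threshold_filter_pipeline,
)

run_threshold_filter_pipeline(Path("configs/threshold_filter_job.yaml"))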
```