
Commit 8537f16

feat: Document chunking capabilities to VLM/standard Docling pipelines using Docling's HybridChunker. (opendatahub-io#71)
1 parent b8d0fc3 commit 8537f16

12 files changed: 835 additions & 22 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -4,3 +4,4 @@
 .venv
 .env
 venv
+diagrams/

kubeflow-pipelines/README.md

Lines changed: 44 additions & 8 deletions
@@ -17,28 +17,47 @@ Two KFP pipelines are included:
 - Two customizable pipelines to suit different needs:
   - Standard PDF pipeline (backends, OCR engines, table structure, image export)
   - VLM pipeline (Docling VLM or Granite-Vision pipeline options; remote VLM service supported)
+  - **Optional document chunking** using Docling's HybridChunker
 - Multiple input sources: HTTP/S URLs or S3/S3-compatible APIs like MinIO
 - Secret-based configuration:
   - Remote VLM API configuration via a single mounted Kubernetes Secret
   - S3 endpoint and credentials via a single mounted Kubernetes Secret
 - Tunable performance and quality: threads, timeouts, OCR forcing, table mode, PDF backends, enrichments
 - Works on OpenShift AI/Kubeflow Pipelines
 
+### Pipeline Architecture
+
+The following diagram shows the overall pipeline flow with optional chunking:
+
+![Pipeline Architecture](/assets/pipeline_architecture.png)
+
+**Input path:**
+- **PDF Files** → `import_pdfs` → `docling_convert_standard/vlm` → Markdown + Docling JSON
+
+When `chunk_enabled=True`, the conversion output flows through `docling_chunk` to produce chunked JSON files for RAG workflows.
+
 ## 📦 File Structure
 
 ```bash
 kubeflow-pipelines
 |
-|- docling-standard
-| |- docling_convert_components.py
-| |- docling_convert_pipeline.py
-| |- docling_convert_pipeline_compiled.yaml (generated)
+|- common/
+| |- __init__.py
+| |- components.py          # Shared components (import_pdfs, docling_chunk, etc.)
+| |- constants.py           # Shared constants (base images)
+|
+|- docling-standard/
+| |- standard_components.py
+| |- standard_convert_pipeline.py
+| |- standard_convert_pipeline_compiled.yaml (generated)
+| |- local_run.py           # Local testing script
 | |- requirements.txt
 |
-|- docling-vlm
-   |- docling_convert_components.py
-   |- docling_convert_pipeline.py
-   |- docling_convert_pipeline_compiled.yaml (generated)
+|- docling-vlm/
+   |- vlm_components.py
+   |- vlm_convert_pipeline.py
+   |- vlm_convert_pipeline_compiled.yaml (generated)
+   |- local_run.py          # Local testing script
    |- requirements.txt
 ```
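For orientation, the conditional flow described in the new Pipeline Architecture section could be wired roughly as follows. This is a minimal sketch, not the pipelines' actual code: the sample filenames and base URL are placeholders, required converter arguments (e.g. `artifacts_path`, `pdf_filenames`) are omitted, and it assumes a KFP v2 SDK that provides `dsl.If` for conditional execution.

```python
from kfp import dsl

from common import docling_chunk, import_pdfs
from standard_components import docling_convert_standard


@dsl.pipeline()
def convert_with_optional_chunking(
    docling_chunk_enabled: bool = False,
    docling_chunk_max_tokens: int = 512,
    docling_chunk_merge_peers: bool = True,
):
    # Fetch input PDFs and convert them to Markdown + Docling JSON
    # (model download, splitting, and other required arguments omitted for brevity).
    importer = import_pdfs(
        filenames="sample.pdf",
        base_url="https://example.com/pdfs",
    )
    converter = docling_convert_standard(
        input_path=importer.outputs["output_path"],
    )

    # Gate the chunking step on the pipeline parameter; when enabled,
    # the converter's Docling JSON output flows into docling_chunk.
    with dsl.If(docling_chunk_enabled == True):  # noqa: E712 - KFP needs a literal comparison
        docling_chunk(
            input_path=converter.outputs["output_path"],
            max_tokens=docling_chunk_max_tokens,
            merge_peers=docling_chunk_merge_peers,
        )
```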

@@ -158,6 +177,23 @@ If you'd like to consume documents stored in an S3-compatible object storage rat
 Toggle enrichments via boolean parameters:
 - `docling_enrich_code`, `docling_enrich_formula`, `docling_enrich_picture_classes`, `docling_enrich_picture_description`.
 
+#### 7) Chunking converted documents
+
+Both pipelines support optional document chunking using Docling's [HybridChunker](https://docling-project.github.io/docling/examples/hybrid_chunking/). This splits converted documents into smaller, semantically meaningful chunks ideal for RAG (Retrieval-Augmented Generation) workflows.
+
+**Chunking parameters:**
+- `docling_chunk_enabled`: Set to `True` to enable chunking after conversion (default: `False`).
+- `docling_chunk_max_tokens`: Maximum tokens per chunk (default: `512`). Adjust based on your embedding model's context limit.
+- `docling_chunk_merge_peers`: If `True`, merge adjacent small chunks for better context (default: `True`).
+
+**Tokenizer:** Chunking uses the `sentence-transformers/all-MiniLM-L6-v2` tokenizer for accurate token counting, ensuring chunks are sized appropriately for common embedding models.
+
+**Chunked output location:**
+When chunking is enabled, an additional output file is created for each converted document:
+- **Filename format**: `{original_name}_chunks.jsonl`
+- **Location**: Same output directory as the converted `.json` and `.md` files
+- To find the output location, check the Graph of your pipeline Run, click the _docling-chunk_ box, and look in the _Output artifacts_ section.
+
 ## 🔧 Advanced customizations
 
 - Increase `num_splits` to **parallelize** across more workers (uses KFP `ParallelFor`).
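As a rough sketch of what a downstream consumer sees, the records in each `*_chunks.jsonl` file carry the fields written by the `docling_chunk` component (`timestamp`, `source_document`, `chunk_index`, `chunking_config`, `text`). A RAG ingestion script might read them like this, where `output/` stands in for wherever you downloaded the _docling-chunk_ artifact:

```python
import json
from pathlib import Path

# Placeholder: directory where the docling-chunk output artifact was downloaded.
chunks_dir = Path("output")

for chunks_file in sorted(chunks_dir.glob("*_chunks.jsonl")):
    with open(chunks_file, "r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            # record["text"] holds the contextualized chunk text, ready to embed.
            print(record["source_document"], record["chunk_index"], len(record["text"]))
```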

kubeflow-pipelines/common/__init__.py

Lines changed: 7 additions & 1 deletion
@@ -6,13 +6,19 @@
 """
 
 # Import all common components to make them easily accessible
-from .components import create_pdf_splits, download_docling_models, import_pdfs
+from .components import (
+    create_pdf_splits,
+    docling_chunk,
+    download_docling_models,
+    import_pdfs,
+)
 from .constants import DOCLING_BASE_IMAGE, PYTHON_BASE_IMAGE
 
 __all__ = [
     "import_pdfs",
     "create_pdf_splits",
     "download_docling_models",
+    "docling_chunk",
     "PYTHON_BASE_IMAGE",
     "DOCLING_BASE_IMAGE",
 ]
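Pipeline modules under `docling-standard/` and `docling-vlm/` can then pull the shared chunking component straight from this package. A minimal sketch of the import pattern (the same approach `local_run.py` uses further below):

```python
import sys
from pathlib import Path

# Make the sibling `common` package importable when running from a pipeline directory.
sys.path.insert(0, str(Path(__file__).parent.parent))

from common import DOCLING_BASE_IMAGE, docling_chunk, import_pdfs  # noqa: E402
```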

kubeflow-pipelines/common/components.py

Lines changed: 169 additions & 0 deletions
@@ -241,3 +241,172 @@ def download_docling_models(
         raise ValueError(
             f"Invalid pipeline_type: {pipeline_type}. Must be 'standard' or 'vlm'"
         )
+
+
+@dsl.component(
+    base_image=DOCLING_BASE_IMAGE,
+)
+def docling_chunk(
+    input_path: dsl.Input[dsl.Artifact],
+    output_path: dsl.Output[dsl.Artifact],
+    max_tokens: int = 512,
+    merge_peers: bool = True,
+):
+    """
+    Chunk Docling documents using HybridChunker. Takes converted docling JSON files as input
+    and produces chunked JSONL files with semantic chunks suitable for RAG.
+
+    Output format is JSONL (one JSON object per line) for easy inspection and streaming.
+
+    Args:
+        input_path: Path to the input directory containing Docling JSON files
+        output_path: Path to the output directory for the chunked JSONL files
+        max_tokens: Maximum number of tokens per chunk
+        merge_peers: Whether to merge smaller chunks at the same level
+    """
+    import json  # pylint: disable=import-outside-toplevel
+    from datetime import datetime, timezone  # pylint: disable=import-outside-toplevel
+    from pathlib import Path  # pylint: disable=import-outside-toplevel
+
+    # HybridChunker = Docling's smart chunking class that combines:
+    # 1. Document structure awareness
+    # 2. Token-based splitting
+    from docling.chunking import HybridChunker  # pylint: disable=import-outside-toplevel
+    from docling_core.transforms.chunker.tokenizer.huggingface import (
+        HuggingFaceTokenizer,
+    )  # pylint: disable=import-outside-toplevel
+    from docling_core.types import DoclingDocument  # pylint: disable=import-outside-toplevel
+    from transformers import AutoTokenizer  # pylint: disable=import-outside-toplevel
+
+    # Convert KFP artifact paths to Path objects
+    input_path_p = Path(input_path.path)
+    output_path_p = Path(output_path.path)
+    output_path_p.mkdir(parents=True, exist_ok=True)
+
+    # Initialize tokenizer for HybridChunker (new API)
+    # Using a lightweight sentence-transformer model for tokenization
+    EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
+    try:
+        hf_tokenizer = AutoTokenizer.from_pretrained(
+            EMBED_MODEL_ID,
+            resume_download=True,
+            timeout=60,
+        )
+        print(f"docling-chunk: loaded tokenizer from {EMBED_MODEL_ID}", flush=True)
+    except Exception as e:
+        print(f"docling-chunk: ERROR loading tokenizer: {e}", flush=True)
+        raise RuntimeError(
+            f"Failed to load tokenizer model {EMBED_MODEL_ID}. "
+            "Ensure network access to HuggingFace Hub or pre-download the model."
+        ) from e
+
+    tokenizer = HuggingFaceTokenizer(
+        tokenizer=hf_tokenizer,
+        max_tokens=max_tokens,
+    )
+
+    # Initialize Hybrid chunker with user-specified parameters
+    # tokenizer: The tokenizer wrapper to use for counting tokens (includes max_tokens)
+    # merge_peers: if true, smaller adjacent chunks will be merged together
+    chunker = HybridChunker(
+        tokenizer=tokenizer,
+        merge_peers=merge_peers,
+    )
+
+    # Find all JSON files in the input directory
+    json_files = list(input_path_p.glob("*.json"))
+    if not json_files:
+        print(f"docling-chunk: No JSON files found in {input_path_p}", flush=True)
+        return
+
+    print(
+        f"docling-chunk: processing {len(json_files)} files with max_tokens={max_tokens} and merge_peers={merge_peers}",
+        flush=True,
+    )
+
+    # Track processing results
+    processed_count = 0
+    skipped_files = []
+
+    # Process each file
+    for json_file in json_files:
+        print(f"docling-chunk: processing {json_file}", flush=True)
+
+        # Load and validate the JSON file
+        try:
+            with open(json_file, "r", encoding="utf-8") as f:
+                doc_data = json.load(f)
+        except json.JSONDecodeError as e:
+            print(
+                f"docling-chunk: skipping {json_file.name} - invalid JSON: {e}",
+                flush=True,
+            )
+            skipped_files.append((json_file.name, f"invalid JSON: {e}"))
+            continue
+
+        # Parse the JSON data into a DoclingDocument object
+        # This validates that the JSON conforms to the DoclingDocument schema
+        try:
+            doc = DoclingDocument.model_validate(doc_data)
+        except Exception as e:
+            # Catches pydantic.ValidationError and any other validation issues
+            print(
+                f"docling-chunk: skipping {json_file.name} - not a valid DoclingDocument: {e}",
+                flush=True,
+            )
+            skipped_files.append((json_file.name, f"validation failed: {e}"))
+            continue
+
+        # Chunk the document using HybridChunker
+        chunks = list(chunker.chunk(dl_doc=doc))
+
+        # Generate output filename: original_name_chunks.jsonl
+        output_filename = f"{json_file.stem}_chunks.jsonl"
+        output_file = output_path_p / output_filename
+
+        # Get current timestamp in ISO format
+        timestamp = datetime.now(timezone.utc).isoformat()
+
+        # Chunking config (for reproducibility)
+        chunking_config = {
+            "max_tokens": max_tokens,
+            "merge_peers": merge_peers,
+            "tokenizer_model": EMBED_MODEL_ID,
+        }
+
+        # Write chunks as JSONL (one JSON object per line)
+        with open(output_file, "w", encoding="utf-8") as f:
+            for idx, chunk in enumerate(chunks):
+                # Get contextualized text for this chunk
+                chunk_text = chunker.contextualize(chunk=chunk)
+
+                # Build the chunk object
+                chunk_obj = {
+                    "timestamp": timestamp,
+                    "source_document": json_file.name,
+                    "chunk_index": idx,
+                    "chunking_config": chunking_config,
+                    "text": chunk_text,
+                }
+
+                # Write as a single line of JSON
+                f.write(json.dumps(chunk_obj, ensure_ascii=False) + "\n")
+
+        print(
+            f"docling-chunk: saved {len(chunks)} chunks to {output_filename}",
+            flush=True,
+        )
+        processed_count += 1
+
+    # Report summary
+    print(
+        f"docling-chunk: done - processed {processed_count}/{len(json_files)} files",
+        flush=True,
+    )
+    if skipped_files:
+        print(
+            f"docling-chunk: skipped {len(skipped_files)} invalid files:",
+            flush=True,
+        )
+        for filename, reason in skipped_files:
+            print(f"  - {filename}: {reason}", flush=True)

kubeflow-pipelines/docling-standard/README.md

Lines changed: 34 additions & 2 deletions
@@ -27,6 +27,39 @@ The following configuration options are available as KFP parameters when you _Cr
 - `pdf_filenames`: List of PDF file names to process, separated by commas.
 - `pdf_from_s3`: If `True`, PDF files will be fetched from an S3-compatible object storage rather than `pdf_base_url`. A secret must be configured as described in [docs](../README.md).
 
+### Chunking options
+
+Optional document chunking using Docling's [HybridChunker](https://docling-project.github.io/docling/examples/hybrid_chunking/):
+
+- `docling_chunk_enabled`: If `True`, chunk converted documents into smaller pieces (default: `False`).
+- `docling_chunk_max_tokens`: Maximum tokens per chunk (default: `512`).
+- `docling_chunk_merge_peers`: If `True`, merge adjacent small chunks for better context (default: `True`).
+
+Chunking uses the `sentence-transformers/all-MiniLM-L6-v2` tokenizer for accurate token counting.
+
+**Chunked output**: When enabled, creates `{filename}_chunks.jsonl` files (one JSON object per line) in the same output directory as the converted documents. See [main docs](../README.md) for output format details.
+
+## Local testing
+
+You can test the pipeline locally using Docker before deploying to KFP.
+
+### Prerequisites
+
+```bash
+pip install docker kfp
+```
+
+Requires a Docker-compatible daemon (Docker or Podman socket).
+
+### Run locally
+
+```bash
+cd data-processing/kubeflow-pipelines/docling-standard
+python local_run.py
+```
+
+This runs `convert_pipeline_local()` which converts PDFs and chunks the output.
+
 ## Compiling from source
 
 ### Clone repository, create venv, install dependencies
@@ -45,5 +78,4 @@ This generates `standard_convert_pipeline_compiled.yaml`:
 
 ```bash
 python standard_convert_pipeline.py
-```
-
+```
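Runs can also be started programmatically instead of through the _Create run_ UI. A sketch with the KFP SDK, assuming a compiled pipeline file and a reachable endpoint (the host URL is a placeholder; unlisted parameters keep their defaults):

```python
from kfp import Client

# Placeholder endpoint; use your Kubeflow Pipelines / OpenShift AI route and credentials.
client = Client(host="https://<your-kfp-endpoint>")

run = client.create_run_from_pipeline_package(
    "standard_convert_pipeline_compiled.yaml",
    arguments={
        # Chunking parameters documented above.
        "docling_chunk_enabled": True,
        "docling_chunk_max_tokens": 512,
        "docling_chunk_merge_peers": True,
    },
)
print(run.run_id)
```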

kubeflow-pipelines/docling-standard/local_run.py

Lines changed: 16 additions & 3 deletions
@@ -3,7 +3,12 @@
 from typing import List
 
 sys.path.insert(0, str(Path(__file__).parent.parent))
-from common import create_pdf_splits, download_docling_models, import_pdfs
+from common import (
+    create_pdf_splits,
+    docling_chunk,
+    download_docling_models,
+    import_pdfs,
+)
 from kfp import dsl, local
 from standard_components import docling_convert_standard
@@ -15,6 +20,9 @@ def take_first_split(splits: List[List[str]]) -> List[str]:
 
 @dsl.pipeline()
 def convert_pipeline_local():
+    """
+    Local pipeline for testing standard conversion with chunking.
+    """
     importer = import_pdfs(
         filenames="2203.01017v2.pdf,2206.01062.pdf",
         base_url="https://github.com/docling-project/docling/raw/v2.43.0/tests/data/pdf",
@@ -32,15 +40,20 @@ def convert_pipeline_local():
 
     first_split = take_first_split(splits=pdf_splits.output)
 
-    docling_convert_standard(
+    converter = docling_convert_standard(
        input_path=importer.outputs["output_path"],
        artifacts_path=artifacts.outputs["output_path"],
        pdf_filenames=first_split.output,
     )
 
+    docling_chunk(
+        input_path=converter.outputs["output_path"],
+        max_tokens=512,
+        merge_peers=True,
+    )
+
 
 def main() -> None:
-    # Requires: pip install docker; and a Docker-compatible daemon (Docker or Podman socket)
     local.init(runner=local.DockerRunner())
     convert_pipeline_local()
 
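When testing locally it can help to pin the local pipeline root so the `*_chunks.jsonl` artifacts are easy to find after the run. A sketch of a variant `main()`, assuming the installed KFP SDK supports the `pipeline_root` argument of `local.init`:

```python
from kfp import local


def main() -> None:
    # Keep local artifacts (including the docling_chunk output) in a predictable folder.
    local.init(
        runner=local.DockerRunner(),
        pipeline_root="./local_outputs",  # hypothetical path; any writable directory works
    )
    convert_pipeline_local()
```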
