This document provides a deep dive into the CocoIndex flows used in this project.
In CocoIndex, a flow is a data pipeline that:
- Reads data from sources
- Transforms the data through operations
- Exports results to targets
Flows are defined using Python decorators and are automatically managed by CocoIndex.
File: flows/text_embedding.py
This flow indexes documents for semantic vector search.
```mermaid
flowchart TD
    A[LocalFile Source] -->|"Read .md, .txt, .py"| B[Raw Documents]
    B -->|SplitRecursively| C[Text Chunks]
    C -->|EmbedText| D[Vector Embeddings]
    D -->|Collect| E[doc_embeddings]
    E -->|Export| F[(PostgreSQL)]

    subgraph "For each document"
        B
        C
        D
    end
```
```python
data_scope["documents"] = flow_builder.add_source(
    cocoindex.sources.LocalFile(
        path="data/documents",
        included_patterns=["**/*.md", "**/*.txt", "**/*.py"],
        excluded_patterns=["**/.venv/**", "**/node_modules/**", ...],
    ),
    refresh_interval=timedelta(seconds=5),
)
```

Key settings:
- `path`: Directory to watch for documents
- `included_patterns`: File patterns to include (glob syntax)
- `excluded_patterns`: Patterns to exclude
- `refresh_interval`: How often to check for changes
Documents are split into overlapping chunks for better search results:

```python
doc["chunks"] = doc["content"].transform(
    cocoindex.functions.SplitRecursively(),
    language="markdown",
    chunk_size=2000,
    chunk_overlap=500,
)
```

Why chunking?
- Large documents may exceed embedding model limits
- Smaller chunks give more precise search results
- Overlap ensures context isn't lost at chunk boundaries
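SplitRecursively's actual algorithm is internal to CocoIndex (it also respects markdown and code structure), but the overlap idea can be sketched with a naive fixed-window splitter:

```python
def chunk_text(text: str, chunk_size: int = 2000, chunk_overlap: int = 500) -> list[str]:
    """Fixed-window chunking with overlap (illustration only; CocoIndex's
    SplitRecursively additionally splits along markdown/code boundaries)."""
    step = chunk_size - chunk_overlap
    # Each window starts `step` characters after the previous one, so
    # consecutive chunks share `chunk_overlap` characters of context.
    return [
        text[i : i + chunk_size]
        for i in range(0, max(len(text) - chunk_overlap, 1), step)
    ]
```

With the defaults above, a 5,000-character document yields three chunks, each sharing its last 500 characters with the start of the next.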
```python
@cocoindex.transform_flow()
def text_to_embedding(
    text: cocoindex.DataSlice[str],
) -> cocoindex.DataSlice[NDArray[np.float32]]:
    return text.transform(
        cocoindex.functions.EmbedText(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="text-embedding-3-small",
        )
    )
```

Model details:
- Model: OpenAI `text-embedding-3-small`
- Dimensions: 1536
- Cost: $0.00002 per 1K tokens (very affordable)
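At that rate, embedding cost scales linearly with token count. A quick estimator (token counts are tokenizer-dependent; English text runs roughly 4 characters per token):

```python
# Rate quoted above for text-embedding-3-small
COST_PER_1K_TOKENS = 0.00002

def embedding_cost(total_tokens: int) -> float:
    """Dollar cost of embedding a given number of tokens."""
    return total_tokens / 1000 * COST_PER_1K_TOKENS

# e.g. 1,000 documents averaging 2,000 tokens each = 2M tokens -> about $0.04
```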
```python
doc_embeddings.export(
    "doc_embeddings",
    cocoindex.targets.Postgres(),
    primary_key_fields=["filename", "location"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
        )
    ],
)
```

Index configuration:
- Primary key: Combination of filename and chunk location
- Vector index: HNSW index on the embedding field
- Similarity metric: Cosine similarity
The flow includes a query handler for searching:
```python
@text_embedding_flow.query_handler(
    result_fields=cocoindex.QueryHandlerResultFields(
        embedding=["embedding"],
        score="score",
    ),
)
def search(query: str, top_k: int = 5) -> cocoindex.QueryOutput:
    # Generate query embedding
    query_vector = text_to_embedding.eval(query)
    # Execute similarity search
    with _connection_pool().connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT filename, text, embedding, embedding <=> %s AS distance
                FROM doc_embeddings
                ORDER BY distance
                LIMIT %s
            """, (query_vector, top_k))
            # ...
```

Search algorithm:
- Convert the query to an embedding vector
- Use pgvector's `<=>` operator to compute cosine distance
- Return the top-k most similar chunks
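pgvector's `<=>` operator computes cosine distance (1 minus cosine similarity), which is why the query orders by `distance` ascending. The same quantity in plain Python:

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance, the quantity pgvector's <=> operator returns:
    0 for identical directions, 1 for orthogonal, 2 for opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)
```

Note that cosine distance ignores vector magnitude: a vector and any positive scaling of it have distance 0.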
File: flows/llm_extraction.py
This flow extracts structured metadata from documents using an LLM.
```mermaid
flowchart TD
    A[LocalFile Source] -->|"Read .md, .txt"| B[Raw Documents]
    B -->|ExtractByLlm| C[DocumentMetadata]
    C -->|compute_stats| D[DocumentStats]
    D -->|Collect| E[extracted_metadata]
    E -->|Export| F[(PostgreSQL)]
```
```python
@dataclasses.dataclass
class DocumentMetadata:
    title: str
    summary: str
    key_points: list[str]
    topics: list[str]
    document_type: str  # article, tutorial, documentation, code
```

```python
doc["metadata"] = doc["content"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.ANTHROPIC,
            model="claude-3-haiku-20240307",
        ),
        output_type=DocumentMetadata,
        instruction="""
        Output ONLY a JSON object with these fields...
        """,
    )
)
```

Model selection:
- Claude 3 Haiku: Fast and cost-effective for structured extraction
- Alternative: Use `claude-3-5-sonnet` for better quality at higher cost
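ExtractByLlm's real parsing lives inside CocoIndex; conceptually, the model is instructed to emit a JSON object whose keys match the dataclass fields, which is then deserialized. A standalone sketch of that step (the dataclass is repeated here so the snippet is self-contained):

```python
import dataclasses
import json

@dataclasses.dataclass
class DocumentMetadata:
    title: str
    summary: str
    key_points: list[str]
    topics: list[str]
    document_type: str

def parse_llm_output(raw: str) -> DocumentMetadata:
    # The instruction above asks the model for ONLY a JSON object,
    # so the response can be deserialized straight into the dataclass.
    return DocumentMetadata(**json.loads(raw))
```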
```python
@cocoindex.op.function()
def compute_stats(metadata: DocumentMetadata) -> DocumentStats:
    return DocumentStats(
        num_key_points=len(metadata.key_points),
        num_topics=len(metadata.topics),
    )
```

```sh
# Update all flows
cocoindex update main.py
```
```sh
# Update specific flow
cocoindex update main.py:TextEmbedding

# Watch for changes and re-index automatically
cocoindex update main.py -L

# Create database tables
cocoindex setup main.py -f

# List all flows
cocoindex ls main.py

# Show flow details
cocoindex show main.py:TextEmbedding
```

In `flows/text_embedding.py`, modify:
```python
cocoindex.functions.EmbedText(
    api_type=cocoindex.LlmApiType.OPENAI,
    model="text-embedding-3-large",  # Higher quality, more dimensions
)
```

To adjust chunk size and overlap:

```python
doc["chunks"] = doc["content"].transform(
    cocoindex.functions.SplitRecursively(),
    language="markdown",
    chunk_size=1000,    # Smaller chunks
    chunk_overlap=200,  # Less overlap
)
```

To index additional file types:

```python
cocoindex.sources.LocalFile(
    path="data/documents",
    included_patterns=[
        "**/*.md",
        "**/*.txt",
        "**/*.py",
        "**/*.js",  # Add JavaScript
        "**/*.ts",  # Add TypeScript
    ],
)
```

To switch the extraction LLM in `flows/llm_extraction.py`:

```python
cocoindex.LlmSpec(
    api_type=cocoindex.LlmApiType.OPENAI,
    model="gpt-4o-mini",  # Use OpenAI instead
)
```

```mermaid
graph LR
    subgraph "Flow Definition"
        FD[flow_def decorator]
        FB[FlowBuilder]
        DS[DataScope]
    end
    subgraph "Sources"
        LF[LocalFile]
        API[API Source]
        DB[Database Source]
    end
    subgraph "Transforms"
        SR[SplitRecursively]
        ET[EmbedText]
        EL[ExtractByLlm]
    end
    subgraph "Targets"
        PG[Postgres]
        JSON[JSON File]
    end

    FD --> FB
    FB --> LF
    FB --> API
    FB --> DB
    LF --> SR
    SR --> ET
    ET --> PG
    LF --> EL
    EL --> PG
```
- CocoIndex automatically batches API calls for efficiency.
- Only changed documents are re-processed on subsequent runs.
- Embeddings are cached to avoid redundant API calls.
- Database connections are pooled for better performance.
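The change-tracking mechanics are internal to CocoIndex, but the idea behind "only changed documents are re-processed" resembles content fingerprinting, as in this toy sketch (function names here are illustrative, not CocoIndex API):

```python
import hashlib

def content_key(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def changed_docs(previous: dict[str, str], current: dict[str, str]) -> list[str]:
    """Filenames whose fingerprint differs from the previous run; only
    these would need to be re-chunked and re-embedded."""
    return [
        name
        for name, text in current.items()
        if previous.get(name) != content_key(text)
    ]
```

Unchanged documents hash to the same key, so their chunks and embeddings can be reused rather than recomputed on each `cocoindex update` run.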