"""
indexer.py
Connects to the Endee vector database, creates an index, and upserts
document embeddings with metadata.
"""
import json
from pathlib import Path
from typing import List, Dict, Any
from endee import Endee, Precision
from embedder import Embedder
INDEX_NAME = "industrial_docs"
ENDEE_HOST = "http://localhost:8080"
DATA_FILE = Path(__file__).parent / "documents.json"
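
# The contents of documents.json are not shown here; based on the fields read
# in ingest() below, each entry is assumed to look roughly like:
#
#     {
#         "id": "doc-001",
#         "title": "Pump maintenance schedule",
#         "category": "maintenance",
#         "content": "Full text of the document..."
#     }
#
# (the id, title, category, and content values above are illustrative
# placeholders, not taken from the actual data file).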

class DocumentIndexer:
    """Handles all Endee index creation and population."""

    def __init__(self, host: str = ENDEE_HOST, auth_token: str = ""):
        self.client = Endee(auth_token) if auth_token else Endee()
        self.client.set_base_url(f"{host}/api/v1")
        self.embedder = Embedder()
        self.index_name = INDEX_NAME

    # ------------------------------------------------------------------
    # Index management
    # ------------------------------------------------------------------
    def index_exists(self) -> bool:
        try:
            indexes = self.client.list_indexes()
            return any(idx.name == self.index_name for idx in indexes)
        except Exception:
            return False

    def create_index(self, recreate: bool = False) -> None:
        """Create the index, optionally dropping and recreating an existing one."""
        if self.index_exists():
            if recreate:
                print(f"[Indexer] Deleting existing index '{self.index_name}'...")
                self.client.delete_index(self.index_name)
            else:
                print(f"[Indexer] Index '{self.index_name}' already exists. Skipping creation.")
                return

        print(f"[Indexer] Creating index '{self.index_name}' (dim={self.embedder.dimension})...")
        self.client.create_index(
            name=self.index_name,
            dimension=self.embedder.dimension,
            space_type="cosine",
            precision=Precision.INT8,  # memory-efficient
        )
        print("[Indexer] Index created successfully.")

    # ------------------------------------------------------------------
    # Document ingestion
    # ------------------------------------------------------------------
    def load_documents(self) -> List[Dict[str, Any]]:
        with open(DATA_FILE, "r", encoding="utf-8") as f:
            return json.load(f)

    def build_embed_text(self, doc: Dict[str, Any]) -> str:
        """Concatenate title + content for richer semantic representation."""
        return f"{doc['title']}. {doc['content']}"

    def ingest(self, documents: List[Dict[str, Any]] | None = None) -> int:
        """Embed the documents (documents.json by default), upsert them into
        the index, and return the number of vectors upserted."""
        if documents is None:
            documents = self.load_documents()

        index = self.client.get_index(name=self.index_name)

        texts = [self.build_embed_text(d) for d in documents]
        print(f"[Indexer] Generating embeddings for {len(texts)} documents...")
        vectors = self.embedder.embed(texts)

        items = []
        for doc, vec in zip(documents, vectors):
            items.append({
                "id": doc["id"],
                "vector": vec,
                "meta": {
                    "title": doc["title"],
                    "category": doc["category"],
                    "content": doc["content"],
                },
            })

        print(f"[Indexer] Upserting {len(items)} vectors into Endee...")
        index.upsert(items)
        print("[Indexer] Ingestion complete.")
        return len(items)

    # ------------------------------------------------------------------
    # One-shot setup
    # ------------------------------------------------------------------
    def setup(self, recreate: bool = False) -> None:
        """Full pipeline: create index + ingest all documents."""
        self.create_index(recreate=recreate)
        self.ingest()
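
# Running this module directly rebuilds the index from scratch (recreate=True).
# It assumes an Endee server is reachable at ENDEE_HOST and that documents.json
# sits next to this file:
#
#     python indexer.py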

if __name__ == "__main__":
    indexer = DocumentIndexer()
    indexer.setup(recreate=True)