-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
205 lines (167 loc) · 5.28 KB
/
main.py
File metadata and controls
205 lines (167 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
CocoIndex Workspace - Main Entry Point
This is the main entry point for the CocoIndex workspace. It:
1. Registers all flow definitions
2. Provides a FastAPI server with search endpoints
3. Integrates with CocoInsight for visualization
Usage:
# CLI Commands
cocoindex ls main.py # List all flows
cocoindex setup main.py # Setup backend
cocoindex update main.py # Update index
cocoindex update main.py -L # Live updates
cocoindex server main.py -ci # Start server with CocoInsight
# FastAPI Server (alternative)
python main.py # Start FastAPI on port 8000
"""
import os
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
import cocoindex
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from psycopg_pool import ConnectionPool
# Import all flows to register them
from flows.text_embedding import text_embedding_flow, search as text_search
from flows.llm_extraction import llm_extraction_flow
# Module-level connection pool for direct database queries. It starts out
# unset (None); get_pool() creates it on demand and lifespan() assigns it
# during application startup.
_pool: ConnectionPool | None = None
def get_pool() -> ConnectionPool:
    """Return the shared database connection pool, creating it on first use.

    The pool is cached in the module-level ``_pool`` variable. While that
    variable is still ``None`` a new ``ConnectionPool`` is built from the
    ``COCOINDEX_DATABASE_URL`` environment variable and stored there.

    Raises:
        KeyError: If ``COCOINDEX_DATABASE_URL`` is not set when the pool
            has to be created.
    """
    global _pool
    # Fast path: a pool already exists, hand it back unchanged.
    if _pool is not None:
        return _pool
    _pool = ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])
    return _pool
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """FastAPI lifespan manager: set up CocoIndex and the DB pool, then tear down.

    On startup this loads variables from ``.env``, initializes CocoIndex and
    makes sure the module-level ``_pool`` holds a connection pool. On
    shutdown the pool is closed and ``_pool`` is reset to ``None``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to FastAPI while the application is serving.

    Raises:
        KeyError: If ``COCOINDEX_DATABASE_URL`` is not set when a pool has
            to be created.
    """
    global _pool

    load_dotenv()
    cocoindex.init()

    # Reuse a pool that get_pool() may already have created instead of
    # overwriting it, which would leave the earlier pool open and unreachable.
    if _pool is None:
        _pool = ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])

    try:
        yield
    finally:
        # Run cleanup even when an exception is thrown into the generator at
        # the yield, and clear the global so get_pool() never hands out a
        # closed pool afterwards.
        pool, _pool = _pool, None
        if pool is not None:
            pool.close()
# The FastAPI application object; startup and shutdown are handled by
# ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="CocoIndex Workspace API",
    version="0.1.0",
    description="Search and query indexed documents using CocoIndex",
)

# Browser origins allowed by CORS (CocoInsight and local development).
_CORS_ALLOWED_ORIGINS = [
    "https://cocoindex.io",
    "http://localhost:3000",
    "http://localhost:8000",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ALLOWED_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
@app.get("/")
def root() -> dict[str, str]:
    """Describe the API: its name, version, and the docs and flows paths."""
    api_info: dict[str, str] = dict(
        name="CocoIndex Workspace API",
        version="0.1.0",
        docs="/docs",
        flows="/flows",
    )
    return api_info
@app.get("/health")
def health_check() -> dict[str, str]:
    """Liveness probe; always answers with a fixed ``healthy`` status."""
    status_payload = {"status": "healthy"}
    return status_payload
@app.get("/flows")
def list_flows() -> dict[str, Any]:
    """List every registered flow.

    Returns:
        A dict with ``count`` (number of flows) and ``flows``, a list of
        ``{"name", "type"}`` entries where ``type`` is the class name of
        the flow object.
    """
    # Imported here, as in the original, rather than at module import time.
    from cocoindex.flow import flows

    registered = [
        {"name": flow_name, "type": type(flow_obj).__name__}
        for flow_name, flow_obj in flows().items()
    ]
    return {"count": len(registered), "flows": registered}
@app.get("/search")
def search_endpoint(
    q: str = Query(..., description="Search query"),
    limit: int = Query(5, description="Number of results", ge=1, le=50),
) -> dict[str, Any]:
    """Search for documents similar to the query.

    Delegates to the TextEmbedding flow's ``search`` handler and projects
    each result down to its ``filename``, ``text`` and ``score`` fields.

    Args:
        q: The search query text.
        limit: Maximum number of results to request (1-50).

    Returns:
        A dict with the original ``query``, the result ``count`` and the
        list of projected ``results``.

    Raises:
        HTTPException: With status 500 if the search handler or the result
            projection fails for any reason.
    """
    try:
        output = text_search(q, top_k=limit)
        hits = [
            {
                "filename": r["filename"],
                "text": r["text"],
                "score": r["score"],
            }
            for r in output.results
        ]
    except Exception as e:
        # Chain the original exception so the root cause stays visible in
        # server-side tracebacks; the client-facing message is unchanged.
        raise HTTPException(
            status_code=500,
            detail=f"Search failed: {e}. Make sure you've run 'cocoindex update main.py' first.",
        ) from e

    return {
        "query": q,
        "count": len(hits),
        "results": hits,
    }
@app.get("/search/raw")
def search_raw_endpoint(
    q: str = Query(..., description="Search query"),
    limit: int = Query(5, description="Number of results", ge=1, le=50),
) -> dict[str, Any]:
    """Search and return the handler's results without projecting fields.

    Unlike ``/search``, the entries of ``output.results`` are passed through
    as-is, together with the name of the similarity metric reported in
    ``output.query_info`` (when available).

    Args:
        q: The search query text.
        limit: Maximum number of results to request (1-50).

    Returns:
        A dict with ``query``, ``count``, the raw ``results`` and a
        ``query_info`` dict whose ``similarity_metric`` is the metric's
        name or ``None``.

    Raises:
        HTTPException: With status 500 if the search handler fails.
    """
    try:
        output = text_search(q, top_k=limit)
        info = output.query_info
        # Guard both levels: query_info may be missing, and a present
        # query_info may carry no similarity metric. Either case yields None
        # instead of turning a successful search into a 500.
        metric = info.similarity_metric if info else None
        payload = {
            "query": q,
            "count": len(output.results),
            "results": output.results,
            "query_info": {
                "similarity_metric": metric.name if metric is not None else None,
            },
        }
    except Exception as e:
        # Chain the original exception so the root cause stays visible in
        # server-side tracebacks; the client-facing message is unchanged.
        raise HTTPException(
            status_code=500,
            detail=f"Search failed: {e}",
        ) from e

    return payload
def main(host: str = "0.0.0.0", port: int = 8000) -> None:
    """Run the FastAPI server under uvicorn.

    Loads variables from ``.env``, initializes CocoIndex, prints a short
    banner listing the available endpoints and then blocks in
    ``uvicorn.run``.

    Args:
        host: Interface to bind to. Defaults to ``"0.0.0.0"``.
        port: TCP port to listen on. Defaults to ``8000``.
    """
    load_dotenv()
    cocoindex.init()

    rule = "=" * 60
    print("\n" + rule)
    print("CocoIndex Workspace Server")
    print(rule)
    print("\nEndpoints:")
    print(" GET / - API information")
    print(" GET /health - Health check")
    print(" GET /flows - List flows")
    print(" GET /search - Search documents")
    # /search/raw is registered on the app but was missing from this list.
    print(" GET /search/raw - Search documents (raw results)")
    print(" GET /docs - Swagger UI")
    print(f"\nStarting server on http://{host}:{port}")
    print(rule + "\n")

    uvicorn.run(app, host=host, port=port)
# Start the FastAPI server only when this file is executed as a script.
if __name__ == "__main__":
    main()