Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ type collectionList map[string]*rag.PersistentKB

var collections = collectionList{}

// lookupCollection resolves name to its cached collection KB.
//
// A nil entry in the collections cache is a placeholder: the collection was
// found on disk by registerAPIRoutes at startup, but its engine failed to
// initialise (e.g. the embedding service was momentarily unreachable). For
// such an entry lookupCollection retries the engine initialisation, so a
// transient outage doesn't permanently 404 a collection that still has data
// on disk / in the vector DB.
//
// It returns (nil, false) when name has no entry in the collections cache at
// all, or when the retry still fails; the return value does not distinguish
// the two cases.
//
// The package-level variable is assigned in registerAPIRoutes and is nil
// until then; before that function runs no handlers exist to call it.
var lookupCollection func(name string) (*rag.PersistentKB, bool)

// APIResponse represents a standardized API response
type APIResponse struct {
Success bool `json:"success"`
Expand Down Expand Up @@ -64,10 +75,19 @@ func errorResponse(code string, message string, details string) APIResponse {
}
}

// newVectorEngine constructs the underlying RAG store for a collection.
// Returns the constructed KB on success, or an error explaining the failure
// (configuration error, transient embedding/DB outage, etc.). Callers
// decide how to react — at startup we register a placeholder and continue;
// for runtime requests we surface the failure as a 502 so the caller can
// retry once the backend is healthy. Previously this function called
// os.Exit on any error, which crash-looped the server during transient
// embedding outages and crashed the whole process on a single bad
// runtime request.
func newVectorEngine(
vectorEngineType string,
llmClient *openai.Client,
apiURL, apiKey, collectionName, dbPath, embeddingModel string, maxChunkSize, chunkOverlap int) *rag.PersistentKB {
apiURL, apiKey, collectionName, dbPath, embeddingModel string, maxChunkSize, chunkOverlap int) (*rag.PersistentKB, error) {
var (
kb *rag.PersistentKB
err error
Expand All @@ -82,38 +102,57 @@ func newVectorEngine(
case "postgres":
databaseURL := os.Getenv("DATABASE_URL")
if databaseURL == "" {
xlog.Error("DATABASE_URL is required for PostgreSQL engine")
os.Exit(1)
return nil, fmt.Errorf("DATABASE_URL is required for postgres engine")
}
xlog.Info("PostgreSQL collection", "collectionName", collectionName, "databaseURL", databaseURL)
kb, err = rag.NewPersistentPostgresCollection(llmClient, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap, databaseURL)
default:
xlog.Error("Unknown vector engine", "engine", vectorEngineType)
os.Exit(1)
return nil, fmt.Errorf("unknown vector engine: %q", vectorEngineType)
}

// LocalRecall standalone server: a startup-time engine init failure is
// non-recoverable in this mode (no fallback or retry path), so preserve
// the original fail-fast behavior. Embedded callers (LocalAGI/LocalAI)
// get the error directly from the rag.NewPersistent* constructors and
// can choose to degrade gracefully.
if err != nil {
xlog.Error("Failed to create collection", "engine", vectorEngineType, "collection", collectionName, "error", err)
os.Exit(1)
return nil, fmt.Errorf("creating %s collection %q: %w", vectorEngineType, collectionName, err)
}
return kb
return kb, nil
}

// API routes for managing collections
func registerAPIRoutes(e *echo.Echo, openAIClient *openai.Client, maxChunkingSize, chunkOverlap int, apiKeys []string) {

// Load all collections
// Load all on-disk collections. Init failures (e.g. embedding service
// briefly unreachable) no longer crash the server: register a nil
// placeholder so lookupCollection can rehydrate lazily on first use.
colls := rag.ListAllCollections(collectionDBPath)
for _, c := range colls {
collection := newVectorEngine(vectorEngine, openAIClient, openAIBaseURL, openAIKey, c, collectionDBPath, embeddingModel, maxChunkingSize, chunkOverlap)
collection, err := newVectorEngine(vectorEngine, openAIClient, openAIBaseURL, openAIKey, c, collectionDBPath, embeddingModel, maxChunkingSize, chunkOverlap)
if err != nil {
xlog.Error("Failed to load collection at startup; will retry lazily on first request",
"collection", c, "engine", vectorEngine, "error", err)
}
collections[c] = collection
// Register the collection with the source manager
sourceManager.RegisterCollection(c, collection)
if collection != nil {
sourceManager.RegisterCollection(c, collection)
}
}

// Install the shared lookup used by the request handlers: it resolves a
// collection from the in-memory cache and retries engine construction for
// nil placeholder entries.
// NOTE(review): this closure writes to the collections map and registers
// with sourceManager; no locking is visible in this span — confirm whether
// concurrent requests can race here.
lookupCollection = func(name string) (*rag.PersistentKB, bool) {
kb, exists := collections[name]
if !exists {
// No cache entry at all for this name.
return nil, false
}
if kb != nil {
// Engine already constructed; serve it straight from the cache.
return kb, true
}
// Placeholder: the name has a cache entry but no engine was stored for
// it (nil value). Try to construct the engine again now.
kb, err := newVectorEngine(vectorEngine, openAIClient, openAIBaseURL, openAIKey, name, collectionDBPath, embeddingModel, maxChunkingSize, chunkOverlap)
if err != nil {
// The nil placeholder is left in the cache, so a later call retries.
// The caller only sees (nil, false), the same result as an unknown name.
xlog.Error("Failed to rehydrate collection on demand",
"collection", name, "engine", vectorEngine, "error", err)
return nil, false
}
// Rehydration succeeded: replace the placeholder and register the
// collection with the source manager.
collections[name] = kb
sourceManager.RegisterCollection(name, kb)
return kb, true
}

if len(apiKeys) > 0 {
Expand Down Expand Up @@ -161,7 +200,16 @@ func createCollection(collections collectionList, client *openai.Client, embeddi
return c.JSON(http.StatusBadRequest, errorResponse(ErrCodeInvalidRequest, "Invalid request", err.Error()))
}

collection := newVectorEngine(vectorEngine, client, openAIBaseURL, openAIKey, r.Name, collectionDBPath, embeddingModel, maxChunkingSize, chunkOverlap)
// If the engine can't construct the collection right now (transient
// embedding/DB outage, misconfiguration, …), surface that as 502 so
// the caller can retry. Returning success and storing a nil entry
// would leave the caller with a permanently-broken collection.
collection, err := newVectorEngine(vectorEngine, client, openAIBaseURL, openAIKey, r.Name, collectionDBPath, embeddingModel, maxChunkingSize, chunkOverlap)
if err != nil {
xlog.Error("Failed to create collection",
"collection", r.Name, "engine", vectorEngine, "error", err)
return c.JSON(http.StatusBadGateway, errorResponse(ErrCodeInternalError, "Vector backend unavailable", err.Error()))
}
collections[r.Name] = collection

// Register the new collection with the source manager
Expand All @@ -178,7 +226,7 @@ func createCollection(collections collectionList, client *openai.Client, embeddi
func deleteEntryFromCollection(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand Down Expand Up @@ -209,7 +257,7 @@ func deleteEntryFromCollection(collections collectionList) func(c echo.Context)
func reset(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand All @@ -231,7 +279,7 @@ func reset(collections collectionList) func(c echo.Context) error {
func search(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand Down Expand Up @@ -272,7 +320,7 @@ func search(collections collectionList) func(c echo.Context) error {
func listFiles(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand All @@ -297,7 +345,7 @@ func listFiles(collections collectionList) func(c echo.Context) error {
func getEntryContent(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand Down Expand Up @@ -333,7 +381,7 @@ func getEntryContent(collections collectionList) func(c echo.Context) error {
func getEntryRawFile(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand All @@ -357,7 +405,7 @@ func getEntryRawFile(collections collectionList) func(c echo.Context) error {
func uploadFile(collections collectionList, fileAssets string) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
xlog.Error("Collection not found")
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
Expand Down Expand Up @@ -435,7 +483,7 @@ func listCollections(c echo.Context) error {
func registerExternalSource(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand Down Expand Up @@ -501,7 +549,7 @@ func removeExternalSource(collections collectionList) func(c echo.Context) error
func listSources(collections collectionList) func(c echo.Context) error {
return func(c echo.Context) error {
name := c.Param("name")
collection, exists := collections[name]
collection, exists := lookupCollection(name)
if !exists {
return c.JSON(http.StatusNotFound, errorResponse(ErrCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name)))
}
Expand Down
Loading