Skip to content

Commit f0ed493

Browse files
committed
feat: add document and embedding pipeline with hybrid search
Introduce a document system with chunking, embedding generation (Ollama/OpenAI), and async pipeline for semantic search. Refactor hybrid search from entity package into dedicated embedding package, replace concrete ChunkRepository with EmbeddingRepository, and add document REST API and MCP tools.
1 parent 26c949c commit f0ed493

36 files changed

+3434
-297
lines changed

cli/embed.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package cli
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"strings"
8+
9+
"github.com/MakeNowJust/heredoc"
10+
"github.com/raystack/compass/core/chunking"
11+
"github.com/raystack/compass/core/document"
12+
"github.com/raystack/compass/core/embedding"
13+
"github.com/raystack/compass/core/entity"
14+
"github.com/raystack/compass/core/namespace"
15+
"github.com/raystack/compass/internal/config"
16+
compassserver "github.com/raystack/compass/internal/server"
17+
"github.com/raystack/compass/store/postgres"
18+
"github.com/spf13/cobra"
19+
)
20+
21+
func embedCommand(cfg *config.Config) *cobra.Command {
22+
var embedType string
23+
var batchSize int
24+
25+
cmd := &cobra.Command{
26+
Use: "embed",
27+
Short: "Generate embeddings for entities and documents",
28+
Long: "Batch generate embeddings for all entities and/or documents. Requires an embedding provider configured.",
29+
Example: heredoc.Doc(`
30+
$ compass embed
31+
$ compass embed --type entity
32+
$ compass embed --type document
33+
$ compass embed --batch-size 50
34+
`),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
compassserver.InitLogger(cfg.LogLevel)
37+
return runEmbed(cmd.Context(), cfg, embedType, batchSize)
38+
},
39+
}
40+
41+
cmd.Flags().StringVar(&embedType, "type", "all", "Type to embed: entity, document, or all")
42+
cmd.Flags().IntVar(&batchSize, "batch-size", 100, "Number of items to process per batch")
43+
44+
return cmd
45+
}
46+
47+
func runEmbed(ctx context.Context, cfg *config.Config, embedType string, batchSize int) error {
48+
if !cfg.Embedding.Enabled {
49+
return fmt.Errorf("embedding is not enabled in config")
50+
}
51+
52+
// Init embedding provider
53+
provider, err := initProvider(cfg.Embedding)
54+
if err != nil {
55+
return err
56+
}
57+
slog.Info("embedding provider initialized", "provider", provider.Name())
58+
59+
// Init postgres
60+
pgClient, err := postgres.NewClient(cfg.DB)
61+
if err != nil {
62+
return fmt.Errorf("connect to postgres: %w", err)
63+
}
64+
defer pgClient.Close()
65+
66+
embeddingRepo, err := postgres.NewEmbeddingRepository(pgClient)
67+
if err != nil {
68+
return err
69+
}
70+
71+
ns := namespace.DefaultNamespace
72+
73+
if embedType == "all" || embedType == "entity" {
74+
entityRepo, err := postgres.NewEntityRepository(pgClient)
75+
if err != nil {
76+
return err
77+
}
78+
if err := embedEntities(ctx, entityRepo, embeddingRepo, provider, ns, batchSize, cfg.Embedding); err != nil {
79+
return fmt.Errorf("embed entities: %w", err)
80+
}
81+
}
82+
83+
if embedType == "all" || embedType == "document" {
84+
docRepo, err := postgres.NewDocumentRepository(pgClient)
85+
if err != nil {
86+
return err
87+
}
88+
if err := embedDocuments(ctx, docRepo, embeddingRepo, provider, ns, batchSize, cfg.Embedding); err != nil {
89+
return fmt.Errorf("embed documents: %w", err)
90+
}
91+
}
92+
93+
slog.Info("embedding complete")
94+
return nil
95+
}
96+
97+
func embedEntities(ctx context.Context, entityRepo entity.Repository, embeddingRepo embedding.Repository,
98+
provider embedding.Provider, ns *namespace.Namespace, batchSize int, cfg config.EmbeddingConfig) error {
99+
100+
offset := 0
101+
total := 0
102+
103+
for {
104+
entities, err := entityRepo.GetAll(ctx, ns, entity.Filter{Size: batchSize, Offset: offset})
105+
if err != nil {
106+
return err
107+
}
108+
if len(entities) == 0 {
109+
break
110+
}
111+
112+
for _, ent := range entities {
113+
chunks := chunking.SerializeEntity(ent)
114+
if len(chunks) == 0 {
115+
continue
116+
}
117+
118+
text := chunks[0].Context + "\n\n" + chunks[0].Content
119+
vec, err := provider.Embed(ctx, text)
120+
if err != nil {
121+
slog.Error("failed to embed entity", "urn", ent.URN, "error", err)
122+
continue
123+
}
124+
125+
embs := []embedding.Embedding{{
126+
EntityURN: ent.URN,
127+
ContentID: ent.ID,
128+
ContentType: "entity",
129+
Content: chunks[0].Content,
130+
Context: chunks[0].Context,
131+
Vector: vec,
132+
Position: 0,
133+
Heading: chunks[0].Heading,
134+
TokenCount: chunking.EstimateTokens(chunks[0].Content),
135+
}}
136+
137+
if err := embeddingRepo.UpsertBatch(ctx, ns, embs); err != nil {
138+
slog.Error("failed to store entity embedding", "urn", ent.URN, "error", err)
139+
continue
140+
}
141+
total++
142+
}
143+
144+
slog.Info("embedded entities", "batch", offset/batchSize+1, "count", len(entities), "total", total)
145+
offset += batchSize
146+
147+
if len(entities) < batchSize {
148+
break
149+
}
150+
}
151+
152+
slog.Info("entity embedding complete", "total", total)
153+
return nil
154+
}
155+
156+
func embedDocuments(ctx context.Context, docRepo document.Repository, embeddingRepo embedding.Repository,
157+
provider embedding.Provider, ns *namespace.Namespace, batchSize int, cfg config.EmbeddingConfig) error {
158+
159+
docs, err := docRepo.GetAll(ctx, ns, document.Filter{Size: 10000})
160+
if err != nil {
161+
return err
162+
}
163+
164+
total := 0
165+
for _, doc := range docs {
166+
chunks := chunking.SplitDocument(doc.Title, doc.Body, chunking.Options{
167+
MaxTokens: cfg.MaxTokens,
168+
Overlap: cfg.Overlap,
169+
Title: doc.Title,
170+
})
171+
if len(chunks) == 0 {
172+
continue
173+
}
174+
175+
// Prepare texts for batch embedding
176+
texts := make([]string, len(chunks))
177+
for i, c := range chunks {
178+
if c.Context != "" {
179+
texts[i] = c.Context + "\n\n" + c.Content
180+
} else {
181+
texts[i] = c.Content
182+
}
183+
}
184+
185+
vectors, err := provider.EmbedBatch(ctx, texts)
186+
if err != nil {
187+
slog.Error("failed to embed document", "title", doc.Title, "entity_urn", doc.EntityURN, "error", err)
188+
continue
189+
}
190+
191+
embs := make([]embedding.Embedding, len(chunks))
192+
for i, c := range chunks {
193+
var vec []float32
194+
if i < len(vectors) {
195+
vec = vectors[i]
196+
}
197+
embs[i] = embedding.Embedding{
198+
EntityURN: doc.EntityURN,
199+
ContentID: doc.ID,
200+
ContentType: "document",
201+
Content: c.Content,
202+
Context: c.Context,
203+
Vector: vec,
204+
Position: c.Position,
205+
Heading: c.Heading,
206+
TokenCount: chunking.EstimateTokens(c.Content),
207+
}
208+
}
209+
210+
if err := embeddingRepo.UpsertBatch(ctx, ns, embs); err != nil {
211+
slog.Error("failed to store document embeddings", "title", doc.Title, "error", err)
212+
continue
213+
}
214+
total += len(embs)
215+
}
216+
217+
slog.Info("document embedding complete", "total_chunks", total, "documents", len(docs))
218+
return nil
219+
}
220+
221+
func initProvider(cfg config.EmbeddingConfig) (embedding.Provider, error) {
222+
switch strings.ToLower(cfg.Provider) {
223+
case "openai":
224+
if cfg.OpenAI.APIKey == "" {
225+
return nil, fmt.Errorf("openai api_key is required")
226+
}
227+
return embedding.NewOpenAI(cfg.OpenAI), nil
228+
case "ollama", "":
229+
return embedding.NewOllama(cfg.Ollama), nil
230+
default:
231+
return nil, fmt.Errorf("unknown embedding provider: %s", cfg.Provider)
232+
}
233+
}

cli/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func New(cliConfig *config.Config) *cobra.Command {
5757
configCommand(cliConfig),
5858
namespacesCommand(cliConfig),
5959
entitiesCommand(cliConfig),
60+
embedCommand(cliConfig),
6061
versionCmd(),
6162
)
6263

core/chunking/chunker.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package chunking
2+
3+
// Chunk is a text fragment ready for embedding, produced by splitting a
// document or serializing an entity.
type Chunk struct {
	Content  string // the text to embed
	Context  string // contextual prefix prepended at embedding time to improve quality; may be empty
	Heading  string // section heading the fragment belongs to, if any
	Position int    // ordering within the parent (0 for single-chunk parents)
}
10+
11+
// Options configures chunking behavior. The zero value is usable: unset
// or non-positive fields fall back to sensible defaults via the
// accessor methods below.
type Options struct {
	MaxTokens int    // target chunk size in tokens (default 512)
	Overlap   int    // overlap tokens between adjacent chunks (default 50)
	Title     string // parent title for contextual prefix
}

// maxTokens returns the configured target chunk size, defaulting to 512
// when MaxTokens is unset or non-positive.
func (o Options) maxTokens() int {
	if o.MaxTokens > 0 {
		return o.MaxTokens
	}
	return 512
}

// overlap returns the configured overlap, defaulting to 50 when Overlap
// is unset or non-positive.
func (o Options) overlap() int {
	if o.Overlap > 0 {
		return o.Overlap
	}
	return 50
}
31+
32+
// EstimateTokens estimates the token count for a text string.
// Uses a simple heuristic: ~1.3 tokens per word on average, where words
// are runs of characters separated by space, newline, tab, or carriage
// return.
func EstimateTokens(text string) int {
	var words int
	prevWasSep := true
	for _, r := range text {
		switch r {
		case ' ', '\n', '\t', '\r':
			prevWasSep = true
		default:
			if prevWasSep {
				words++
			}
			prevWasSep = false
		}
	}
	return words * 4 / 3
}

core/chunking/serializer.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package chunking
2+
3+
import (
4+
"fmt"
5+
"sort"
6+
"strings"
7+
8+
"github.com/raystack/compass/core/entity"
9+
)
10+
11+
// SerializeEntity converts an entity's metadata into readable text for embedding.
12+
// Most entities produce a single chunk.
13+
func SerializeEntity(ent entity.Entity) []Chunk {
14+
var b strings.Builder
15+
16+
fmt.Fprintf(&b, "Name: %s\n", ent.Name)
17+
fmt.Fprintf(&b, "Type: %s\n", ent.Type)
18+
fmt.Fprintf(&b, "URN: %s\n", ent.URN)
19+
if ent.Source != "" {
20+
fmt.Fprintf(&b, "Source: %s\n", ent.Source)
21+
}
22+
if ent.Description != "" {
23+
fmt.Fprintf(&b, "Description: %s\n", ent.Description)
24+
}
25+
26+
if len(ent.Properties) > 0 {
27+
b.WriteString("Properties:\n")
28+
serializeProperties(&b, ent.Properties, " ")
29+
}
30+
31+
context := fmt.Sprintf("%s: %s (%s)", ent.Type, ent.Name, ent.URN)
32+
33+
return []Chunk{{
34+
Content: b.String(),
35+
Context: context,
36+
Heading: string(ent.Type) + ": " + ent.Name,
37+
Position: 0,
38+
}}
39+
}
40+
41+
func serializeProperties(b *strings.Builder, props map[string]interface{}, indent string) {
42+
// Sort keys for deterministic output
43+
keys := make([]string, 0, len(props))
44+
for k := range props {
45+
keys = append(keys, k)
46+
}
47+
sort.Strings(keys)
48+
49+
for _, k := range keys {
50+
v := props[k]
51+
switch val := v.(type) {
52+
case map[string]interface{}:
53+
fmt.Fprintf(b, "%s%s:\n", indent, k)
54+
serializeProperties(b, val, indent+" ")
55+
case []interface{}:
56+
serializeSlice(b, k, val, indent)
57+
default:
58+
fmt.Fprintf(b, "%s%s: %v\n", indent, k, v)
59+
}
60+
}
61+
}
62+
63+
func serializeSlice(b *strings.Builder, key string, items []interface{}, indent string) {
64+
if len(items) == 0 {
65+
return
66+
}
67+
68+
// Check if items are simple strings/numbers
69+
allSimple := true
70+
for _, item := range items {
71+
switch item.(type) {
72+
case string, float64, int, bool:
73+
default:
74+
allSimple = false
75+
}
76+
}
77+
78+
if allSimple {
79+
vals := make([]string, len(items))
80+
for i, item := range items {
81+
vals[i] = fmt.Sprintf("%v", item)
82+
}
83+
fmt.Fprintf(b, "%s%s: %s\n", indent, key, strings.Join(vals, ", "))
84+
return
85+
}
86+
87+
// Complex items (e.g., columns)
88+
fmt.Fprintf(b, "%s%s:\n", indent, key)
89+
for _, item := range items {
90+
if m, ok := item.(map[string]interface{}); ok {
91+
// Try to format as "name (type)" for column-like structures
92+
name, _ := m["name"].(string)
93+
typ, _ := m["type"].(string)
94+
if name != "" && typ != "" {
95+
fmt.Fprintf(b, "%s - %s (%s)\n", indent, name, typ)
96+
continue
97+
}
98+
}
99+
fmt.Fprintf(b, "%s - %v\n", indent, item)
100+
}
101+
}

0 commit comments

Comments
 (0)