@@ -4,10 +4,15 @@ import (
44 "context"
55 "encoding/json"
66 "fmt"
7+ "sync"
78 "time"
89
910 "github.com/google/uuid"
1011 "github.com/mark3labs/mcp-go/mcp"
12+ "go.opentelemetry.io/otel"
13+ "go.opentelemetry.io/otel/attribute"
14+ "go.opentelemetry.io/otel/codes"
15+ "go.opentelemetry.io/otel/trace"
1116
1217 "github.com/stacklok/toolhive/pkg/logger"
1318 "github.com/stacklok/toolhive/pkg/optimizer/db"
@@ -47,6 +52,11 @@ type Service struct {
4752 tokenCounter * tokens.Counter
4853 backendServerOps * db.BackendServerOps
4954 backendToolOps * db.BackendToolOps
55+ tracer trace.Tracer
56+
57+ // Embedding time tracking: embeddingTimeMu guards totalEmbeddingTime
58+ embeddingTimeMu sync.Mutex
59+ totalEmbeddingTime time.Duration
5060}
5161
5262// NewService creates a new ingestion service
@@ -80,27 +90,58 @@ func NewService(config *Config) (*Service, error) {
8090 // Initialize token counter
8191 tokenCounter := tokens .NewCounter ()
8292
83- // Create chromem-go embeddingFunc from our embedding manager
84- embeddingFunc := func (_ context.Context , text string ) ([]float32 , error ) {
93+ // Initialize tracer
94+ tracer := otel .Tracer ("github.com/stacklok/toolhive/pkg/optimizer/ingestion" )
95+
96+ svc := & Service {
97+ config : config ,
98+ database : database ,
99+ embeddingManager : embeddingManager ,
100+ tokenCounter : tokenCounter ,
101+ tracer : tracer ,
102+ totalEmbeddingTime : 0 ,
103+ }
104+
105+ // Create chromem-go embeddingFunc from our embedding manager with tracing
106+ embeddingFunc := func (ctx context.Context , text string ) ([]float32 , error ) {
107+ // Create a span for embedding calculation
108+ _ , span := svc .tracer .Start (ctx , "optimizer.ingestion.calculate_embedding" ,
109+ trace .WithAttributes (
110+ attribute .String ("operation" , "embedding_calculation" ),
111+ ))
112+ defer span .End ()
113+
114+ start := time .Now ()
115+
85116 // Our manager takes a slice, so wrap the single text
86117 embeddingsResult , err := embeddingManager .GenerateEmbedding ([]string {text })
87118 if err != nil {
119+ span .RecordError (err )
120+ span .SetStatus (codes .Error , err .Error ())
88121 return nil , err
89122 }
90123 if len (embeddingsResult ) == 0 {
91- return nil , fmt .Errorf ("no embeddings generated" )
124+ err := fmt .Errorf ("no embeddings generated" )
125+ span .RecordError (err )
126+ span .SetStatus (codes .Error , err .Error ())
127+ return nil , err
92128 }
129+
130+ // Track embedding time for successful calls only; the error returns above skip this accumulation
131+ duration := time .Since (start )
132+ svc .embeddingTimeMu .Lock ()
133+ svc .totalEmbeddingTime += duration
134+ svc .embeddingTimeMu .Unlock ()
135+
136+ span .SetAttributes (
137+ attribute .Int64 ("embedding.duration_ms" , duration .Milliseconds ()),
138+ )
139+
93140 return embeddingsResult [0 ], nil
94141 }
95142
96- svc := & Service {
97- config : config ,
98- database : database ,
99- embeddingManager : embeddingManager ,
100- tokenCounter : tokenCounter ,
101- backendServerOps : db .NewBackendServerOps (database , embeddingFunc ),
102- backendToolOps : db .NewBackendToolOps (database , embeddingFunc ),
103- }
143+ svc .backendServerOps = db .NewBackendServerOps (database , embeddingFunc )
144+ svc .backendToolOps = db .NewBackendToolOps (database , embeddingFunc )
104145
105146 logger .Info ("Ingestion service initialized for event-driven ingestion (chromem-go)" )
106147 return svc , nil
@@ -129,6 +170,16 @@ func (s *Service) IngestServer(
129170 description * string ,
130171 tools []mcp.Tool ,
131172) error {
173+ // Create a span for the entire ingestion operation
174+ ctx , span := s .tracer .Start (ctx , "optimizer.ingestion.ingest_server" ,
175+ trace .WithAttributes (
176+ attribute .String ("server.id" , serverID ),
177+ attribute .String ("server.name" , serverName ),
178+ attribute .Int ("tools.count" , len (tools )),
179+ ))
180+ defer span .End ()
181+
182+ start := time .Now ()
132183 logger .Infof ("Ingesting server: %s (%d tools) [serverID=%s]" , serverName , len (tools ), serverID )
133184
134185 // Create backend server record (simplified - vMCP manages lifecycle)
@@ -144,25 +195,51 @@ func (s *Service) IngestServer(
144195
145196 // Create or update server (chromem-go handles embeddings)
146197 if err := s .backendServerOps .Update (ctx , backendServer ); err != nil {
198+ span .RecordError (err )
199+ span .SetStatus (codes .Error , err .Error ())
147200 return fmt .Errorf ("failed to create/update server %s: %w" , serverName , err )
148201 }
149202 logger .Debugf ("Created/updated server: %s" , serverName )
150203
151204 // Sync tools for this server
152205 toolCount , err := s .syncBackendTools (ctx , serverID , serverName , tools )
153206 if err != nil {
207+ span .RecordError (err )
208+ span .SetStatus (codes .Error , err .Error ())
154209 return fmt .Errorf ("failed to sync tools for %s: %w" , serverName , err )
155210 }
156211
157- logger .Infof ("Successfully ingested server %s with %d tools" , serverName , toolCount )
212+ duration := time .Since (start )
213+ span .SetAttributes (
214+ attribute .Int64 ("ingestion.duration_ms" , duration .Milliseconds ()),
215+ attribute .Int ("tools.ingested" , toolCount ),
216+ )
217+
218+ logger .Infow ("Successfully ingested server" ,
219+ "server_name" , serverName ,
220+ "server_id" , serverID ,
221+ "tools_count" , toolCount ,
222+ "duration_ms" , duration .Milliseconds ())
158223 return nil
159224}
160225
161226// syncBackendTools synchronizes tools for a backend server
162227func (s * Service ) syncBackendTools (ctx context.Context , serverID string , serverName string , tools []mcp.Tool ) (int , error ) {
228+ // Create a span for tool synchronization
229+ ctx , span := s .tracer .Start (ctx , "optimizer.ingestion.sync_backend_tools" ,
230+ trace .WithAttributes (
231+ attribute .String ("server.id" , serverID ),
232+ attribute .String ("server.name" , serverName ),
233+ attribute .Int ("tools.count" , len (tools )),
234+ ))
235+ defer span .End ()
236+
163237 logger .Debugf ("syncBackendTools: server=%s, serverID=%s, tool_count=%d" , serverName , serverID , len (tools ))
238+
164239 // Delete existing tools
165240 if err := s .backendToolOps .DeleteByServer (ctx , serverID ); err != nil {
241+ span .RecordError (err )
242+ span .SetStatus (codes .Error , err .Error ())
166243 return 0 , fmt .Errorf ("failed to delete existing tools: %w" , err )
167244 }
168245
@@ -178,6 +255,8 @@ func (s *Service) syncBackendTools(ctx context.Context, serverID string, serverN
178255 // Convert InputSchema to JSON
179256 schemaJSON , err := json .Marshal (tool .InputSchema )
180257 if err != nil {
258+ span .RecordError (err )
259+ span .SetStatus (codes .Error , err .Error ())
181260 return 0 , fmt .Errorf ("failed to marshal input schema for tool %s: %w" , tool .Name , err )
182261 }
183262
@@ -193,6 +272,8 @@ func (s *Service) syncBackendTools(ctx context.Context, serverID string, serverN
193272 }
194273
195274 if err := s .backendToolOps .Create (ctx , backendTool , serverName ); err != nil {
275+ span .RecordError (err )
276+ span .SetStatus (codes .Error , err .Error ())
196277 return 0 , fmt .Errorf ("failed to create tool %s: %w" , tool .Name , err )
197278 }
198279 }
@@ -228,6 +309,20 @@ func (s *Service) GetTotalToolTokens(ctx context.Context) int {
228309 return 0
229310}
230311
312+ // GetTotalEmbeddingTime returns the accumulated totalEmbeddingTime, read while holding embeddingTimeMu; the embedding callback adds to it only after a successful embedding, and ResetEmbeddingTime sets it back to zero.
313+ func (s * Service ) GetTotalEmbeddingTime () time.Duration {
314+ s .embeddingTimeMu .Lock ()
315+ defer s .embeddingTimeMu .Unlock ()
316+ return s .totalEmbeddingTime
317+ }
318+
319+ // ResetEmbeddingTime sets totalEmbeddingTime back to zero while holding embeddingTimeMu, so later GetTotalEmbeddingTime calls report only time accumulated after the reset.
320+ func (s * Service ) ResetEmbeddingTime () {
321+ s .embeddingTimeMu .Lock ()
322+ defer s .embeddingTimeMu .Unlock ()
323+ s .totalEmbeddingTime = 0
324+ }
325+
231326// Close releases resources
232327func (s * Service ) Close () error {
233328 var errs []error
0 commit comments