diff --git a/internal/consolidation/promote.go b/internal/consolidation/promote.go index 1a02af7..2c1959a 100644 --- a/internal/consolidation/promote.go +++ b/internal/consolidation/promote.go @@ -31,20 +31,15 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo cl := NewConsolidationLogger(c.decisions, c.runID, c.normalizedModel()) - // Index merge proposals by memory position so we can skip merged memories - // in the create-new pass. Uses MemoryIndex from MergeProposal for exact matching. mergedIndices := make(map[int]bool) for _, merge := range merges { mergedIndices[merge.MemoryIndex] = true } - // Build set of memories to skip (already captured by existing behaviors). skipped := make(map[int]bool, len(skips)) for _, idx := range skips { skipped[idx] = true } mergeCount := 0 - // nodesCreatedByMerge counts new nodes added by supersede/supplement strategies - // (absorb modifies an existing node, so it doesn't create a new one). nodesCreatedByMerge := 0 for _, merge := range merges { @@ -53,7 +48,6 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo elapsed := time.Since(mergeStart).Milliseconds() if err != nil { - // Merge failed — unmark the memory so it falls through to create-as-new delete(mergedIndices, merge.MemoryIndex) cl.LogPromote("merge_failed", elapsed, map[string]any{ "target_id": merge.TargetID, @@ -74,17 +68,14 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo }) } - // Create nodes for non-merged memories and build pending→actual ID map promoted := 0 baseTS := time.Now().UnixNano() - pendingToActual := make(map[string]string) // "pending-N" → actual node ID + pendingToActual := make(map[string]string) for i, mem := range memories { pendingID := PendingNodeID(i) if skipped[i] { - // Skipped memories don't get nodes; co-occurrence edges referencing - // them will be filtered out by the pending-ID check below. cl.LogPromote("skip", 0, map[string]any{ "reason": "already_captured", "memory_kind": string(mem.Kind), @@ -92,7 +83,6 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo continue } if mergedIndices[i] { - // Map merged memory's pending ID to its merge target for _, merge := range merges { if merge.MemoryIndex == i { pendingToActual[pendingID] = merge.TargetID @@ -108,12 +98,9 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo node := c.buildPromoteNode(mem, c.runID, baseTS, i) if _, err := s.AddNode(ctx, node); err != nil { - if errors.Is(err, store.ErrDuplicateContent) { - // Map pending ID to the existing node so co-occurrence edges - // referencing this memory are preserved instead of silently dropped. - if existingID := extractDuplicateNodeID(err); existingID != "" { - pendingToActual[pendingID] = existingID - } + var dupErr *store.DuplicateContentError + if errors.As(err, &dupErr) { + pendingToActual[pendingID] = dupErr.ExistingID slog.Info("skipping duplicate content", "node_id", node.ID, "error", err) cl.LogPromote("skip", 0, map[string]any{ "reason": "duplicate_content", @@ -134,7 +121,6 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo }) } - // Rewrite pending IDs in edges to actual node IDs, then write to store for _, edge := range edges { if actual, ok := pendingToActual[edge.Source]; ok { edge.Source = actual @@ -142,7 +128,6 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo if actual, ok := pendingToActual[edge.Target]; ok { edge.Target = actual } - // Skip edges with unresolved pending IDs (shouldn't happen, but defensive) if strings.HasPrefix(edge.Source, "pending-") || strings.HasPrefix(edge.Target, "pending-") { continue } @@ -157,7 +142,6 @@ func (c *LLMConsolidator) Promote(ctx context.Context, memories []ClassifiedMemo }, nil } -// executeMerge applies a single merge proposal to the graph store. func (c *LLMConsolidator) executeMerge(ctx context.Context, merge MergeProposal, s store.GraphStore, runID string) error { switch merge.Strategy { case "absorb": @@ -171,22 +155,16 @@ func (c *LLMConsolidator) executeMerge(ctx context.Context, merge MergeProposal, } } -// executeAbsorb updates an existing node with merged content, bumps confidence, -// and appends source events to provenance. func (c *LLMConsolidator) executeAbsorb(ctx context.Context, merge MergeProposal, s store.GraphStore) error { - existing, err := s.GetNode(ctx, merge.TargetID) + node, err := getTargetNode(ctx, s, merge.TargetID) if err != nil { - return fmt.Errorf("fetching target node %s: %w", merge.TargetID, err) - } - if existing == nil { - return fmt.Errorf("target node not found: %s", merge.TargetID) + return err } - // Update content: prefer new canonical if it is longer/richer - if existing.Content == nil { - existing.Content = make(map[string]interface{}) + if node.Content == nil { + node.Content = make(map[string]interface{}) } - contentMap, _ := existing.Content["content"].(map[string]interface{}) + contentMap, _ := node.Content["content"].(map[string]interface{}) if contentMap == nil { contentMap = make(map[string]interface{}) } @@ -196,95 +174,61 @@ func (c *LLMConsolidator) executeAbsorb(ctx context.Context, merge MergeProposal } if merge.Memory.Content.Summary != "" { contentMap["summary"] = merge.Memory.Content.Summary - // Also update top-level name for UI/query consistency - existing.Content["name"] = merge.Memory.Content.Summary + node.Content["name"] = merge.Memory.Content.Summary } if len(merge.Memory.Content.Tags) > 0 { contentMap["tags"] = toInterfaceSlice(merge.Memory.Content.Tags) } - existing.Content["content"] = contentMap + node.Content["content"] = contentMap - // Bump confidence: take the max of existing and new, capped at 1.0 - if existing.Metadata == nil { - existing.Metadata = make(map[string]interface{}) + if node.Metadata == nil { + node.Metadata = make(map[string]interface{}) } - oldConf, _ := existing.Metadata["confidence"].(float64) + oldConf, _ := node.Metadata["confidence"].(float64) newConf := merge.Memory.Confidence maxConf := oldConf if newConf > oldConf { maxConf = newConf - existing.Metadata["confidence"] = newConf + node.Metadata["confidence"] = newConf } - // Append provenance - prov, _ := existing.Metadata["provenance"].(map[string]interface{}) + prov, _ := node.Metadata["provenance"].(map[string]interface{}) if prov == nil { prov = make(map[string]interface{}) } prov["consolidated_by"] = c.normalizedModel() prov["source_type"] = string(models.SourceTypeConsolidated) - now := time.Now().UTC() - prov["consolidated_at"] = now.Format(time.RFC3339) + prov["consolidated_at"] = time.Now().UTC().Format(time.RFC3339) prov["confidence"] = maxConf - // Merge source events with deduplication existingEvents, _ := prov["source_events"].([]interface{}) - seen := make(map[string]bool, len(existingEvents)) - for _, e := range existingEvents { - if str, ok := e.(string); ok { - seen[str] = true - } - } - for _, evtID := range merge.Memory.SourceEvents { - if !seen[evtID] { - existingEvents = append(existingEvents, evtID) - seen[evtID] = true - } - } - prov["source_events"] = existingEvents - existing.Metadata["provenance"] = prov + prov["source_events"] = mergeSourceEvents(existingEvents, merge.Memory.SourceEvents) + node.Metadata["provenance"] = prov - return s.UpdateNode(ctx, *existing) + return s.UpdateNode(ctx, node) } -// executeSupersede marks the old behavior as merged (soft-delete), creates a -// new node with combined lineage, and adds a supersedes edge. -// The new node is created first; the old node is only soft-deleted once the -// new node and edge are safely written (atomic w.r.t. partial failure). func (c *LLMConsolidator) executeSupersede(ctx context.Context, merge MergeProposal, s store.GraphStore, runID string) error { - existing, err := s.GetNode(ctx, merge.TargetID) + existing, err := getTargetNode(ctx, s, merge.TargetID) if err != nil { - return fmt.Errorf("fetching target node %s: %w", merge.TargetID, err) - } - if existing == nil { - return fmt.Errorf("target node not found: %s", merge.TargetID) + return err } - // Combine lineage: gather source events from old + new with deduplication - var combinedEvents []string - seen := make(map[string]bool) + var oldEvents []interface{} if oldProv, ok := existing.Metadata["provenance"].(map[string]interface{}); ok { - if oldEvents, ok := oldProv["source_events"].([]interface{}); ok { - for _, e := range oldEvents { - if str, ok := e.(string); ok && !seen[str] { - combinedEvents = append(combinedEvents, str) - seen[str] = true - } - } - } + oldEvents, _ = oldProv["source_events"].([]interface{}) } - for _, evtID := range merge.Memory.SourceEvents { - if !seen[evtID] { - combinedEvents = append(combinedEvents, evtID) - seen[evtID] = true + combinedIface := mergeSourceEvents(oldEvents, merge.Memory.SourceEvents) + combinedEvents := make([]string, 0, len(combinedIface)) + for _, e := range combinedIface { + if str, ok := e.(string); ok { + combinedEvents = append(combinedEvents, str) } } - // Create new node first (before any mutations to the old node) ts := time.Now().UnixNano() newID := fmt.Sprintf("supersede-%s-%d", merge.TargetID, ts) node := c.buildPromoteNode(merge.Memory, runID, ts, merge.MemoryIndex) - // Override provenance with combined lineage if node.Metadata == nil { node.Metadata = make(map[string]interface{}) } @@ -304,31 +248,24 @@ func (c *LLMConsolidator) executeSupersede(ctx context.Context, merge MergePropo return fmt.Errorf("creating superseding node: %w", err) } - // Add supersedes edge: new -> old edge := store.Edge{ - Source: newID, - Target: merge.TargetID, - Kind: EdgeKindSupersedes, - Weight: 1.0, - CreatedAt: time.Now(), + Source: newID, Target: merge.TargetID, + Kind: EdgeKindSupersedes, Weight: 1.0, CreatedAt: time.Now(), } if err := s.AddEdge(ctx, edge); err != nil { - // Clean up the orphaned new node before returning if rbErr := s.DeleteNode(ctx, newID); rbErr != nil { slog.Warn("supersede rollback: failed to delete orphaned node", "new_id", newID, "error", rbErr) } return fmt.Errorf("adding supersedes edge: %w", err) } - // Only soft-delete old node after new node + edge are safely written existing.Kind = store.NodeKindMerged if existing.Metadata == nil { existing.Metadata = make(map[string]interface{}) } existing.Metadata["merged_at"] = time.Now().UTC().Format(time.RFC3339) existing.Metadata["merged_reason"] = "superseded" - if err := s.UpdateNode(ctx, *existing); err != nil { - // Rollback: remove the edge and orphaned new node + if err := s.UpdateNode(ctx, existing); err != nil { if rbErr := s.RemoveEdge(ctx, newID, merge.TargetID, EdgeKindSupersedes); rbErr != nil { slog.Warn("supersede rollback: failed to remove edge", "new_id", newID, "target", merge.TargetID, "error", rbErr) } @@ -341,25 +278,16 @@ func (c *LLMConsolidator) executeSupersede(ctx context.Context, merge MergePropo return nil } -// executeSupplement keeps the existing behavior unchanged and creates a new node -// with a supplements edge pointing to the existing behavior. func (c *LLMConsolidator) executeSupplement(ctx context.Context, merge MergeProposal, s store.GraphStore, runID string) error { - // Verify target exists - existing, err := s.GetNode(ctx, merge.TargetID) - if err != nil { - return fmt.Errorf("fetching target node %s: %w", merge.TargetID, err) - } - if existing == nil { - return fmt.Errorf("target node not found: %s", merge.TargetID) + if _, err := getTargetNode(ctx, s, merge.TargetID); err != nil { + return err } - // Create supplementary node ts := time.Now().UnixNano() newID := fmt.Sprintf("supplement-%s-%d", merge.TargetID, ts) node := c.buildPromoteNode(merge.Memory, runID, ts, merge.MemoryIndex) node.ID = newID - // Add supplements provenance for self-describing nodes if node.Metadata == nil { node.Metadata = make(map[string]interface{}) } @@ -377,16 +305,11 @@ func (c *LLMConsolidator) executeSupplement(ctx context.Context, merge MergeProp return fmt.Errorf("creating supplement node: %w", err) } - // Add supplements edge: new detail -> existing behavior edge := store.Edge{ - Source: newID, - Target: merge.TargetID, - Kind: EdgeKindSupplements, - Weight: merge.Similarity, - CreatedAt: time.Now(), + Source: newID, Target: merge.TargetID, + Kind: EdgeKindSupplements, Weight: merge.Similarity, CreatedAt: time.Now(), } if err := s.AddEdge(ctx, edge); err != nil { - // Clean up the orphaned new node before returning if rbErr := s.DeleteNode(ctx, newID); rbErr != nil { slog.Warn("supplement rollback: failed to delete orphaned node", "new_id", newID, "error", rbErr) } @@ -396,8 +319,6 @@ func (c *LLMConsolidator) executeSupplement(ctx context.Context, merge MergeProp return nil } -// buildPromoteNode constructs a store.Node from a ClassifiedMemory with rich -// provenance including model, source events, confidence, and session context. func (c *LLMConsolidator) buildPromoteNode(mem ClassifiedMemory, runID string, baseTS int64, index int) store.Node { contentMap := map[string]interface{}{ "canonical": mem.Content.Canonical, @@ -412,7 +333,6 @@ func (c *LLMConsolidator) buildPromoteNode(mem ClassifiedMemory, runID string, b contentMap["workflow_data"] = mem.WorkflowData } - // Rich provenance prov := map[string]interface{}{ "source_type": string(models.SourceTypeConsolidated), "consolidated_by": c.normalizedModel(), @@ -422,7 +342,6 @@ func (c *LLMConsolidator) buildPromoteNode(mem ClassifiedMemory, runID string, b "consolidated_at": time.Now().UTC().Format(time.RFC3339), } - // Session context as provenance metadata if phase, ok := mem.SessionContext["session_phase"].(string); ok { prov["session_phase"] = phase } @@ -451,11 +370,7 @@ func (c *LLMConsolidator) buildPromoteNode(mem ClassifiedMemory, runID string, b } } -// persistRun writes a consolidation run record to the consolidation_runs table. -// It silently no-ops if the store does not support SQL (e.g., InMemoryGraphStore). -// Errors are logged but not fatal — run persistence is best-effort. func persistRun(ctx context.Context, s store.GraphStore, model string, rec ConsolidationRunRecord, runID string, mergeCount int) { - // Type-assert to get the underlying *sql.DB. type sqlDBProvider interface { DB() *sql.DB } @@ -478,24 +393,29 @@ func persistRun(ctx context.Context, s store.GraphStore, model string, rec Conso } } -// extractDuplicateNodeID parses the existing node ID from an ErrDuplicateContent -// error. The store returns errors in the format: -// -// "duplicate content: behavior has identical canonical content: ..." -// -// Returns the empty string if the ID cannot be extracted. -func extractDuplicateNodeID(err error) string { - msg := err.Error() - const prefix = "duplicate content: behavior " - const suffix = " has identical canonical content" - start := strings.Index(msg, prefix) - if start < 0 { - return "" - } - rest := msg[start+len(prefix):] - end := strings.Index(rest, suffix) - if end < 0 { - return "" - } - return rest[:end] +func getTargetNode(ctx context.Context, s store.GraphStore, targetID string) (store.Node, error) { + existing, err := s.GetNode(ctx, targetID) + if err != nil { + return store.Node{}, fmt.Errorf("fetching target node %s: %w", targetID, err) + } + if existing == nil { + return store.Node{}, fmt.Errorf("target node not found: %s", targetID) + } + return *existing, nil +} + +func mergeSourceEvents(existing []interface{}, incoming []string) []interface{} { + seen := make(map[string]bool, len(existing)) + for _, e := range existing { + if str, ok := e.(string); ok { + seen[str] = true + } + } + for _, evtID := range incoming { + if !seen[evtID] { + existing = append(existing, evtID) + seen[evtID] = true + } + } + return existing } diff --git a/internal/consolidation/promote_test.go b/internal/consolidation/promote_test.go index 9a3775e..0e97ae4 100644 --- a/internal/consolidation/promote_test.go +++ b/internal/consolidation/promote_test.go @@ -2,6 +2,7 @@ package consolidation import ( "context" + "fmt" "testing" "github.com/nvandessel/floop/internal/logging" @@ -9,6 +10,15 @@ import ( "github.com/nvandessel/floop/internal/store" ) +// errorGetNodeStore wraps InMemoryGraphStore but returns an error from GetNode. +type errorGetNodeStore struct { + store.InMemoryGraphStore +} + +func (s *errorGetNodeStore) GetNode(ctx context.Context, id string) (*store.Node, error) { + return nil, fmt.Errorf("simulated store error") +} + func testMemory(canonical string, kind models.BehaviorKind) ClassifiedMemory { return ClassifiedMemory{ Candidate: Candidate{ @@ -709,3 +719,94 @@ func TestLLMPromote_MergeMatchesByIndexNotText(t *testing.T) { t.Fatalf("expected 2 behavior nodes, got %d", len(nodes)) } } + +func TestGetTargetNode_NotFound(t *testing.T) { + ctx := context.Background() + s := store.NewInMemoryGraphStore() + _, err := getTargetNode(ctx, s, "nonexistent-id") + if err == nil { + t.Fatal("expected error for nonexistent node") + } +} + +func TestGetTargetNode_Found(t *testing.T) { + ctx := context.Background() + s := store.NewInMemoryGraphStore() + want := store.Node{ID: "bhv-found", Kind: store.NodeKindBehavior, Content: map[string]interface{}{"name": "test"}, Metadata: map[string]interface{}{"confidence": 0.9}} + if _, err := s.AddNode(ctx, want); err != nil { + t.Fatalf("AddNode: %v", err) + } + got, err := getTargetNode(ctx, s, "bhv-found") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.ID != want.ID { + t.Errorf("ID = %q, want %q", got.ID, want.ID) + } +} + +func TestGetTargetNode_StoreError(t *testing.T) { + ctx := context.Background() + s := &errorGetNodeStore{} + _, err := getTargetNode(ctx, s, "any-id") + if err == nil { + t.Fatal("expected error from store") + } + if got := err.Error(); got != "fetching target node any-id: simulated store error" { + t.Errorf("unexpected error message: %s", got) + } +} + +func TestMergeSourceEvents_Dedup(t *testing.T) { + result := mergeSourceEvents([]interface{}{"evt-1", "evt-2"}, []string{"evt-2", "evt-3", "evt-4"}) + if len(result) != 4 { + t.Fatalf("expected 4 events, got %d: %v", len(result), result) + } +} + +func TestMergeSourceEvents_EmptyExisting(t *testing.T) { + result := mergeSourceEvents(nil, []string{"evt-1", "evt-2"}) + if len(result) != 2 { + t.Fatalf("expected 2 events, got %d", len(result)) + } +} + +func TestMergeSourceEvents_NonStringExisting(t *testing.T) { + result := mergeSourceEvents([]interface{}{42, "evt-1"}, []string{"evt-1", "evt-2"}) + if len(result) != 3 { + t.Fatalf("expected 3 events, got %d: %v", len(result), result) + } +} + +func TestLLMPromote_UnknownMergeStrategy(t *testing.T) { + c := newTestPromoteConsolidator() + ctx := context.Background() + s := store.NewInMemoryGraphStore() + existing := store.Node{ID: "bhv-target", Kind: store.NodeKindBehavior, Content: map[string]interface{}{"name": "Target"}, Metadata: map[string]interface{}{"confidence": 0.7}} + if _, err := s.AddNode(ctx, existing); err != nil { + t.Fatalf("AddNode: %v", err) + } + mem := testMemory("Test memory", models.BehaviorKindDirective) + merges := []MergeProposal{{Memory: mem, MemoryIndex: 0, TargetID: "bhv-target", Similarity: 0.9, Strategy: "unknown-strategy"}} + result, err := c.Promote(ctx, []ClassifiedMemory{mem}, nil, merges, nil, s) + if err != nil { + t.Fatalf("Promote should not fail: %v", err) + } + if result.Promoted != 1 { + t.Errorf("expected promoted=1, got %d", result.Promoted) + } +} + +func TestLLMPromote_SkipsMemory(t *testing.T) { + c := newTestPromoteConsolidator() + ctx := context.Background() + s := store.NewInMemoryGraphStore() + mem := testMemory("Should be skipped", models.BehaviorKindDirective) + result, err := c.Promote(ctx, []ClassifiedMemory{mem}, nil, nil, []int{0}, s) + if err != nil { + t.Fatalf("Promote: %v", err) + } + if result.Promoted != 0 { + t.Errorf("expected promoted=0, got %d", result.Promoted) + } +} diff --git a/internal/store/memory.go b/internal/store/memory.go index 19dd3c9..00dd5fe 100644 --- a/internal/store/memory.go +++ b/internal/store/memory.go @@ -52,7 +52,7 @@ func (s *InMemoryGraphStore) AddNode(ctx context.Context, node Node) (string, er if canonical := canonicalContent(node); canonical != "" { for id, existing := range s.nodes { if id != node.ID && canonicalContent(existing) == canonical { - return "", fmt.Errorf("duplicate content: behavior %s has identical canonical content: %w", id, ErrDuplicateContent) + return "", &DuplicateContentError{ExistingID: id} } } } diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go index 499fc2e..9aa766b 100644 --- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -357,7 +357,7 @@ func (s *SQLiteGraphStore) addBehaviorWith(ctx context.Context, q dbQuerier, nod contentHash, node.ID).Scan(&existingID) if err == nil { // Found existing behavior with same content - return "", fmt.Errorf("duplicate content: behavior %s has identical canonical content: %w", existingID, ErrDuplicateContent) + return "", &DuplicateContentError{ExistingID: existingID} } else if err != sql.ErrNoRows { // Unexpected error return "", fmt.Errorf("check for duplicate content: %w", err) diff --git a/internal/store/store.go b/internal/store/store.go index e3aba6d..d6c3bba 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -5,6 +5,7 @@ package store import ( "context" "errors" + "fmt" "time" ) @@ -12,6 +13,21 @@ import ( // canonical content already exists in the store under a different ID. var ErrDuplicateContent = errors.New("duplicate content") +// DuplicateContentError is a structured error returned by AddNode when a node +// with identical canonical content already exists. It carries the ID of the +// existing node so callers can use errors.As instead of string-parsing. +type DuplicateContentError struct { + ExistingID string +} + +func (e *DuplicateContentError) Error() string { + return fmt.Sprintf("duplicate content: behavior %s has identical canonical content", e.ExistingID) +} + +func (e *DuplicateContentError) Is(target error) bool { + return target == ErrDuplicateContent +} + // Node represents a node in the behavior graph. type Node struct { ID string `json:"id"` diff --git a/internal/store/store_test.go b/internal/store/store_test.go new file mode 100644 index 0000000..3e70300 --- /dev/null +++ b/internal/store/store_test.go @@ -0,0 +1,32 @@ +package store + +import ( + "errors" + "testing" +) + +func TestDuplicateContentError_Error(t *testing.T) { + err := &DuplicateContentError{ExistingID: "bhv-123"} + want := "duplicate content: behavior bhv-123 has identical canonical content" + if got := err.Error(); got != want { + t.Errorf("Error() = %q, want %q", got, want) + } +} + +func TestDuplicateContentError_Is(t *testing.T) { + err := &DuplicateContentError{ExistingID: "bhv-456"} + if !errors.Is(err, ErrDuplicateContent) { + t.Error("expected errors.Is(err, ErrDuplicateContent) to be true") + } +} + +func TestDuplicateContentError_As(t *testing.T) { + var wrapped error = &DuplicateContentError{ExistingID: "bhv-789"} + var dupErr *DuplicateContentError + if !errors.As(wrapped, &dupErr) { + t.Fatal("expected errors.As to succeed") + } + if dupErr.ExistingID != "bhv-789" { + t.Errorf("ExistingID = %q, want %q", dupErr.ExistingID, "bhv-789") + } +}