From f7a843b45883fb1809f0df2811958363e3696a05 Mon Sep 17 00:00:00 2001 From: Dan Mills Date: Sun, 22 Feb 2026 21:25:58 +0100 Subject: [PATCH 1/5] feat(graph): enhance query_trace with done status and pyramid summaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add GetTraceSummariesAll to graph/compression.go — returns all available pyramid levels for a trace (no fallback, exact match per level) - Add LocalTraceInfo struct + GetTraceInfo callback to MCP tools deps - Wire GetTraceInfo in main.go via graphDB (read-only, no LLM needed) - query_trace now augments Engram response with local done status and all available pyramid levels when GetTraceInfo callback is configured - Also commit entities.go valid_at fix: AddEntityRelationWithSource now uses COALESCE subquery to populate valid_at from source episode timestamp Completes Phase 4 query_trace item from trace-completion-pyramid-design.md Co-Authored-By: Claude Sonnet 4.6 --- cmd/bud/main.go | 19 +++++++++++++++++++ internal/graph/compression.go | 26 ++++++++++++++++++++++++++ internal/graph/entities.go | 6 +++--- internal/mcp/tools/deps.go | 13 +++++++++++++ internal/mcp/tools/register.go | 22 ++++++++++++++++++++++ 5 files changed, 83 insertions(+), 3 deletions(-) diff --git a/cmd/bud/main.go b/cmd/bud/main.go index e1b1adb..8062d78 100644 --- a/cmd/bud/main.go +++ b/cmd/bud/main.go @@ -383,6 +383,25 @@ func main() { } return graphDB.MarkTraceDone(traceShortID, resolutionEpisodeShortID) }, + GetTraceInfo: func(traceShortID string) (*tools.LocalTraceInfo, error) { + if graphDB == nil { + return nil, fmt.Errorf("graph DB not available") + } + trace, err := graphDB.GetTraceByShortID(traceShortID) + if err != nil || trace == nil { + return nil, err + } + summaries, err := graphDB.GetTraceSummariesAll(trace.ID) + if err != nil { + summaries = nil + } + return &tools.LocalTraceInfo{ + Done: trace.Done, + Resolution: trace.Resolution, + DoneAt: trace.DoneAt, + PyramidSummaries: summaries, + }, nil + }, OnMCPToolCall: func(toolName string) { if exec != nil { exec.GetMCPToolCallback()(toolName) diff --git a/internal/graph/compression.go b/internal/graph/compression.go index 0cea2f1..36a3445 100644 --- a/internal/graph/compression.go +++ b/internal/graph/compression.go @@ -315,6 +315,32 @@ func (g *DB) AddTraceSummary(traceID string, level int, summary string, tokens i return err } +// GetTraceSummariesAll returns all available pyramid summaries for a trace as a map of +// compression_level → summary. Only levels that actually exist in the DB are included. +func (g *DB) GetTraceSummariesAll(traceID string) (map[int]string, error) { + rows, err := g.db.Query(` + SELECT compression_level, summary + FROM trace_summaries + WHERE trace_id = ? + ORDER BY compression_level DESC + `, traceID) + if err != nil { + return nil, err + } + defer rows.Close() + + result := make(map[int]string) + for rows.Next() { + var level int + var summary string + if err := rows.Scan(&level, &summary); err != nil { + return nil, err + } + result[level] = summary + } + return result, rows.Err() +} + // DeleteAllTraceSummaries removes all trace summaries from the database func (g *DB) DeleteAllTraceSummaries() error { _, err := g.db.Exec(`DELETE FROM trace_summaries`) diff --git a/internal/graph/entities.go b/internal/graph/entities.go index 63050b5..fd8d335 100644 --- a/internal/graph/entities.go +++ b/internal/graph/entities.go @@ -249,9 +249,9 @@ func (g *DB) AddEntityRelationWithSource(fromID, toID string, relType EdgeType, `, fromID, toID, relType, weight) } else { result, err = g.db.Exec(` - INSERT INTO entity_relations (from_id, to_id, relation_type, weight, source_episode_id) - VALUES (?, ?, ?, ?, ?) - `, fromID, toID, relType, weight, sourceEpisodeID) + INSERT INTO entity_relations (from_id, to_id, relation_type, weight, source_episode_id, valid_at) + VALUES (?, ?, ?, ?, ?, COALESCE((SELECT timestamp_event FROM episodes WHERE id = ?), CURRENT_TIMESTAMP)) + `, fromID, toID, relType, weight, sourceEpisodeID, sourceEpisodeID) } if err != nil { diff --git a/internal/mcp/tools/deps.go b/internal/mcp/tools/deps.go index 4d1793a..c5f8b97 100644 --- a/internal/mcp/tools/deps.go +++ b/internal/mcp/tools/deps.go @@ -2,6 +2,8 @@ package tools import ( + "time" + "github.com/vthunder/bud2/internal/activity" "github.com/vthunder/bud2/internal/engram" "github.com/vthunder/bud2/internal/eval" @@ -12,6 +14,15 @@ import ( "github.com/vthunder/bud2/internal/state" ) +// LocalTraceInfo holds done-status and pyramid summaries from the local graph DB. +// Returned by the GetTraceInfo callback in Dependencies. +type LocalTraceInfo struct { + Done bool `json:"done"` + Resolution string `json:"resolution,omitempty"` + DoneAt time.Time `json:"done_at,omitempty"` + PyramidSummaries map[int]string `json:"pyramid_summaries,omitempty"` +} + // Dependencies holds all services that MCP tools may need. // Optional fields may be nil. type Dependencies struct { @@ -42,6 +53,8 @@ type Dependencies struct { AddThought func(content string) error // If set, save_thought(completes=[...]) will use this to mark traces as done MarkTraceDone func(traceShortID, resolutionEpisodeShortID string) error + // If set, query_trace will augment Engram data with local done status + pyramid summaries + GetTraceInfo func(traceShortID string) (*LocalTraceInfo, error) // If set, signal_done will use this to send completion signals SendSignal func(signalType, content string, extra map[string]any) error // If set, MCP tools will call this to notify that they've been executed diff --git a/internal/mcp/tools/register.go b/internal/mcp/tools/register.go index 47532c8..2d71b1a 100644 --- a/internal/mcp/tools/register.go +++ b/internal/mcp/tools/register.go @@ -312,6 +312,28 @@ func registerMemoryTools(server *mcp.Server, deps *Dependencies) { return "", err } + // Augment with local done status + pyramid summaries if available + if deps.GetTraceInfo != nil { + info, infoErr := deps.GetTraceInfo(traceID) + if infoErr == nil && info != nil { + result := map[string]any{ + "trace": trace, + } + if info.Done { + result["done_status"] = map[string]any{ + "is_done": true, + "resolved_by": info.Resolution, + "resolved_at": info.DoneAt, + } + } + if len(info.PyramidSummaries) > 0 { + result["pyramid_summaries"] = info.PyramidSummaries + } + data, _ := json.MarshalIndent(result, "", " ") + return string(data), nil + } + } + data, _ := json.MarshalIndent(trace, "", " ") return string(data), nil }) From 90cbb15c01d43f0ffbfc7469bec1bad8127158b7 Mon Sep 17 00:00:00 2001 From: Dan Mills Date: Sun, 22 Feb 2026 21:54:56 +0100 Subject: [PATCH 2/5] feat(graph,consolidate): implement trace conflict detection (Step 1-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Claude inference detects "contradicts" relationships between episodes, those edges were previously fed into the clustering algorithm and caused contradicting episodes to merge into a single trace — silently discarding the conflict. This commit implements the conflict resolution pipeline: - Migration v22: adds has_conflict, conflict_with, conflict_with_at columns to traces table with idx_traces_has_conflict index - Trace struct: adds HasConflict, ConflictWith, ConflictResolvedAt fields (populated by all standard scanTrace/scanTraceRows queries) - graph.MarkTraceConflict: marks a trace as conflicting with another by short_id, builds CSV of conflict_with, idempotent re-marks - graph.GetTraceShortID: lightweight short_id lookup by trace ID - consolidate.Run: separates "contradicts" edges from regular edges before Phase 2 clustering — contradiction edges are still stored to DB for audit but excluded from the adjacency list, so contradicting episodes form separate traces - consolidate.markContradictionConflicts: after trace creation, looks up which traces the contradicting episodes landed in and marks both with has_conflict + conflict_with, deduplicating trace pairs - Tests: TestMarkTraceConflict covers marking, idempotency, and GetTraceShortID Phase 4 (retrieval: surface conflicting traces together in context) is a follow-up step. Co-Authored-By: Claude Sonnet 4.6 --- internal/consolidate/consolidate.go | 98 +++++++++++++++++++++++++++-- internal/graph/db.go | 21 +++++++ internal/graph/graph_test.go | 75 ++++++++++++++++++++++ internal/graph/traces.go | 83 +++++++++++++++++++++--- internal/graph/types.go | 5 ++ 5 files changed, 271 insertions(+), 11 deletions(-) diff --git a/internal/consolidate/consolidate.go b/internal/consolidate/consolidate.go index 5a6d9c9..5142c6f 100644 --- a/internal/consolidate/consolidate.go +++ b/internal/consolidate/consolidate.go @@ -131,7 +131,24 @@ func (c *Consolidator) Run() (int, error) { c.printEdgeSummaries(episodes, episodeEdges) } - // Store edges in database (only if both episodes exist) + // Separate contradiction edges from regular edges before clustering. + // Contradiction edges are stored to the DB for audit purposes but excluded from the + // clustering adjacency list so contradicting episodes form separate traces rather + // than being merged into a single trace (which would silently discard the conflict). + var contradictionEdges []EpisodeEdge + var regularEdges []EpisodeEdge + for _, edge := range episodeEdges { + if edge.Relationship == "contradicts" { + contradictionEdges = append(contradictionEdges, edge) + } else { + regularEdges = append(regularEdges, edge) + } + } + if len(contradictionEdges) > 0 { + log.Printf("[consolidate] Segregating %d contradiction edges from clustering", len(contradictionEdges)) + } + + // Store ALL edges in database (including contradictions, for audit/debugging) episodeIDs := make(map[string]bool) for _, ep := range episodes { episodeIDs[ep.ID] = true @@ -146,9 +163,9 @@ func (c *Consolidator) Run() (int, error) { } } - // Phase 2: Graph clustering using Claude-inferred edges - // Returns: new groups (to be consolidated) and existing traces with new episodes - newGroups, existingTracesWithNewEpisodes := c.clusterEpisodesByEdges(episodes, episodeEdges) + // Phase 2: Graph clustering using only regular (non-contradiction) edges. + // Contradiction edges are excluded so contradicting episodes land in separate traces. + newGroups, existingTracesWithNewEpisodes := c.clusterEpisodesByEdges(episodes, regularEdges) // Phase 3a: Add new episodes to existing traces and mark for reconsolidation for traceID, newEpisodes := range existingTracesWithNewEpisodes { @@ -184,6 +201,17 @@ func (c *Consolidator) Run() (int, error) { log.Printf("[consolidate] Created %d episode→trace cross-reference edges", linked) } + // Phase 3d: Mark conflicting traces from contradiction edges. + // Now that episodes have been assigned to traces, we can identify which traces + // contain contradicting information and flag them for executive review. + if len(contradictionEdges) > 0 { + conflictPairs := c.markContradictionConflicts(contradictionEdges) + if conflictPairs > 0 { + log.Printf("[consolidate] Marked %d conflicting trace pairs from %d contradiction edges", + conflictPairs, len(contradictionEdges)) + } + } + // Phase 4: Batch reconsolidation of traces with new episodes tracesNeedingRecon, err := c.graph.GetTracesNeedingReconsolidation() if err != nil { @@ -318,6 +346,68 @@ func (c *Consolidator) clusterEpisodesByEdges(episodes []*graph.Episode, edges [ return newGroups, existingTracesWithNewEpisodes } +// markContradictionConflicts identifies trace pairs connected by "contradicts" edges and +// marks both traces with has_conflict=true and the other's short_id in conflict_with. +// Returns the number of distinct trace pairs marked. +func (c *Consolidator) markContradictionConflicts(contradictionEdges []EpisodeEdge) int { + type tracePair struct{ a, b string } + seen := make(map[tracePair]bool) + marked := 0 + + for _, edge := range contradictionEdges { + // Find which trace each episode belongs to + tracesA, err := c.graph.GetEpisodeTraces(edge.FromID) + if err != nil || len(tracesA) == 0 { + continue + } + tracesB, err := c.graph.GetEpisodeTraces(edge.ToID) + if err != nil || len(tracesB) == 0 { + continue + } + + traceA := tracesA[0] + traceB := tracesB[0] + + // Skip if same trace or ephemeral + if traceA == traceB || traceA == "_ephemeral" || traceB == "_ephemeral" { + continue + } + + // Skip duplicate pairs + pair := tracePair{traceA, traceB} + if traceA > traceB { + pair = tracePair{traceB, traceA} + } + if seen[pair] { + continue + } + seen[pair] = true + + // Get short IDs for human-readable conflict_with field + shortA, err := c.graph.GetTraceShortID(traceA) + if err != nil || shortA == "" { + continue + } + shortB, err := c.graph.GetTraceShortID(traceB) + if err != nil || shortB == "" { + continue + } + + // Mark both traces as conflicting with each other + if err := c.graph.MarkTraceConflict(traceA, shortB); err != nil { + log.Printf("[consolidate] Failed to mark conflict on trace %s: %v", shortA, err) + continue + } + if err := c.graph.MarkTraceConflict(traceB, shortA); err != nil { + log.Printf("[consolidate] Failed to mark conflict on trace %s: %v", shortB, err) + continue + } + marked++ + } + + return marked +} + // reconsolidateTrace regenerates a trace's summary and metadata after new episodes are added func (c *Consolidator) reconsolidateTrace(traceID string) error { // Get all source episodes for this trace diff --git a/internal/graph/db.go b/internal/graph/db.go index cff0811..15edbc6 100644 --- a/internal/graph/db.go +++ b/internal/graph/db.go @@ -799,6 +799,27 @@ func (g *DB) runMigrations() error { log.Println("[graph] Migration to v21 completed successfully") } + // Migration v22: Add conflict tracking fields to traces. + // Enables detection and flagging of contradicting trace pairs found during consolidation. + // has_conflict: true when another trace contains contradicting information. + // conflict_with: CSV of conflicting trace short_ids. + // conflict_resolved_at: when the conflict was resolved (NULL = unresolved). + if version < 22 { + log.Println("[graph] Migrating to schema v22: trace conflict tracking") + migrations := []string{ + "ALTER TABLE traces ADD COLUMN has_conflict BOOLEAN DEFAULT 0", + "ALTER TABLE traces ADD COLUMN conflict_with TEXT", + "ALTER TABLE traces ADD COLUMN conflict_resolved_at DATETIME", + "CREATE INDEX IF NOT EXISTS idx_traces_has_conflict ON traces(has_conflict)", + } + for _, sql := range migrations { + // Ignore errors for columns that already exist + g.db.Exec(sql) + } + g.db.Exec("INSERT INTO schema_version (version) VALUES (22)") + log.Println("[graph] Migration to v22 completed successfully") + } + return nil } diff --git a/internal/graph/graph_test.go b/internal/graph/graph_test.go index d7974ff..88e1829 100644 --- a/internal/graph/graph_test.go +++ b/internal/graph/graph_test.go @@ -1095,3 +1095,78 @@ func TestMarkTraceDone(t *testing.T) { t.Error("Expected error for non-existent trace, got nil") } } + +// TestMarkTraceConflict tests the conflict tracking feature +func TestMarkTraceConflict(t *testing.T) { + db, cleanup := setupTestDB(t) + defer cleanup() + + // Add two traces that will be marked as conflicting + trA := &Trace{ID: "trace-prefer-dark", ShortID: "tr_drk", Summary: "User prefers dark mode", Activation: 0.7} + trB := &Trace{ID: "trace-prefer-light", ShortID: "tr_lgt", Summary: "User switched to light mode", Activation: 0.7} + + if err := addTestTrace(t, db, trA); err != nil { + t.Fatalf("AddTrace(A) failed: %v", err) + } + if err := addTestTrace(t, db, trB); err != nil { + t.Fatalf("AddTrace(B) failed: %v", err) + } + + // Before marking: no conflicts + trAFetched, err := db.GetTraceByShortID("tr_drk") + if err != nil || trAFetched == nil { + t.Fatalf("GetTraceByShortID(tr_drk) failed: %v", err) + } + if trAFetched.HasConflict { + t.Error("Expected HasConflict=false before marking") + } + + // Mark the conflict (caller must mark both directions) + if err := db.MarkTraceConflict("trace-prefer-dark", "tr_lgt"); err != nil { + t.Fatalf("MarkTraceConflict(A) failed: %v", err) + } + if err := db.MarkTraceConflict("trace-prefer-light", "tr_drk"); err != nil { + t.Fatalf("MarkTraceConflict(B) failed: %v", err) + } + + // After marking: both traces show conflict info + trAFetched, err = db.GetTraceByShortID("tr_drk") + if err != nil || trAFetched == nil { + t.Fatalf("GetTraceByShortID(tr_drk) after conflict failed: %v", err) + } + if !trAFetched.HasConflict { + t.Error("Expected HasConflict=true on trace A") + } + if trAFetched.ConflictWith != "tr_lgt" { + t.Errorf("Expected ConflictWith='tr_lgt', got %q", trAFetched.ConflictWith) + } + + trBFetched, err := db.GetTraceByShortID("tr_lgt") + if err != nil || trBFetched == nil { + t.Fatalf("GetTraceByShortID(tr_lgt) after conflict failed: %v", err) + } + if !trBFetched.HasConflict { + t.Error("Expected HasConflict=true on trace B") + } + if trBFetched.ConflictWith != "tr_drk" { + t.Errorf("Expected ConflictWith='tr_drk', got %q", trBFetched.ConflictWith) + } + + // Marking the same conflict again is idempotent + if err := db.MarkTraceConflict("trace-prefer-dark", "tr_lgt"); err != nil { + t.Fatalf("Second MarkTraceConflict should be idempotent, got: %v", err) + } + trAFetched2, _ := db.GetTraceByShortID("tr_drk") + if trAFetched2.ConflictWith != "tr_lgt" { + t.Errorf("Idempotent re-mark should not duplicate entry, got %q", trAFetched2.ConflictWith) + } + + // GetTraceShortID returns the correct short_id + shortID, err := db.GetTraceShortID("trace-prefer-dark") + if err != nil { + t.Fatalf("GetTraceShortID failed: %v", err) + } + if shortID != "tr_drk" { + t.Errorf("Expected short_id 'tr_drk', got %q", shortID) + } +} diff --git a/internal/graph/traces.go b/internal/graph/traces.go index 62a9928..d5ceea0 100644 --- a/internal/graph/traces.go +++ b/internal/graph/traces.go @@ -105,7 +105,8 @@ func (g *DB) GetTrace(id string) (*Trace, error) { ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t WHERE t.id = ? `, id) @@ -129,7 +130,8 @@ func (g *DB) GetTraceByShortID(shortID string) (*Trace, error) { ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t WHERE t.short_id = ? `, shortID) @@ -160,7 +162,8 @@ func (g *DB) GetActivatedTraces(threshold float64, limit int) ([]*Trace, error) ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t WHERE t.activation >= ? AND COALESCE(t.done, 0) = 0 ORDER BY t.activation DESC @@ -210,7 +213,8 @@ func (g *DB) GetTracesBatch(ids []string) (map[string]*Trace, error) { ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t WHERE t.id IN (`+string(placeholders)+`) `, args...) @@ -264,7 +268,8 @@ func (g *DB) GetTracesBatchAtLevel(ids []string, level int) (map[string]*Trace, ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t WHERE t.id IN (`+string(placeholders)+`) `, args...) @@ -299,7 +304,8 @@ func (g *DB) GetActivatedTracesWithLevel(threshold float64, limit, level int) ([ ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t WHERE t.activation >= ? AND COALESCE(t.done, 0) = 0 ORDER BY t.activation DESC @@ -795,7 +801,8 @@ func (g *DB) GetAllTraces() ([]*Trace, error) { ) as summary, t.topic, t.trace_type, t.activation, t.strength, t.embedding, t.created_at, t.last_accessed, t.labile_until, - COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at + COALESCE(t.done, 0), COALESCE(t.resolution, ''), t.done_at, + COALESCE(t.has_conflict, 0), COALESCE(t.conflict_with, ''), t.conflict_resolved_at FROM traces t ORDER BY t.created_at DESC `) @@ -850,11 +857,15 @@ func scanTrace(row *sql.Row) (*Trace, error) { var done sql.NullBool var resolution sql.NullString var doneAt sql.NullTime + var hasConflict sql.NullBool + var conflictWith sql.NullString + var conflictResolvedAt sql.NullTime err := row.Scan( &tr.ID, &tr.ShortID, &summary, &topic, &traceType, &tr.Activation, &tr.Strength, &embeddingBytes, &tr.CreatedAt, &tr.LastAccessed, &labileUntil, &done, &resolution, &doneAt, + &hasConflict, &conflictWith, &conflictResolvedAt, ) if err != nil { if err == sql.ErrNoRows { @@ -881,6 +892,15 @@ func scanTrace(row *sql.Row) (*Trace, error) { if doneAt.Valid { tr.DoneAt = doneAt.Time } + if hasConflict.Valid { + tr.HasConflict = hasConflict.Bool + } + if conflictWith.Valid { + tr.ConflictWith = conflictWith.String + } + if conflictResolvedAt.Valid { + tr.ConflictResolvedAt = conflictResolvedAt.Time + } if len(embeddingBytes) > 0 { json.Unmarshal(embeddingBytes, &tr.Embedding) @@ -902,11 +922,15 @@ func scanTraceRows(rows *sql.Rows) ([]*Trace, error) { var done sql.NullBool var resolution sql.NullString var doneAt sql.NullTime + var hasConflict sql.NullBool + var conflictWith sql.NullString + var conflictResolvedAt sql.NullTime err := rows.Scan( &tr.ID, &tr.ShortID, &summary, &topic, &traceType, &tr.Activation, &tr.Strength, &embeddingBytes, &tr.CreatedAt, &tr.LastAccessed, &labileUntil, &done, &resolution, &doneAt, + &hasConflict, &conflictWith, &conflictResolvedAt, ) if err != nil { continue @@ -930,6 +954,15 @@ func scanTraceRows(rows *sql.Rows) ([]*Trace, error) { if doneAt.Valid { tr.DoneAt = doneAt.Time } + if hasConflict.Valid { + tr.HasConflict = hasConflict.Bool + } + if conflictWith.Valid { + tr.ConflictWith = conflictWith.String + } + if conflictResolvedAt.Valid { + tr.ConflictResolvedAt = conflictResolvedAt.Time + } if len(embeddingBytes) > 0 { json.Unmarshal(embeddingBytes, &tr.Embedding) @@ -1018,6 +1051,42 @@ func (g *DB) MarkTraceDone(traceShortID, resolutionEpisodeShortID string) error return nil } +// GetTraceShortID returns the short_id for a trace by its full ID (lightweight query). +func (g *DB) GetTraceShortID(traceID string) (string, error) { + var shortID string + err := g.db.QueryRow(`SELECT short_id FROM traces WHERE id = ?`, traceID).Scan(&shortID) + return shortID, err +} + +// MarkTraceConflict marks a trace as having a contradiction with another trace. +// conflictWithShortID is appended to the conflict_with CSV if not already present. +// Both traces in a contradicting pair should be marked (caller's responsibility). +func (g *DB) MarkTraceConflict(traceID, conflictWithShortID string) error { + // Read current conflict_with to build the updated CSV + var current sql.NullString + g.db.QueryRow(`SELECT conflict_with FROM traces WHERE id = ?`, traceID).Scan(¤t) + + // Check if conflictWithShortID is already in the list + existing := current.String + if existing != "" { + for _, entry := range strings.Split(existing, ",") { + if strings.TrimSpace(entry) == conflictWithShortID { + return nil // already marked, nothing to do + } + } + } + + newConflictWith := conflictWithShortID + if existing != "" { + newConflictWith = existing + "," + conflictWithShortID + } + + _, err := g.db.Exec(` + UPDATE traces SET has_conflict = 1, conflict_with = ? WHERE id = ? + `, newConflictWith, traceID) + return err +} + // UpdateTrace updates a trace's summary, embedding, type, and strength after reconsolidation func (g *DB) UpdateTrace(traceID, summary string, embedding []float64, traceType TraceType, strength int) error { embeddingJSON, err := json.Marshal(embedding) diff --git a/internal/graph/types.go b/internal/graph/types.go index d30fdf6..a1a16ec 100644 --- a/internal/graph/types.go +++ b/internal/graph/types.go @@ -146,6 +146,11 @@ type Trace struct { Resolution string `json:"resolution,omitempty"` // Episode short_id that resolved this trace DoneAt time.Time `json:"done_at,omitempty"` + // Conflict tracking (set when contradicting traces are detected during consolidation) + HasConflict bool `json:"has_conflict,omitempty"` + ConflictWith string `json:"conflict_with,omitempty"` // CSV of conflicting trace short_ids + ConflictResolvedAt time.Time `json:"conflict_resolved_at,omitempty"` + // Related data (populated on retrieval) SourceIDs []string `json:"source_ids,omitempty"` EntityIDs []string `json:"entity_ids,omitempty"` From 071fc1213e1b643fe5c659f55c6f316576130efb Mon Sep 17 00:00:00 2001 From: Dan Mills Date: Sun, 22 Feb 2026 22:21:02 +0100 Subject: [PATCH 3/5] feat(graph): inject conflict partners into retrieval results (Step 4) When a conflicted trace is retrieved, its conflict partner(s) are now automatically included in the RetrievalResult at the same activation level. This ensures the executive sees both sides of a contradiction in context rather than silently surfacing only one claim. - Added injectConflictPartners() helper in activation.go - Called from both Retrieve() and RetrieveWithContext() after assembly, before the final activation-based sort - TestInjectConflictPartners: verifies injection, activation parity, and idempotency (no duplicate injection on second call) Step 4 of conflict-resolution-design.md. Steps 1-3 were in 90cbb15. Co-Authored-By: Claude Sonnet 4.6 --- internal/graph/activation.go | 46 ++++++++++++++++++++++++-- internal/graph/graph_test.go | 62 ++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/internal/graph/activation.go b/internal/graph/activation.go index b9278c9..df956fc 100644 --- a/internal/graph/activation.go +++ b/internal/graph/activation.go @@ -702,7 +702,11 @@ func (g *DB) Retrieve(queryEmb []float64, queryText string, limit int) (*Retriev result.Traces = append(result.Traces, trace) } - // Re-sort after applying operational bias (may reorder results) + // Inject conflict partners: if a conflicted trace was retrieved, also bring in its counterpart + // so the executive can see both sides of the contradiction in the same context. + g.injectConflictPartners(result) + + // Re-sort after applying operational bias and conflict injection (may reorder results) sort.Slice(result.Traces, func(i, j int) bool { return result.Traces[i].Activation > result.Traces[j].Activation }) @@ -891,7 +895,10 @@ func (g *DB) RetrieveWithContext(queryEmb []float64, queryText string, contextTr result.Traces = append(result.Traces, trace) } - // Re-sort after applying operational bias (may reorder results) + // Inject conflict partners: if a conflicted trace was retrieved, also bring in its counterpart. + g.injectConflictPartners(result) + + // Re-sort after applying operational bias and conflict injection (may reorder results) sort.Slice(result.Traces, func(i, j int) bool { return result.Traces[i].Activation > result.Traces[j].Activation }) @@ -899,6 +906,41 @@ func (g *DB) RetrieveWithContext(queryEmb []float64, queryText string, contextTr return result, nil } +// injectConflictPartners ensures that for any retrieved trace with has_conflict=true, +// its conflict partner(s) are also included in the result. If a conflicted trace surfaces +// during retrieval, the counterpart (which contains the contradicting claim) is fetched and +// added at the same activation level so both sides appear together in context. +func (g *DB) injectConflictPartners(result *RetrievalResult) { + retrievedIDs := make(map[string]bool, len(result.Traces)) + for _, tr := range result.Traces { + retrievedIDs[tr.ID] = true + } + + var toAdd []*Trace + for _, tr := range result.Traces { + if !tr.HasConflict || tr.ConflictWith == "" { + continue + } + for _, shortID := range strings.Split(tr.ConflictWith, ",") { + shortID = strings.TrimSpace(shortID) + if shortID == "" { + continue + } + partner, err := g.GetTraceByShortID(shortID) + if err != nil || partner == nil { + continue + } + if retrievedIDs[partner.ID] { + continue + } + retrievedIDs[partner.ID] = true + partner.Activation = tr.Activation + toAdd = append(toAdd, partner) + } + } + result.Traces = append(result.Traces, toAdd...) +} + // applyLateralInhibition applies Synapse-style lateral inhibition // Top M winners suppress competitors: û_i = max(0, u_i - β * Σ(u_k - u_i) for u_k > u_i) func applyLateralInhibition(activation map[string]float64) map[string]float64 { diff --git a/internal/graph/graph_test.go b/internal/graph/graph_test.go index 88e1829..426975f 100644 --- a/internal/graph/graph_test.go +++ b/internal/graph/graph_test.go @@ -1170,3 +1170,65 @@ func TestMarkTraceConflict(t *testing.T) { t.Errorf("Expected short_id 'tr_drk', got %q", shortID) } } + +// TestInjectConflictPartners verifies that retrieval results are augmented with +// conflict partner traces when a conflicted trace is retrieved. +func TestInjectConflictPartners(t *testing.T) { + db, cleanup := setupTestDB(t) + defer cleanup() + + // Add two conflicting traces + trA := &Trace{ID: "trace-dark-mode", ShortID: "tr_dA", Summary: "User prefers dark mode", Activation: 0.8} + trB := &Trace{ID: "trace-light-mode", ShortID: "tr_lB", Summary: "User switched to light mode", Activation: 0.6} + + if err := addTestTrace(t, db, trA); err != nil { + t.Fatalf("AddTrace(A): %v", err) + } + if err := addTestTrace(t, db, trB); err != nil { + t.Fatalf("AddTrace(B): %v", err) + } + if err := db.MarkTraceConflict("trace-dark-mode", "tr_lB"); err != nil { + t.Fatalf("MarkTraceConflict(A): %v", err) + } + if err := db.MarkTraceConflict("trace-light-mode", "tr_dA"); err != nil { + t.Fatalf("MarkTraceConflict(B): %v", err) + } + + // Fetch A to confirm it has has_conflict set + fetchedA, err := db.GetTrace("trace-dark-mode") + if err != nil || fetchedA == nil { + t.Fatalf("GetTrace(A): %v", err) + } + fetchedA.Activation = 0.75 + + // Simulate a retrieval result that only contains trace A + result := &RetrievalResult{Traces: []*Trace{fetchedA}} + + // injectConflictPartners should add trace B + db.injectConflictPartners(result) + + if len(result.Traces) != 2 { + t.Fatalf("Expected 2 traces after injection, got %d", len(result.Traces)) + } + + // Find the injected partner + var partner *Trace + for _, tr := range result.Traces { + if tr.ID == "trace-light-mode" { + partner = tr + } + } + if partner == nil { + t.Fatal("Expected trace-light-mode to be injected as conflict partner") + } + // Partner should have the same activation as the conflicting trace + if partner.Activation != fetchedA.Activation { + t.Errorf("Expected partner activation=%.2f, got %.2f", fetchedA.Activation, partner.Activation) + } + + // Already-present partners should not be duplicated + db.injectConflictPartners(result) + if len(result.Traces) != 2 { + t.Errorf("Second inject should not add duplicates, got %d traces", len(result.Traces)) + } +} From 0ec307c2a3536b86666b06f61221e65132f930f8 Mon Sep 17 00:00:00 2001 From: Dan Mills Date: Sun, 22 Feb 2026 22:54:31 +0100 Subject: [PATCH 4/5] Step 5: format conflicted trace pairs in context assembly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a conflicted trace is retrieved, context now shows both sides of the contradiction grouped together with a [CONFLICT] label so the executive can notice and ask for clarification. - focus.MemorySummary: add ShortID, HasConflict, ConflictWith fields - engram.Trace (HTTP client): add ShortID, HasConflict, ConflictWith for JSON deserialization - buildPrompt(): conflict-aware memory formatting — groups pairs, marks the later trace as "contradicts above", falls back to plain format if partner is absent from result set - buildprompt_test.go: 3 tests (conflict pair, normal trace, orphaned conflict) All 21 packages pass. Co-Authored-By: Claude Sonnet 4.6 --- internal/engram/client.go | 4 + internal/executive/buildprompt_test.go | 141 +++++++++++++++++++++++++ internal/executive/executive_v2.go | 64 ++++++++--- internal/focus/types.go | 11 +- 4 files changed, 204 insertions(+), 16 deletions(-) create mode 100644 internal/executive/buildprompt_test.go diff --git a/internal/engram/client.go b/internal/engram/client.go index 4054bc7..f83d306 100644 --- a/internal/engram/client.go +++ b/internal/engram/client.go @@ -68,6 +68,7 @@ type Entity struct { // JSON field names match Engram's "engram" type. type Trace struct { ID string `json:"id"` + ShortID string `json:"short_id,omitempty"` Summary string `json:"summary"` Topic string `json:"topic,omitempty"` TraceType string `json:"engram_type,omitempty"` @@ -79,6 +80,9 @@ type Trace struct { LabileUntil time.Time `json:"labile_until,omitempty"` SourceIDs []string `json:"source_ids,omitempty"` EntityIDs []string `json:"entity_ids,omitempty"` + // Conflict tracking (populated when contradicting traces are detected during consolidation) + HasConflict bool `json:"has_conflict,omitempty"` + ConflictWith string `json:"conflict_with,omitempty"` // CSV of conflicting trace short_ids } // TraceContext holds a trace with its source episodes and linked entities. diff --git a/internal/executive/buildprompt_test.go b/internal/executive/buildprompt_test.go new file mode 100644 index 0000000..b81599f --- /dev/null +++ b/internal/executive/buildprompt_test.go @@ -0,0 +1,141 @@ +package executive + +import ( + "strings" + "testing" + "time" + + "github.com/vthunder/bud2/internal/focus" +) + +// newTestExecutive creates a minimal ExecutiveV2 for prompt-building tests. +// memory and reflexLog are nil; statePath is a temp dir. +func newTestExecutive(t *testing.T) *ExecutiveV2 { + t.Helper() + statePath := t.TempDir() + return NewExecutiveV2(nil, nil, statePath, ExecutiveV2Config{}) +} + +// TestBuildPrompt_ConflictFormatting verifies that conflicting trace pairs are +// grouped together with a "[CONFLICT]" label and the "contradicts above" annotation. +func TestBuildPrompt_ConflictFormatting(t *testing.T) { + exec := newTestExecutive(t) + + ts := time.Date(2026, 1, 15, 0, 0, 0, 0, time.UTC) + ts2 := time.Date(2026, 1, 16, 0, 0, 0, 0, time.UTC) + + // Trace A has short_id "abc12", says "prefers dark mode", and conflicts with "def34" + // Trace B has short_id "def34", says "switched to light mode", and conflicts with "abc12" + bundle := &focus.ContextBundle{ + Memories: []focus.MemorySummary{ + { + ID: "trace-a-full-id", + ShortID: "abc12", + Summary: "User prefers dark mode", + Relevance: 0.9, + Timestamp: ts, + HasConflict: true, + ConflictWith: "def34", + }, + { + ID: "trace-b-full-id", + ShortID: "def34", + Summary: "User switched to light mode", + Relevance: 0.8, + Timestamp: ts2, + HasConflict: true, + ConflictWith: "abc12", + }, + }, + } + + out := exec.buildPrompt(bundle) + + // Should contain CONFLICT label + if !strings.Contains(out, "[CONFLICT]") { + t.Errorf("expected [CONFLICT] label in output, got:\n%s", out) + } + // Should contain both summaries + if !strings.Contains(out, "User prefers dark mode") { + t.Errorf("expected trace A summary in output, got:\n%s", out) + } + if !strings.Contains(out, "User switched to light mode") { + t.Errorf("expected trace B summary in output, got:\n%s", out) + } + // Should contain "contradicts above" annotation + if !strings.Contains(out, "contradicts above") { + t.Errorf("expected 'contradicts above' annotation, got:\n%s", out) + } + // Should NOT format either trace as a plain memory line (no double-listing) + // Count occurrences of the summaries - each should appear exactly once + countA := strings.Count(out, "User prefers dark mode") + countB := strings.Count(out, "User switched to light mode") + if countA != 1 { + t.Errorf("trace A summary should appear exactly once, got %d times", countA) + } + if countB != 1 { + t.Errorf("trace B summary should appear exactly once, got %d times", countB) + } +} + +// TestBuildPrompt_NonConflictFormatting verifies that normal (non-conflicted) memories +// are still formatted in the standard "[displayID] [timeStr] summary" style. +func TestBuildPrompt_NonConflictFormatting(t *testing.T) { + exec := newTestExecutive(t) + + ts := time.Date(2026, 1, 15, 0, 0, 0, 0, time.UTC) + + bundle := &focus.ContextBundle{ + Memories: []focus.MemorySummary{ + { + ID: "trace-normal-id", + ShortID: "aa111", + Summary: "User prefers vim keybindings", + Relevance: 0.7, + Timestamp: ts, + }, + }, + } + + out := exec.buildPrompt(bundle) + + if strings.Contains(out, "[CONFLICT]") { + t.Errorf("unexpected [CONFLICT] label for non-conflicted memory, got:\n%s", out) + } + if !strings.Contains(out, "User prefers vim keybindings") { + t.Errorf("expected summary in output, got:\n%s", out) + } +} + +// TestBuildPrompt_ConflictWithMissingPartner verifies that a conflicted trace whose +// partner is NOT in the retrieved set still renders as a normal (non-paired) memory. +func TestBuildPrompt_ConflictWithMissingPartner(t *testing.T) { + exec := newTestExecutive(t) + + ts := time.Date(2026, 1, 15, 0, 0, 0, 0, time.UTC) + + bundle := &focus.ContextBundle{ + Memories: []focus.MemorySummary{ + { + ID: "trace-orphan-id", + ShortID: "xxx99", + Summary: "User prefers dark mode", + Relevance: 0.9, + Timestamp: ts, + HasConflict: true, + ConflictWith: "yyy00", // partner not in result set + }, + }, + } + + out := exec.buildPrompt(bundle) + + // No CONFLICT label since partner is absent + if strings.Contains(out, "[CONFLICT]") { + t.Errorf("unexpected [CONFLICT] label when partner is absent, got:\n%s", out) + } + // Still shows the memory + if !strings.Contains(out, "User prefers dark mode") { + t.Errorf("expected orphan conflict summary in output, got:\n%s", out) + } +} diff --git a/internal/executive/executive_v2.go b/internal/executive/executive_v2.go index a4dc2cf..605f391 100644 --- a/internal/executive/executive_v2.go +++ b/internal/executive/executive_v2.go @@ -472,10 +472,13 @@ func (e *ExecutiveV2) buildContext(item *focus.PendingItem) *focus.ContextBundle if err == nil && result != nil { for _, t := range result.Traces { allMemories = append(allMemories, focus.MemorySummary{ - ID: t.ID, - Summary: t.Summary, - Relevance: t.Activation, - Timestamp: t.CreatedAt, + ID: t.ID, + ShortID: t.ShortID, + Summary: t.Summary, + Relevance: t.Activation, + Timestamp: t.CreatedAt, + HasConflict: t.HasConflict, + ConflictWith: t.ConflictWith, }) } } @@ -485,10 +488,13 @@ func (e *ExecutiveV2) buildContext(item *focus.PendingItem) *focus.ContextBundle if err == nil { for _, t := range traces { allMemories = append(allMemories, focus.MemorySummary{ - ID: t.ID, - Summary: t.Summary, - Relevance: t.Activation, - Timestamp: t.CreatedAt, + ID: t.ID, + ShortID: t.ShortID, + Summary: t.Summary, + Relevance: t.Activation, + Timestamp: t.CreatedAt, + HasConflict: t.HasConflict, + ConflictWith: t.ConflictWith, }) } } @@ -747,12 +753,46 @@ func (e *ExecutiveV2) buildPrompt(bundle *focus.ContextBundle) string { sort.Slice(bundle.Memories, func(i, j int) bool { return bundle.Memories[i].Timestamp.Before(bundle.Memories[j].Timestamp) }) - // Assign display IDs using BLAKE3 short hash for content-addressable IDs - // The memory ID map is reset at the start of each SendPrompt - for _, mem := range bundle.Memories { + // Build shortID → index map for conflict partner lookup + shortIDToIdx := make(map[string]int, len(bundle.Memories)) + for i, mem := range bundle.Memories { + if mem.ShortID != "" { + shortIDToIdx[mem.ShortID] = i + } + } + // Format memories; conflicted pairs are grouped together with a CONFLICT label + formatted := make([]bool, len(bundle.Memories)) + for i, mem := range bundle.Memories { + if formatted[i] { + continue + } + formatted[i] = true displayID := e.session.GetOrAssignMemoryID(mem.ID) - // Format timestamp as relative time if recent, otherwise as date timeStr := formatMemoryTimestamp(mem.Timestamp) + if mem.HasConflict && mem.ConflictWith != "" { + // Check if any conflict partners are present in this result set + var partnerIdxs []int + for _, psid := range strings.Split(mem.ConflictWith, ",") { + psid = strings.TrimSpace(psid) + if psid == "" { + continue + } + if pidx, ok := shortIDToIdx[psid]; ok && !formatted[pidx] { + partnerIdxs = append(partnerIdxs, pidx) + } + } + if len(partnerIdxs) > 0 { + prompt.WriteString(fmt.Sprintf("- [CONFLICT] [%s] [%s] %s\n", displayID, timeStr, mem.Summary)) + for _, pidx := range partnerIdxs { + partner := bundle.Memories[pidx] + formatted[pidx] = true + pDisplayID := e.session.GetOrAssignMemoryID(partner.ID) + pTimeStr := formatMemoryTimestamp(partner.Timestamp) + prompt.WriteString(fmt.Sprintf(" ↔ [%s] [%s] %s (contradicts above)\n", pDisplayID, pTimeStr, partner.Summary)) + } + continue + } + } prompt.WriteString(fmt.Sprintf("- [%s] [%s] %s\n", displayID, timeStr, mem.Summary)) } } diff --git a/internal/focus/types.go b/internal/focus/types.go index 9afc94d..cd260e2 100644 --- a/internal/focus/types.go +++ b/internal/focus/types.go @@ -69,10 +69,13 @@ type ContextBundle struct { // MemorySummary is a simplified view of a memory trace for context type MemorySummary struct { - ID string `json:"id"` - Summary string `json:"summary"` - Relevance float64 `json:"relevance"` // How relevant to current focus - Timestamp time.Time `json:"timestamp"` // When the memory was created or last accessed + ID string `json:"id"` + ShortID string `json:"short_id"` // 5-char short ID for conflict lookup + Summary string `json:"summary"` + Relevance float64 `json:"relevance"` // How relevant to current focus + Timestamp time.Time `json:"timestamp"` // When the memory was created or last accessed + HasConflict bool `json:"has_conflict,omitempty"` // True when a contradicting trace exists + ConflictWith string `json:"conflict_with,omitempty"` // CSV of conflicting trace short_ids } // ReflexActivity represents a recent reflex action for context From 80187e8ea939dd37fe3e2a74532ff51331a2ecb9 Mon Sep 17 00:00:00 2001 From: Dan Mills Date: Sun, 22 Feb 2026 23:25:12 +0100 Subject: [PATCH 5/5] feat(engram): use POST /v1/engrams/search for semantic search Switch Client.Search() from GET /v1/engrams?query=... to POST /v1/engrams/search with a JSON body. This matches the updated Engram API design where query params are avoided for search requests that can carry arbitrarily large payloads. Co-Authored-By: Claude Sonnet 4.6 --- internal/engram/client.go | 11 ++++++----- internal/engram/client_test.go | 15 +++++++++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/engram/client.go b/internal/engram/client.go index f83d306..732add4 100644 --- a/internal/engram/client.go +++ b/internal/engram/client.go @@ -181,14 +181,15 @@ func (c *Client) Consolidate() (*ConsolidateResult, error) { // limit <= 0 uses the server default (10). // Returns a RetrievalResult with Traces populated; Episodes and Entities are empty. func (c *Client) Search(query string, limit int) (*RetrievalResult, error) { - params := url.Values{} - params.Set("query", query) - params.Set("detail", "full") + body := map[string]any{ + "query": query, + "detail": "full", + } if limit > 0 { - params.Set("limit", strconv.Itoa(limit)) + body["limit"] = limit } var traces []*Trace - if err := c.get("/v1/engrams", params, &traces); err != nil { + if err := c.post("/v1/engrams/search", body, &traces); err != nil { return nil, err } return &RetrievalResult{Traces: traces}, nil diff --git a/internal/engram/client_test.go b/internal/engram/client_test.go index fcb81f2..559de34 100644 --- a/internal/engram/client_test.go +++ b/internal/engram/client_test.go @@ -94,13 +94,20 @@ func TestIngestThought(t *testing.T) { func TestSearch(t *testing.T) { c := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/v1/engrams" { + if r.URL.Path != "/v1/engrams/search" { t.Errorf("unexpected path: %s", r.URL.Path) } - if r.URL.Query().Get("query") != "test query" { - t.Errorf("expected query param 'test query', got %q", r.URL.Query().Get("query")) + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + var body map[string]any + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("failed to decode request body: %v", err) + } + if body["query"] != "test query" { + t.Errorf("expected query 'test query', got %q", body["query"]) } - if r.URL.Query().Get("detail") != "full" { + if body["detail"] != "full" { t.Errorf("expected detail=full") } traces := []*Trace{{ID: "tr-1", Summary: "a trace"}}