diff --git a/pkg/runtime/compactor/compactor.go b/pkg/runtime/compactor/compactor.go index e4addce95..cd0de6d53 100644 --- a/pkg/runtime/compactor/compactor.go +++ b/pkg/runtime/compactor/compactor.go @@ -153,26 +153,93 @@ func RunLLM(ctx context.Context, args LLMArgs) (*Result, error) { // [maxKeepTokens] window. Used by the runtime when a hook supplies // its own summary so the kept-tail policy stays consistent across // the two strategies. -func ComputeFirstKeptEntry(sess *session.Session, a *agent.Agent) int { - return mapToSessionIndex(sess, compaction.SplitIndexForKeep(nonSystemMessages(sess, a), maxKeepTokens)) +func ComputeFirstKeptEntry(sess *session.Session, _ *agent.Agent) int { + refs := nonSystemMessageRefs(sess) + messages := messagesFromRefs(refs) + splitIdx := compaction.SplitIndexForKeep(messages, maxKeepTokens) + splitIdx = skipSyntheticKeepBoundary(refs, splitIdx) + return firstKeptEntryFromRefs(refs, splitIdx) +} + +const syntheticSessionIndex = -1 + +type messageRef struct { + message chat.Message + sessionIndex int } -// nonSystemMessages returns the agent-visible messages in sess with -// the system entries filtered out. Both the LLM strategy (via -// [extractMessages]) and the hook-supplied path (via -// [ComputeFirstKeptEntry]) operate on this same shape, which is also -// what [compaction.SplitIndexForKeep] expects. -func nonSystemMessages(sess *session.Session, a *agent.Agent) []chat.Message { - var messages []chat.Message - for _, msg := range sess.GetMessages(a) { - if msg.Role == chat.MessageRoleSystem { +// nonSystemMessageRefs reconstructs the compactable conversation as the latest +// summary (if any) followed by the messages visible after that summary, while +// retaining provenance back to sess.Messages for real messages. The provenance +// is critical after a prior compaction: the previous summary is synthetic, so +// indexes in the reconstructed message list no longer line up with raw session +// indexes. +func nonSystemMessageRefs(sess *session.Session) []messageRef { + items := sess.SnapshotItems() + + lastSummaryIndex := -1 + for i := len(items) - 1; i >= 0; i-- { + if items[i].Summary != "" { + lastSummaryIndex = i + break + } + } + + var refs []messageRef + if lastSummaryIndex >= 0 { + refs = append(refs, messageRef{ + message: chat.Message{ + Role: chat.MessageRoleUser, + Content: "Session Summary: " + items[lastSummaryIndex].Summary, + CreatedAt: time.Now().Format(time.RFC3339), + }, + sessionIndex: syntheticSessionIndex, + }) + } + + startIndex := lastSummaryIndex + 1 + if lastSummaryIndex >= 0 { + kept := items[lastSummaryIndex].FirstKeptEntry + if kept >= 0 && kept < lastSummaryIndex { + startIndex = kept + } + } + + for i := startIndex; i < len(items); i++ { + item := items[i] + if !item.IsMessage() || item.Message.Message.Role == chat.MessageRoleSystem { continue } - messages = append(messages, msg) + refs = append(refs, messageRef{message: item.Message.Message, sessionIndex: i}) + } + + return refs +} + +func messagesFromRefs(refs []messageRef) []chat.Message { + messages := make([]chat.Message, len(refs)) + for i, ref := range refs { + messages[i] = ref.message } return messages } +func skipSyntheticKeepBoundary(refs []messageRef, splitIdx int) int { + for splitIdx < len(refs) && refs[splitIdx].sessionIndex == syntheticSessionIndex { + splitIdx++ + } + return splitIdx +} + +func firstKeptEntryFromRefs(refs []messageRef, splitIdx int) int { + for i := splitIdx; i < len(refs); i++ { + if refs[i].sessionIndex >= 0 { + return refs[i].sessionIndex + } + } + return -1 +} + // extractMessages returns the messages to send to the compaction // model, plus the index (into sess.Messages) of the first message // that is kept verbatim after compaction. The caller is responsible @@ -188,8 +255,9 @@ func nonSystemMessages(sess *session.Session, a *agent.Agent) []chat.Message { // If the conversation tail itself doesn't fit in // (contextLimit − MaxSummaryTokens − prompt-overhead), older messages // are dropped from the front of the to-compact list to make room. -func extractMessages(sess *session.Session, a *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) { - messages := nonSystemMessages(sess, a) +func extractMessages(sess *session.Session, _ *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) { + refs := nonSystemMessageRefs(sess) + messages := messagesFromRefs(refs) // Clear Cost and CacheControl on our local copy of the conversation. // Cost is per-message bookkeeping that's already accumulated into // sess.TotalCost(); leaving it set would double-count when the @@ -204,7 +272,8 @@ func extractMessages(sess *session.Session, a *agent.Agent, contextLimit int64, } splitIdx := compaction.SplitIndexForKeep(messages, maxKeepTokens) - firstKeptEntry := mapToSessionIndex(sess, splitIdx) + splitIdx = skipSyntheticKeepBoundary(refs, splitIdx) + firstKeptEntry := firstKeptEntryFromRefs(refs, splitIdx) messages = messages[:splitIdx] systemPromptMessage := chat.Message{ diff --git a/pkg/runtime/compactor/compactor_test.go b/pkg/runtime/compactor/compactor_test.go index 301dfaa0f..48e36ca83 100644 --- a/pkg/runtime/compactor/compactor_test.go +++ b/pkg/runtime/compactor/compactor_test.go @@ -165,10 +165,10 @@ func TestComputeFirstKeptEntry(t *testing.T) { a := agent.New("test", "") - t.Run("empty session returns 0", func(t *testing.T) { + t.Run("empty session returns sentinel", func(t *testing.T) { t.Parallel() sess := session.New() - assert.Equal(t, 0, ComputeFirstKeptEntry(sess, a)) + assert.Equal(t, -1, ComputeFirstKeptEntry(sess, a)) }) t.Run("short conversation: split at end (compact everything)", func(t *testing.T) { @@ -178,10 +178,62 @@ func TestComputeFirstKeptEntry(t *testing.T) { session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "hi"}}), session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "hello"}}), })) - assert.Equal(t, len(sess.Messages), ComputeFirstKeptEntry(sess, a)) + assert.Equal(t, -1, ComputeFirstKeptEntry(sess, a)) }) } +func TestComputeFirstKeptEntry_AfterPriorCompactionAllowsZeroKeptEntry(t *testing.T) { + t.Parallel() + + var items []session.Item + for i := range 4 { + items = append(items, session.NewMessageItem(&session.Message{ + Message: chat.Message{ + Role: chat.MessageRoleUser, + Content: strings.Repeat(string(rune('a'+i)), 40000), // ~10k tokens each + }, + })) + } + items = append(items, session.Item{ + Summary: "previous summary", + FirstKeptEntry: 0, + }) + + sess := session.New(session.WithMessages(items)) + a := agent.New("test", "") + + assert.Equal(t, 3, ComputeFirstKeptEntry(sess, a)) +} + +func TestComputeFirstKeptEntry_AfterPriorCompactionUsesRawSessionIndexes(t *testing.T) { + t.Parallel() + + var items []session.Item + for i := range 10 { + items = append(items, session.NewMessageItem(&session.Message{ + Message: chat.Message{ + Role: chat.MessageRoleUser, + Content: strings.Repeat(string(rune('a'+i)), 40000), // ~10k tokens each + }, + })) + } + items = append(items, session.Item{ + Summary: "previous summary", + FirstKeptEntry: 8, + }) + + sess := session.New(session.WithMessages(items)) + a := agent.New("test", "") + + // The reconstructed conversation seen by compaction is: + // [synthetic previous summary, raw message 8, raw message 9] + // With ~10k-token raw messages and a 20k keep window, the kept suffix + // starts at reconstructed index 2, which must map back to raw session + // index 9. A simple filtered-index-to-raw-index mapping would return 2 + // and resurrect old messages that were already covered by the summary. + assert.Equal(t, 9, ComputeFirstKeptEntry(sess, a)) +} + func TestMapToSessionIndex(t *testing.T) { t.Parallel() diff --git a/pkg/runtime/session_compaction_test.go b/pkg/runtime/session_compaction_test.go index a1c067b40..2e72676cc 100644 --- a/pkg/runtime/session_compaction_test.go +++ b/pkg/runtime/session_compaction_test.go @@ -74,7 +74,7 @@ func TestSessionGetMessages_SummaryWithoutFirstKeptEntry(t *testing.T) { session.NewMessageItem(&session.Message{ Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "m2"}, }), - {Summary: "This is a summary"}, + {Summary: "This is a summary", FirstKeptEntry: -1}, session.NewMessageItem(&session.Message{ Message: chat.Message{Role: chat.MessageRoleUser, Content: "m3"}, }), diff --git a/pkg/session/branch.go b/pkg/session/branch.go index a7442a1ec..bfc60c51a 100644 --- a/pkg/session/branch.go +++ b/pkg/session/branch.go @@ -52,7 +52,7 @@ func cloneSessionItem(item Item) (Item, error) { } return Item{SubSession: clonedSub}, nil case item.Summary != "": - return Item{Summary: item.Summary, Cost: item.Cost}, nil + return Item{Summary: item.Summary, FirstKeptEntry: item.FirstKeptEntry, Cost: item.Cost}, nil default: return Item{}, errors.New("cannot clone empty session item") } diff --git a/pkg/session/branch_test.go b/pkg/session/branch_test.go index 33e9384af..9e2e036a0 100644 --- a/pkg/session/branch_test.go +++ b/pkg/session/branch_test.go @@ -85,10 +85,11 @@ func TestCloneSessionItem(t *testing.T) { }) t.Run("summary item clones successfully", func(t *testing.T) { - item := Item{Summary: "test summary"} + item := Item{Summary: "test summary", FirstKeptEntry: 2} cloned, err := cloneSessionItem(item) require.NoError(t, err) assert.Equal(t, "test summary", cloned.Summary) + assert.Equal(t, 2, cloned.FirstKeptEntry) }) } diff --git a/pkg/session/migrations.go b/pkg/session/migrations.go index df5eeb8fd..8881d3a8e 100644 --- a/pkg/session/migrations.go +++ b/pkg/session/migrations.go @@ -398,7 +398,7 @@ func getAllMigrations() []Migration { ID: 21, Name: "021_add_first_kept_entry_column", Description: "Add first_kept_entry column to session_items for compaction-preserved messages", - UpSQL: `ALTER TABLE session_items ADD COLUMN first_kept_entry INTEGER DEFAULT 0`, + UpSQL: `ALTER TABLE session_items ADD COLUMN first_kept_entry INTEGER DEFAULT -1`, }, } } diff --git a/pkg/session/session.go b/pkg/session/session.go index cd2585cbf..621c60cfd 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -354,6 +354,14 @@ func (s *Session) snapshotItems() []Item { return items } +// SnapshotItems returns a copy of s.Messages safe to use without holding +// s.mu. Each Message value is deep-copied so concurrent UpdateMessage calls +// cannot mutate the snapshot; non-Message fields (Summary, SubSession, Cost, +// FirstKeptEntry) are shallow-copied since they are not mutated in place. +func (s *Session) SnapshotItems() []Item { + return s.snapshotItems() +} + // cloneChatMessage returns a deep copy of a chat.Message, duplicating // all slice and pointer fields that would otherwise alias the original. func cloneChatMessage(m chat.Message) chat.Message { @@ -902,7 +910,7 @@ func buildSessionSummaryMessages(items []Item) ([]chat.Message, int) { startIndex := lastSummaryIndex + 1 if lastSummaryIndex >= 0 { kept := items[lastSummaryIndex].FirstKeptEntry - if kept > 0 && kept < lastSummaryIndex { + if kept >= 0 && kept < lastSummaryIndex { startIndex = kept } } diff --git a/pkg/session/session_test.go b/pkg/session/session_test.go index 9212598e9..3029f6684 100644 --- a/pkg/session/session_test.go +++ b/pkg/session/session_test.go @@ -128,7 +128,7 @@ func TestGetMessagesWithSummary(t *testing.T) { Content: "first response", })) - s.Messages = append(s.Messages, Item{Summary: "This is a summary of the conversation so far"}) + s.Messages = append(s.Messages, Item{Summary: "This is a summary of the conversation so far", FirstKeptEntry: -1}) s.AddMessage(NewAgentMessage("", &chat.Message{ Role: chat.MessageRoleUser, @@ -161,6 +161,32 @@ func TestGetMessagesWithSummary(t *testing.T) { assert.Equal(t, 3, userAssistantMessages, "should only include messages after summary") } +func TestGetMessagesWithSummaryAndZeroFirstKeptEntry(t *testing.T) { + testAgent := &agent.Agent{} + + s := New(WithMessages([]Item{ + NewMessageItem(&Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "m1"}}), + NewMessageItem(&Message{Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "m2"}}), + NewMessageItem(&Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "m3"}}), + {Summary: "summary", FirstKeptEntry: 0}, + })) + + messages := s.GetMessages(testAgent) + + var conversationMessages []chat.Message + for _, msg := range messages { + if msg.Role != chat.MessageRoleSystem { + conversationMessages = append(conversationMessages, msg) + } + } + + require.Len(t, conversationMessages, 4) + assert.Equal(t, "Session Summary: summary", conversationMessages[0].Content) + assert.Equal(t, "m1", conversationMessages[1].Content) + assert.Equal(t, "m2", conversationMessages[2].Content) + assert.Equal(t, "m3", conversationMessages[3].Content) +} + func TestGetMessages_Instructions(t *testing.T) { testAgent := agent.New("root", "instructions") @@ -201,7 +227,7 @@ func TestGetMessages_CacheControlWithSummary(t *testing.T) { ) s := New() - s.Messages = append(s.Messages, Item{Summary: "Test summary"}) + s.Messages = append(s.Messages, Item{Summary: "Test summary", FirstKeptEntry: -1}) extra := chat.Message{ Role: chat.MessageRoleSystem, diff --git a/pkg/session/store.go b/pkg/session/store.go index 7dedd5a39..73a2fd756 100644 --- a/pkg/session/store.go +++ b/pkg/session/store.go @@ -689,7 +689,7 @@ type sessionItemRow struct { // loadSession when resolving sub-sessions inside a transaction. func (s *SQLiteSessionStore) loadSessionItems(ctx context.Context, q querier, sessionID string) ([]Item, error) { rows, err := q.QueryContext(ctx, - `SELECT position, item_type, agent_name, message_json, implicit, subsession_id, summary_text, COALESCE(first_kept_entry, 0) + `SELECT position, item_type, agent_name, message_json, implicit, subsession_id, summary_text, COALESCE(first_kept_entry, -1) FROM session_items WHERE session_id = ? ORDER BY position`, sessionID) if err != nil { return nil, err