Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 84 additions & 15 deletions pkg/runtime/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,93 @@ func RunLLM(ctx context.Context, args LLMArgs) (*Result, error) {
// [maxKeepTokens] window. Used by the runtime when a hook supplies
// its own summary so the kept-tail policy stays consistent across
// the two strategies.
func ComputeFirstKeptEntry(sess *session.Session, a *agent.Agent) int {
return mapToSessionIndex(sess, compaction.SplitIndexForKeep(nonSystemMessages(sess, a), maxKeepTokens))
func ComputeFirstKeptEntry(sess *session.Session, _ *agent.Agent) int {
refs := nonSystemMessageRefs(sess)
messages := messagesFromRefs(refs)
splitIdx := compaction.SplitIndexForKeep(messages, maxKeepTokens)
splitIdx = skipSyntheticKeepBoundary(refs, splitIdx)
return firstKeptEntryFromRefs(refs, splitIdx)
}

const syntheticSessionIndex = -1

type messageRef struct {
message chat.Message
sessionIndex int
}

// nonSystemMessages returns the agent-visible messages in sess with
// the system entries filtered out. Both the LLM strategy (via
// [extractMessages]) and the hook-supplied path (via
// [ComputeFirstKeptEntry]) operate on this same shape, which is also
// what [compaction.SplitIndexForKeep] expects.
func nonSystemMessages(sess *session.Session, a *agent.Agent) []chat.Message {
var messages []chat.Message
for _, msg := range sess.GetMessages(a) {
if msg.Role == chat.MessageRoleSystem {
// nonSystemMessageRefs reconstructs the compactable conversation as the latest
// summary (if any) followed by the messages visible after that summary, while
// retaining provenance back to sess.Messages for real messages. The provenance
// is critical after a prior compaction: the previous summary is synthetic, so
// indexes in the reconstructed message list no longer line up with raw session
// indexes.
func nonSystemMessageRefs(sess *session.Session) []messageRef {
items := sess.SnapshotItems()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old path went through sess.GetMessages(a) which applied agent-specific transformations (e.g. injecting the system prompt, flattening sub-sessions, etc.). SnapshotItems() bypasses all of that. For compaction purposes filtering by role is sufficient — but it would be worth a quick audit confirming there are no agent-level transformations in GetMessages the compactor was implicitly relying on (e.g. content rewriting or implicit messages). If there are none, a brief inline comment here noting the intentional bypass would make future readers more confident this is deliberate.


lastSummaryIndex := -1
for i := len(items) - 1; i >= 0; i-- {
if items[i].Summary != "" {
lastSummaryIndex = i
break
}
}

var refs []messageRef
if lastSummaryIndex >= 0 {
refs = append(refs, messageRef{
message: chat.Message{
Role: chat.MessageRoleUser,
Content: "Session Summary: " + items[lastSummaryIndex].Summary,
CreatedAt: time.Now().Format(time.RFC3339),
},
sessionIndex: syntheticSessionIndex,
})
}

startIndex := lastSummaryIndex + 1
if lastSummaryIndex >= 0 {
kept := items[lastSummaryIndex].FirstKeptEntry
if kept >= 0 && kept < lastSummaryIndex {
startIndex = kept
}
}

for i := startIndex; i < len(items); i++ {
item := items[i]
if !item.IsMessage() || item.Message.Message.Role == chat.MessageRoleSystem {
continue
}
messages = append(messages, msg)
refs = append(refs, messageRef{message: item.Message.Message, sessionIndex: i})
}

return refs
}

func messagesFromRefs(refs []messageRef) []chat.Message {
messages := make([]chat.Message, len(refs))
for i, ref := range refs {
messages[i] = ref.message
}
return messages
}

func skipSyntheticKeepBoundary(refs []messageRef, splitIdx int) int {
for splitIdx < len(refs) && refs[splitIdx].sessionIndex == syntheticSessionIndex {
splitIdx++
}
return splitIdx
}

func firstKeptEntryFromRefs(refs []messageRef, splitIdx int) int {
for i := splitIdx; i < len(refs); i++ {
if refs[i].sessionIndex >= 0 {
return refs[i].sessionIndex
}
}
return -1
}

// extractMessages returns the messages to send to the compaction
// model, plus the index (into sess.Messages) of the first message
// that is kept verbatim after compaction. The caller is responsible
Expand All @@ -188,8 +255,9 @@ func nonSystemMessages(sess *session.Session, a *agent.Agent) []chat.Message {
// If the conversation tail itself doesn't fit in
// (contextLimit − MaxSummaryTokens − prompt-overhead), older messages
// are dropped from the front of the to-compact list to make room.
func extractMessages(sess *session.Session, a *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) {
messages := nonSystemMessages(sess, a)
func extractMessages(sess *session.Session, _ *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) {
refs := nonSystemMessageRefs(sess)
messages := messagesFromRefs(refs)
// Clear Cost and CacheControl on our local copy of the conversation.
// Cost is per-message bookkeeping that's already accumulated into
// sess.TotalCost(); leaving it set would double-count when the
Expand All @@ -204,7 +272,8 @@ func extractMessages(sess *session.Session, a *agent.Agent, contextLimit int64,
}

splitIdx := compaction.SplitIndexForKeep(messages, maxKeepTokens)
firstKeptEntry := mapToSessionIndex(sess, splitIdx)
splitIdx = skipSyntheticKeepBoundary(refs, splitIdx)
firstKeptEntry := firstKeptEntryFromRefs(refs, splitIdx)
messages = messages[:splitIdx]

systemPromptMessage := chat.Message{
Expand Down
58 changes: 55 additions & 3 deletions pkg/runtime/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func TestComputeFirstKeptEntry(t *testing.T) {

a := agent.New("test", "")

t.Run("empty session returns 0", func(t *testing.T) {
t.Run("empty session returns sentinel", func(t *testing.T) {
t.Parallel()
sess := session.New()
assert.Equal(t, 0, ComputeFirstKeptEntry(sess, a))
assert.Equal(t, -1, ComputeFirstKeptEntry(sess, a))
})

t.Run("short conversation: split at end (compact everything)", func(t *testing.T) {
Expand All @@ -178,10 +178,62 @@ func TestComputeFirstKeptEntry(t *testing.T) {
session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "hi"}}),
session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "hello"}}),
}))
assert.Equal(t, len(sess.Messages), ComputeFirstKeptEntry(sess, a))
assert.Equal(t, -1, ComputeFirstKeptEntry(sess, a))
})
}

func TestComputeFirstKeptEntry_AfterPriorCompactionAllowsZeroKeptEntry(t *testing.T) {
t.Parallel()

var items []session.Item
for i := range 4 {
items = append(items, session.NewMessageItem(&session.Message{
Message: chat.Message{
Role: chat.MessageRoleUser,
Content: strings.Repeat(string(rune('a'+i)), 40000), // ~10k tokens each
},
}))
}
items = append(items, session.Item{
Summary: "previous summary",
FirstKeptEntry: 0,
})

sess := session.New(session.WithMessages(items))
a := agent.New("test", "")

assert.Equal(t, 3, ComputeFirstKeptEntry(sess, a))
}

func TestComputeFirstKeptEntry_AfterPriorCompactionUsesRawSessionIndexes(t *testing.T) {
t.Parallel()

var items []session.Item
for i := range 10 {
items = append(items, session.NewMessageItem(&session.Message{
Message: chat.Message{
Role: chat.MessageRoleUser,
Content: strings.Repeat(string(rune('a'+i)), 40000), // ~10k tokens each
},
}))
}
items = append(items, session.Item{
Summary: "previous summary",
FirstKeptEntry: 8,
})

sess := session.New(session.WithMessages(items))
a := agent.New("test", "")

// The reconstructed conversation seen by compaction is:
// [synthetic previous summary, raw message 8, raw message 9]
// With ~10k-token raw messages and a 20k keep window, the kept suffix
// starts at reconstructed index 2, which must map back to raw session
// index 9. A simple filtered-index-to-raw-index mapping would return 2
// and resurrect old messages that were already covered by the summary.
assert.Equal(t, 9, ComputeFirstKeptEntry(sess, a))
}

func TestMapToSessionIndex(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/session_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestSessionGetMessages_SummaryWithoutFirstKeptEntry(t *testing.T) {
session.NewMessageItem(&session.Message{
Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "m2"},
}),
{Summary: "This is a summary"},
{Summary: "This is a summary", FirstKeptEntry: -1},
session.NewMessageItem(&session.Message{
Message: chat.Message{Role: chat.MessageRoleUser, Content: "m3"},
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/session/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func cloneSessionItem(item Item) (Item, error) {
}
return Item{SubSession: clonedSub}, nil
case item.Summary != "":
return Item{Summary: item.Summary, Cost: item.Cost}, nil
return Item{Summary: item.Summary, FirstKeptEntry: item.FirstKeptEntry, Cost: item.Cost}, nil
default:
return Item{}, errors.New("cannot clone empty session item")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/session/branch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func TestCloneSessionItem(t *testing.T) {
})

t.Run("summary item clones successfully", func(t *testing.T) {
item := Item{Summary: "test summary"}
item := Item{Summary: "test summary", FirstKeptEntry: 2}
cloned, err := cloneSessionItem(item)
require.NoError(t, err)
assert.Equal(t, "test summary", cloned.Summary)
assert.Equal(t, 2, cloned.FirstKeptEntry)
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/session/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func getAllMigrations() []Migration {
ID: 21,
Name: "021_add_first_kept_entry_column",
Description: "Add first_kept_entry column to session_items for compaction-preserved messages",
UpSQL: `ALTER TABLE session_items ADD COLUMN first_kept_entry INTEGER DEFAULT 0`,
UpSQL: `ALTER TABLE session_items ADD COLUMN first_kept_entry INTEGER DEFAULT -1`,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while it looks to be ok in this context it's always dangerous to update an existing migration. A new migration 22 would be cheap: UPDATE session_items SET first_kept_entry = -1 WHERE item_type = 'summary' AND first_kept_entry = 0

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooof this is no good

},
}
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,14 @@ func (s *Session) snapshotItems() []Item {
return items
}

// SnapshotItems returns a copy of s.Messages safe to use without holding
// s.mu. Each Message value is deep-copied so concurrent UpdateMessage calls
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exposing snapshotItems() as public to serve the compactor creates direct cross-package coupling — the compactor now reaches into session internals rather than asking the session to produce what it needs. A narrower alternative would be a Session method (e.g. BuildCompactionRefs()) that returns the already-reconstructed message list with raw indexes, keeping the shape the compactor actually needs without leaking the full item slice. Fine to defer to a follow-up, but worth noting since we are growing the public API surface.

// cannot mutate the snapshot; non-Message fields (Summary, SubSession, Cost,
// FirstKeptEntry) are shallow-copied since they are not mutated in place.
func (s *Session) SnapshotItems() []Item {
return s.snapshotItems()
}

// cloneChatMessage returns a deep copy of a chat.Message, duplicating
// all slice and pointer fields that would otherwise alias the original.
func cloneChatMessage(m chat.Message) chat.Message {
Expand Down Expand Up @@ -902,7 +910,7 @@ func buildSessionSummaryMessages(items []Item) ([]chat.Message, int) {
startIndex := lastSummaryIndex + 1
if lastSummaryIndex >= 0 {
kept := items[lastSummaryIndex].FirstKeptEntry
if kept > 0 && kept < lastSummaryIndex {
if kept >= 0 && kept < lastSummaryIndex {
startIndex = kept
}
}
Expand Down
30 changes: 28 additions & 2 deletions pkg/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestGetMessagesWithSummary(t *testing.T) {
Content: "first response",
}))

s.Messages = append(s.Messages, Item{Summary: "This is a summary of the conversation so far"})
s.Messages = append(s.Messages, Item{Summary: "This is a summary of the conversation so far", FirstKeptEntry: -1})

s.AddMessage(NewAgentMessage("", &chat.Message{
Role: chat.MessageRoleUser,
Expand Down Expand Up @@ -161,6 +161,32 @@ func TestGetMessagesWithSummary(t *testing.T) {
assert.Equal(t, 3, userAssistantMessages, "should only include messages after summary")
}

func TestGetMessagesWithSummaryAndZeroFirstKeptEntry(t *testing.T) {
testAgent := &agent.Agent{}

s := New(WithMessages([]Item{
NewMessageItem(&Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "m1"}}),
NewMessageItem(&Message{Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "m2"}}),
NewMessageItem(&Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "m3"}}),
{Summary: "summary", FirstKeptEntry: 0},
}))

messages := s.GetMessages(testAgent)

var conversationMessages []chat.Message
for _, msg := range messages {
if msg.Role != chat.MessageRoleSystem {
conversationMessages = append(conversationMessages, msg)
}
}

require.Len(t, conversationMessages, 4)
assert.Equal(t, "Session Summary: summary", conversationMessages[0].Content)
assert.Equal(t, "m1", conversationMessages[1].Content)
assert.Equal(t, "m2", conversationMessages[2].Content)
assert.Equal(t, "m3", conversationMessages[3].Content)
}

func TestGetMessages_Instructions(t *testing.T) {
testAgent := agent.New("root", "instructions")

Expand Down Expand Up @@ -201,7 +227,7 @@ func TestGetMessages_CacheControlWithSummary(t *testing.T) {
)

s := New()
s.Messages = append(s.Messages, Item{Summary: "Test summary"})
s.Messages = append(s.Messages, Item{Summary: "Test summary", FirstKeptEntry: -1})

extra := chat.Message{
Role: chat.MessageRoleSystem,
Expand Down
2 changes: 1 addition & 1 deletion pkg/session/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ type sessionItemRow struct {
// loadSession when resolving sub-sessions inside a transaction.
func (s *SQLiteSessionStore) loadSessionItems(ctx context.Context, q querier, sessionID string) ([]Item, error) {
rows, err := q.QueryContext(ctx,
`SELECT position, item_type, agent_name, message_json, implicit, subsession_id, summary_text, COALESCE(first_kept_entry, 0)
`SELECT position, item_type, agent_name, message_json, implicit, subsession_id, summary_text, COALESCE(first_kept_entry, -1)
FROM session_items WHERE session_id = ? ORDER BY position`, sessionID)
if err != nil {
return nil, err
Expand Down
Loading