Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ee/enterprise/conversation-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,14 @@ async function runConversationMessage(

const model = keyData.aiModel
const budget = selectHistoryBudget({ plan, model, source: 'api' })
// Same resume contract as the Studio path — load the full
// trace (intermediate assistant + tool_result rows) so the
// prompt builder can reconstruct Anthropic protocol shape.
const historyRows = await db.loadConversationMessages(
conversationId,
budget.rowLimit,
'role, content, tool_calls',
undefined,
{ includeInternal: true },
)
const messages = buildPromptMessages({
history: historyRows ?? [],
Expand All @@ -319,6 +323,7 @@ async function runConversationMessage(
let totalCacheCreationInputTokens = 0
let totalCacheReadInputTokens = 0
let lastAssistantContent: AIContentBlock[] = []
let iterations: Array<{ iteration: number, assistantBlocks: AIContentBlock[], toolResultBlocks: AIContentBlock[] }> = []

for await (const evt of runConversationLoop(
{ model, apiKey, systemPrompt, messages, tools: aiTools },
Expand Down Expand Up @@ -359,6 +364,7 @@ async function runConversationMessage(
totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0
totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0
lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? []
iterations = (evt.iterations as typeof iterations) ?? []
break
}
}
Expand All @@ -367,8 +373,8 @@ async function runConversationMessage(
await saveApiChatResult({
conversationId,
userMessage: body.message,
assistantText: responseText,
assistantContent: lastAssistantContent,
iterations,
lastAssistantContent,
model,
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,15 @@ export default defineEventHandler(async (event) => {

// === HISTORY ===
const budget = selectHistoryBudget({ plan, model, source: usageSource })
const historyRows = await db.loadConversationMessages(conversationId, budget.rowLimit)
// Resume reads need the full Anthropic-protocol trace, including
// intermediate assistant turns and tool_result blocks — those rows
// are RLS-hidden from end users but allowed for service-role reads.
const historyRows = await db.loadConversationMessages(
conversationId,
budget.rowLimit,
undefined,
{ includeInternal: true },
)
const messages = buildPromptMessages({
history: historyRows ?? [],
newUserMessage: body.message,
Expand Down Expand Up @@ -232,6 +240,7 @@ export default defineEventHandler(async (event) => {
let totalCacheCreationInputTokens = 0
let totalCacheReadInputTokens = 0
let lastAssistantContent: AIContentBlock[] = []
let iterations: Array<{ iteration: number, assistantBlocks: AIContentBlock[], toolResultBlocks: AIContentBlock[] }> = []

try {
for await (const evt of runConversationLoop(
Expand Down Expand Up @@ -263,6 +272,7 @@ export default defineEventHandler(async (event) => {
totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0
totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0
lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? []
iterations = (evt.iterations as typeof iterations) ?? []

// Forward the done event without lastContent (not needed by client)
await eventStream.push(JSON.stringify({
Expand All @@ -277,16 +287,17 @@ export default defineEventHandler(async (event) => {
}

// === SAVE TO DB ===
const assistantText = lastAssistantContent
.filter(b => b.type === 'text')
.map(b => (b as { text: string }).text)
.join('')

// `saveChatResult` writes the full iteration trace as a
// single batched INSERT — seed user row, every assistant
// iteration, every tool_result iteration — under one
// `turn_id`. Intermediate rows land with `internal=true`
// (RLS-hidden from the user transcript); only the final
// assistant row stays visible.
await saveChatResult({
conversationId,
userMessage: body.message,
assistantText,
assistantContent: lastAssistantContent,
iterations,
lastAssistantContent,
model,
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
Expand Down
82 changes: 70 additions & 12 deletions server/providers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,42 @@ export interface PaginationOptions {
limit?: number
}

/**
* Shape of one row in the `messages` table for write operations.
*
* `content_blocks` is the structured Anthropic-protocol payload
* (text + tool_use + tool_result discriminated array) and takes
* precedence over the legacy `content` + `toolCalls` pair on read.
*
* `turnId` groups all rows produced by a single chat POST — seed
* user row, every assistant iteration, every tool_result iteration.
* `turnSequence` orders rows within the turn deterministically;
* `iteration` is the engine's per-turn iteration counter (NULL for
* the seed user row).
*
* `internal=true` rows are part of the protocol-replay trace but
* MUST NOT appear in the user-facing transcript — RLS enforces this
* for client-side queries; the resume path uses the service role to
* read across the boundary.
*/
export interface MessageInsertInput {
conversationId: string
role: 'user' | 'assistant'
content: string
contentBlocks?: unknown[] | null
/** Legacy column kept for backward compat with old single-row writes. */
toolCalls?: unknown[] | null
turnId: string
turnSequence: number
iteration?: number | null
internal: boolean
tokenCountInput?: number
tokenCountOutput?: number
cacheCreationInputTokens?: number
cacheReadInputTokens?: number
model?: string
}

export interface DatabaseProvider {
// ═══════════════════════════════════════════════════
// PROFILES
Expand Down Expand Up @@ -277,18 +313,40 @@ export interface DatabaseProvider {
// MESSAGES
// ═══════════════════════════════════════════════════

loadConversationMessages: (conversationId: string, limit?: number, fields?: string) => Promise<DatabaseRow[]>
insertMessage: (input: {
conversationId: string
role: 'user' | 'assistant'
content: string
toolCalls?: unknown[] | null
tokenCountInput?: number
tokenCountOutput?: number
cacheCreationInputTokens?: number
cacheReadInputTokens?: number
model?: string
}) => Promise<void>
/**
* Load conversation messages for the engine's resume path or the
* public transcript route.
*
* `options.includeInternal` defaults to `false` — the public route
* MUST NOT pass `true`. Only the chat / Conversation API resume
* paths set it to load the full Anthropic-protocol shape
* (intermediate assistant turns + tool_result blocks) needed to
* reconstruct prior multi-iteration loops.
*/
loadConversationMessages: (
conversationId: string,
limit?: number,
fields?: string,
options?: { includeInternal?: boolean },
) => Promise<DatabaseRow[]>

/**
* Insert a single message row. Convenience wrapper around the batch
* path used in places that only persist one row (e.g. an event
* stream side-effect). For the chat persistence path use
* `insertMessages` so the entire turn lands atomically in one
* round-trip.
*/
insertMessage: (input: MessageInsertInput) => Promise<void>

/**
* Atomic batch insert for the full assistant/tool_result trace a
* single chat POST produces. All rows in one batch share the same
* `turnId` so resume reads can group them deterministically; the
* caller assigns `turnSequence` in protocol order so two rows that
* land at the same `created_at` still resolve consistently.
*/
insertMessages: (rows: MessageInsertInput[]) => Promise<void>

// ═══════════════════════════════════════════════════
// AGENT USAGE
Expand Down
60 changes: 42 additions & 18 deletions server/providers/supabase-db/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,31 @@
* Conversation, message, and agent usage methods
* for the Supabase DatabaseProvider.
*/
import type { DatabaseProvider, DatabaseRow } from '../database'
import type { DatabaseProvider, DatabaseRow, MessageInsertInput } from '../database'
import { getAdmin, getUser } from './helpers'

function toMessageRow(input: MessageInsertInput): Record<string, unknown> {
const row: Record<string, unknown> = {
conversation_id: input.conversationId,
role: input.role,
content: input.content,
turn_id: input.turnId,
turn_sequence: input.turnSequence,
internal: input.internal,
}

if (input.contentBlocks && input.contentBlocks.length > 0) row.content_blocks = input.contentBlocks
if (input.toolCalls) row.tool_calls = input.toolCalls
if (input.iteration != null) row.iteration = input.iteration
if (input.tokenCountInput) row.token_count_input = input.tokenCountInput
if (input.tokenCountOutput) row.token_count_output = input.tokenCountOutput
if (input.cacheCreationInputTokens) row.cache_creation_input_tokens = input.cacheCreationInputTokens
if (input.cacheReadInputTokens) row.cache_read_input_tokens = input.cacheReadInputTokens
if (input.model) row.model = input.model

return row
}

type ConversationMethods = Pick<
DatabaseProvider,
| 'createConversation'
Expand All @@ -15,6 +37,7 @@ type ConversationMethods = Pick<
| 'updateConversationTimestamp'
| 'loadConversationMessages'
| 'insertMessage'
| 'insertMessages'
| 'getAgentUsage'
| 'upsertAgentUsage'
| 'getMonthlyUsageSummary'
Expand Down Expand Up @@ -111,34 +134,35 @@ export function conversationMethods(): ConversationMethods {
.eq('id', conversationId)
},

async loadConversationMessages(conversationId, limit = 20, fields = 'role, content, tool_calls') {
async loadConversationMessages(conversationId, limit = 20, fields = 'role, content, tool_calls, content_blocks, turn_id, turn_sequence, iteration, internal', options) {
const admin = getAdmin()
const { data } = await admin
let query = admin
.from('messages')
.select(fields)
.eq('conversation_id', conversationId)
.order('created_at', { ascending: true })
.limit(limit)
// Ordering by `(created_at, turn_sequence)` keeps batch-inserted
// rows with identical timestamps in protocol order.
query = query.order('created_at', { ascending: true })
query = query.order('turn_sequence', { ascending: true })

if (!options?.includeInternal) {
query = query.eq('internal', false)
}

const { data } = await query.limit(limit)

return (data ?? []) as unknown as DatabaseRow[]
},

async insertMessage(input) {
const admin = getAdmin()
const row: Record<string, unknown> = {
conversation_id: input.conversationId,
role: input.role,
content: input.content,
}

if (input.toolCalls) row.tool_calls = input.toolCalls
if (input.tokenCountInput) row.token_count_input = input.tokenCountInput
if (input.tokenCountOutput) row.token_count_output = input.tokenCountOutput
if (input.cacheCreationInputTokens) row.cache_creation_input_tokens = input.cacheCreationInputTokens
if (input.cacheReadInputTokens) row.cache_read_input_tokens = input.cacheReadInputTokens
if (input.model) row.model = input.model
await admin.from('messages').insert(toMessageRow(input))
},

await admin.from('messages').insert(row)
async insertMessages(rows) {
if (rows.length === 0) return
const admin = getAdmin()
await admin.from('messages').insert(rows.map(toMessageRow))
},

// ─── Agent Usage ───
Expand Down
31 changes: 30 additions & 1 deletion server/utils/conversation-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ export interface ConversationEvent {
[key: string]: unknown
}

/**
* One slice of the tool-use loop. `assistantBlocks` is whatever the
* provider produced this iteration (text + tool_use blocks in order).
* `toolResultBlocks` is the synthesized tool_result content fed back
* to Anthropic as the next "user" message — empty for the final
* iteration when the model stops with `end_turn`. The persister
* writes each entry as `assistant` + (optional) `user` row pair
* sharing one `turn_id`.
*/
export interface IterationTrace {
iteration: number
assistantBlocks: AIContentBlock[]
toolResultBlocks: AIContentBlock[]
}

// ─── Configuration ───

export interface ConversationConfig {
Expand Down Expand Up @@ -94,6 +109,12 @@ export async function* runConversationLoop(
let totalCacheReadInputTokens = 0
let lastAssistantContent: AIContentBlock[] = []
let accumulatedAffected: AffectedResources = emptyAffected()
// Full iteration-by-iteration trace surfaced on `done` so the
// caller can persist the actual Anthropic-protocol shape Claude
// saw — intermediate assistant turns AND tool_result blocks —
// and reconstruct it on resume. Empty for the seed user message;
// populated for every loop iteration regardless of stop reason.
const trace: IterationTrace[] = []

let iteration = 0

Expand Down Expand Up @@ -189,7 +210,13 @@ export async function* runConversationLoop(

lastAssistantContent = assistantBlocks

if (stopReason !== 'tool_use' || currentToolCalls.length === 0) break
if (stopReason !== 'tool_use' || currentToolCalls.length === 0) {
// Final iteration — no tool execution this turn. Persist the
// assistant blocks alone (no tool_result row will exist for
// this iteration).
trace.push({ iteration, assistantBlocks, toolResultBlocks: [] })
break
}

// === TOOL EXECUTION with state guard + workflow-aware auto-merge ===
const toolResultBlocks: AIContentBlock[] = []
Expand Down Expand Up @@ -231,6 +258,7 @@ export async function* runConversationLoop(
config.messages.push({ role: 'assistant', content: assistantBlocks })
config.messages.push({ role: 'user', content: toolResultBlocks })
lastAssistantContent = assistantBlocks
trace.push({ iteration, assistantBlocks, toolResultBlocks })
}

// === DONE with affected resources ===
Expand All @@ -244,6 +272,7 @@ export async function* runConversationLoop(
},
affected: accumulatedAffected,
lastContent: lastAssistantContent,
iterations: trace,
}
}

Expand Down
Loading
Loading