1- import { db } from '@sim/db'
2- import { userTableRows } from '@sim/db/schema'
31import { createLogger } from '@sim/logger'
42import { toError } from '@sim/utils/errors'
53import { generateId } from '@sim/utils/id'
64import { parse as csvParse } from 'csv-parse/sync'
7- import { eq } from 'drizzle-orm'
85import { FunctionExecute , Read as ReadTool } from '@/lib/copilot/generated/tool-catalog-v1'
96import { CopilotTableOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1'
107import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1'
118import { TraceEvent } from '@/lib/copilot/generated/trace-events-v1'
129import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1'
1310import { withCopilotSpan } from '@/lib/copilot/request/otel'
1411import type { ExecutionContext , ToolCallResult } from '@/lib/copilot/request/types'
15- import type { RowData } from '@/lib/table'
16- import { nKeysBetween } from '@/lib/table/order-key '
17- import { buildOrderedRowValues , getTableById } from '@/lib/table/service'
12+ import type { RowData , TableDefinition } from '@/lib/table'
13+ import { buildIdByName , rowDataNameToId } from '@/lib/table/column-keys '
14+ import { getTableById , replaceTableRows } from '@/lib/table/service'
1815
1916const logger = createLogger ( 'CopilotToolResultTables' )
2017
2118const MAX_OUTPUT_TABLE_ROWS = 10_000
22- const BATCH_CHUNK_SIZE = 500
19+
20+ /**
21+ * Replaces a table's rows with wire rows keyed by column name. Translates the
22+ * keys to stable column ids (unknown keys are dropped, matching every other
23+ * name-translating boundary) and delegates to `replaceTableRows`, which owns
24+ * locking, validation, plan row limits, batching, and rowCount maintenance.
25+ */
26+ async function replaceTableRowsFromWire (
27+ table : TableDefinition ,
28+ rows : Array < Record < string , unknown > > ,
29+ context : ExecutionContext
30+ ) : Promise < { error ?: string } > {
31+ const idByName = buildIdByName ( table . schema )
32+ const idKeyedRows = rows . map ( ( row ) => rowDataNameToId ( row as RowData , idByName ) )
33+ if ( idKeyedRows . every ( ( row ) => Object . keys ( row ) . length === 0 ) ) {
34+ return {
35+ error : `None of the row keys match columns on table "${ table . name } " (columns: ${ table . schema . columns . map ( ( c ) => c . name ) . join ( ', ' ) } )` ,
36+ }
37+ }
38+ await replaceTableRows (
39+ {
40+ tableId : table . id ,
41+ rows : idKeyedRows ,
42+ workspaceId : table . workspaceId ,
43+ userId : context . userId ,
44+ } ,
45+ table ,
46+ generateId ( ) . slice ( 0 , 8 )
47+ )
48+ return { }
49+ }
2350
2451export async function maybeWriteOutputToTable (
2552 toolName : string ,
@@ -44,7 +71,7 @@ export async function maybeWriteOutputToTable(
4471 async ( span ) => {
4572 try {
4673 const table = await getTableById ( outputTable )
47- if ( ! table ) {
74+ if ( ! table || table . workspaceId !== context . workspaceId ) {
4875 span . setAttribute ( TraceAttr . CopilotTableOutcome , CopilotTableOutcome . TableNotFound )
4976 return {
5077 success : false ,
@@ -97,33 +124,11 @@ export async function maybeWriteOutputToTable(
97124 if ( context . abortSignal ?. aborted ) {
98125 throw new Error ( 'Request aborted before tool mutation could be applied' )
99126 }
100- await db . transaction ( async ( tx ) => {
101- if ( context . abortSignal ?. aborted ) {
102- throw new Error ( 'Request aborted before tool mutation could be applied' )
103- }
104- await tx . delete ( userTableRows ) . where ( eq ( userTableRows . tableId , outputTable ) )
105-
106- const now = new Date ( )
107- // Replace-all: table was just cleared — mint a fresh contiguous key run.
108- const orderKeys = nKeysBetween ( null , null , rows . length )
109- for ( let i = 0 ; i < rows . length ; i += BATCH_CHUNK_SIZE ) {
110- if ( context . abortSignal ?. aborted ) {
111- throw new Error ( 'Request aborted before tool mutation could be applied' )
112- }
113- const chunk = rows . slice ( i , i + BATCH_CHUNK_SIZE )
114- const values = buildOrderedRowValues ( {
115- tableId : outputTable ,
116- workspaceId : context . workspaceId ! ,
117- rows : chunk as RowData [ ] ,
118- startPosition : i ,
119- orderKeys : orderKeys . slice ( i , i + BATCH_CHUNK_SIZE ) ,
120- now,
121- createdBy : context . userId ,
122- makeId : ( ) => `row_${ generateId ( ) . replace ( / - / g, '' ) } ` ,
123- } )
124- await tx . insert ( userTableRows ) . values ( values )
125- }
126- } )
127+ const replaceResult = await replaceTableRowsFromWire ( table , rows , context )
128+ if ( replaceResult . error ) {
129+ span . setAttribute ( TraceAttr . CopilotTableOutcome , CopilotTableOutcome . InvalidShape )
130+ return { success : false , error : replaceResult . error }
131+ }
127132
128133 logger . info ( 'Tool output written to table' , {
129134 toolName,
@@ -181,7 +186,7 @@ export async function maybeWriteReadCsvToTable(
181186 async ( span ) => {
182187 try {
183188 const table = await getTableById ( outputTable )
184- if ( ! table ) {
189+ if ( ! table || table . workspaceId !== context . workspaceId ) {
185190 span . setAttribute ( TraceAttr . CopilotTableOutcome , CopilotTableOutcome . TableNotFound )
186191 return { success : false , error : `Table "${ outputTable } " not found` }
187192 }
@@ -243,33 +248,11 @@ export async function maybeWriteReadCsvToTable(
243248 if ( context . abortSignal ?. aborted ) {
244249 throw new Error ( 'Request aborted before tool mutation could be applied' )
245250 }
246- await db . transaction ( async ( tx ) => {
247- if ( context . abortSignal ?. aborted ) {
248- throw new Error ( 'Request aborted before tool mutation could be applied' )
249- }
250- await tx . delete ( userTableRows ) . where ( eq ( userTableRows . tableId , outputTable ) )
251-
252- const now = new Date ( )
253- // Replace-all: table was just cleared — mint a fresh contiguous key run.
254- const orderKeys = nKeysBetween ( null , null , rows . length )
255- for ( let i = 0 ; i < rows . length ; i += BATCH_CHUNK_SIZE ) {
256- if ( context . abortSignal ?. aborted ) {
257- throw new Error ( 'Request aborted before tool mutation could be applied' )
258- }
259- const chunk = rows . slice ( i , i + BATCH_CHUNK_SIZE )
260- const values = buildOrderedRowValues ( {
261- tableId : outputTable ,
262- workspaceId : context . workspaceId ! ,
263- rows : chunk as RowData [ ] ,
264- startPosition : i ,
265- orderKeys : orderKeys . slice ( i , i + BATCH_CHUNK_SIZE ) ,
266- now,
267- createdBy : context . userId ,
268- makeId : ( ) => `row_${ generateId ( ) . replace ( / - / g, '' ) } ` ,
269- } )
270- await tx . insert ( userTableRows ) . values ( values )
271- }
272- } )
251+ const replaceResult = await replaceTableRowsFromWire ( table , rows , context )
252+ if ( replaceResult . error ) {
253+ span . setAttribute ( TraceAttr . CopilotTableOutcome , CopilotTableOutcome . InvalidShape )
254+ return { success : false , error : replaceResult . error }
255+ }
273256
274257 logger . info ( 'Read output written to table' , {
275258 toolName,
0 commit comments