From f060ac2603c89ab9dd64dc52d4164bbca624d865 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Sun, 23 Nov 2025 07:29:34 -0500 Subject: [PATCH 1/3] remove unnecessary Effect.async --- typescript/amp/src/api/ArrowFlight.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/typescript/amp/src/api/ArrowFlight.ts b/typescript/amp/src/api/ArrowFlight.ts index 106ae8dd7..647f9774b 100644 --- a/typescript/amp/src/api/ArrowFlight.ts +++ b/typescript/amp/src/api/ArrowFlight.ts @@ -88,11 +88,8 @@ const createResponseStream: ( catch: (cause) => new ArrowFlightError({ cause, method: "getFlightInfo" }), }) - const response = yield* Effect.async>((resume, signal) => { - resume(Effect.sync(() => client.doGet(ticket, { signal }))) - }) - let meta: Uint8Array + const response = client.doGet(ticket) const ipc = Stream.fromAsyncIterable(response, (cause) => new ArrowFlightError({ cause, method: "doGet" })).pipe( Stream.map((data) => { // NOTE: This is a hack to forward the app metadata through the stream. @@ -103,8 +100,8 @@ const createResponseStream: ( ) const reader = yield* Effect.tryPromise({ - catch: (cause) => new ArrowFlightError({ cause, method: "doGet" }), try: () => RecordBatchReader.from(ipc), + catch: (cause) => new ArrowFlightError({ cause, method: "doGet" }), }) return Stream.fromAsyncIterable(reader, (cause) => new ArrowFlightError({ cause, method: "doGet" })).pipe( From df6a362a2f9496081fa8c8c6f4a83062c0701372 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Sun, 23 Nov 2025 07:54:08 -0500 Subject: [PATCH 2/3] reduce allocations in ManifestBuilder --- typescript/amp/src/ManifestBuilder.ts | 119 ++++++++++++++------------ 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/typescript/amp/src/ManifestBuilder.ts b/typescript/amp/src/ManifestBuilder.ts index ca9566074..c09e11415 100644 --- a/typescript/amp/src/ManifestBuilder.ts +++ b/typescript/amp/src/ManifestBuilder.ts @@ -11,17 +11,23 @@ export const ManifestBuildResult = Schema.Struct({ }) export type ManifestBuildResult = typeof ManifestBuildResult.Type -export class ManifestBuilderError extends Data.TaggedError("ManifestBuilderError")<{ +export class ManifestBuilderError extends Data.TaggedError( + "ManifestBuilderError", +)<{ readonly cause: unknown readonly message: string readonly table: string }> {} -export class ManifestBuilder extends Effect.Service()("Amp/ManifestBuilder", { - effect: Effect.gen(function*() { - const client = yield* Admin.Admin - const build = (config: Model.DatasetConfig) => - Effect.gen(function*() { +export class ManifestBuilder extends Effect.Service()( + "Amp/ManifestBuilder", + { + effect: Effect.gen(function*() { + const client = yield* Admin.Admin + + const build = Effect.fn("ManifestBuilder.build")(function*( + config: Model.DatasetConfig, + ) { // Extract metadata const metadata = new Model.DatasetMetadata({ namespace: config.namespace ?? Model.DatasetNamespace.make("_"), @@ -35,44 +41,41 @@ export class ManifestBuilder extends Effect.Service()("Amp/Mani sources: config.sources, }) - // Build manifest tables - send all tables in one request - const tables = yield* Effect.gen(function*() { - const configTables = config.tables ?? {} - const configFunctions = config.functions ?? {} - - // Build function definitions map from config - const functionsMap = Object.fromEntries( - Object.entries(configFunctions).map(([name, func]) => [ - name, - new Model.FunctionDefinition({ - source: func.source, - inputTypes: func.inputTypes, - outputType: func.outputType, - }), - ]), - ) + // Extract configuration values + const configTables = config.tables ?? {} + const configFunctions = config.functions ?? {} + + // Build function definitions map from config + const functionsMap: Record = {} + for (const [name, func] of Object.entries(configFunctions)) { + functionsMap[name] = new Model.FunctionDefinition({ + source: func.source, + inputTypes: func.inputTypes, + outputType: func.outputType, + }) + } - // If no tables and no functions, skip schema request entirely - if (Object.keys(configTables).length === 0 && Object.keys(functionsMap).length === 0) { - return [] - } + // Cache length computations + const configTablesLength = Object.keys(configTables).length + const functionsMapLength = Object.keys(functionsMap).length - // If no tables but we have functions, still skip schema request - // (when functions-only validation happens server-side, returns empty schema) - if (Object.keys(configTables).length === 0) { - return [] - } + // Setup the table map + const tablesMap: Record = {} + // Only perform schema request if tables are present - when functions- + // only validation happens server-side, an empty schema is returned + if (configTablesLength !== 0) { // Prepare all table SQL queries - const tableSqlMap = Object.fromEntries( - Object.entries(configTables).map(([name, table]) => [name, table.sql]), - ) + const tableSqlMap: Record = {} + for (const [name, table] of Object.entries(configTables)) { + tableSqlMap[name] = table.sql + } // Call schema endpoint with all tables and functions at once const request = new Admin.GetOutputSchemaPayload({ tables: tableSqlMap, dependencies: config.dependencies, - functions: Object.keys(functionsMap).length > 0 ? functionsMap : undefined, + functions: functionsMapLength > 0 ? functionsMap : undefined, }) const response = yield* client.getOutputSchema(request).pipe( @@ -88,10 +91,11 @@ export class ManifestBuilder extends Effect.Service()("Amp/Mani ) // Process each table's schema - return Object.entries(configTables).map(([name, table]) => { + for (const [name, table] of Object.entries(configTables)) { const tableSchema = response.schemas[name] + if (!tableSchema) { - throw new ManifestBuilderError({ + return yield* new ManifestBuilderError({ cause: undefined, message: `No schema returned for table ${name}`, table: name, @@ -99,7 +103,7 @@ export class ManifestBuilder extends Effect.Service()("Amp/Mani } if (tableSchema.networks.length != 1) { - throw new ManifestBuilderError({ + return yield* new ManifestBuilderError({ cause: undefined, message: `Expected 1 network for SQL query, got ${tableSchema.networks}`, table: name, @@ -108,30 +112,39 @@ export class ManifestBuilder extends Effect.Service()("Amp/Mani const network = Model.Network.make(tableSchema.networks[0]) const input = new Model.TableInput({ sql: table.sql }) - const output = new Model.Table({ input, schema: tableSchema.schema, network }) + const output = new Model.Table({ + input, + schema: tableSchema.schema, + network, + }) - return [name, output] as const - }) - }) + tablesMap[name] = output + } + } // Build manifest functions - const functions = Object.entries(config.functions ?? {}).map(([name, func]) => { - const { inputTypes, outputType, source } = func - const functionManifest = new Model.FunctionManifest({ name, source, inputTypes, outputType }) - - return [name, functionManifest] as const - }) + const functionManifestMap: Record = {} + for (const [name, func] of Object.entries(configFunctions)) { + const functionManifest = new Model.FunctionManifest({ + name, + source: func.source, + inputTypes: func.inputTypes, + outputType: func.outputType, + }) + functionManifestMap[name] = functionManifest + } const manifest = new Model.DatasetDerived({ kind: "manifest", dependencies: config.dependencies, - tables: Object.fromEntries(tables), - functions: Object.fromEntries(functions), + tables: tablesMap, + functions: functionManifestMap, }) return { metadata, manifest } }) - return { build } - }), -}) {} + return { build } + }), + }, +) {} From 20380cce8572749401faff1380f60067a973d718 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Sun, 23 Nov 2025 13:34:13 -0500 Subject: [PATCH 3/3] use stream scope to abort response via signal --- typescript/amp/src/api/ArrowFlight.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/typescript/amp/src/api/ArrowFlight.ts b/typescript/amp/src/api/ArrowFlight.ts index 647f9774b..18559392e 100644 --- a/typescript/amp/src/api/ArrowFlight.ts +++ b/typescript/amp/src/api/ArrowFlight.ts @@ -89,8 +89,16 @@ const createResponseStream: ( }) let meta: Uint8Array - const response = client.doGet(ticket) - const ipc = Stream.fromAsyncIterable(response, (cause) => new ArrowFlightError({ cause, method: "doGet" })).pipe( + const ipc = Stream.unwrapScoped(Effect.gen(function*() { + const controller = yield* Effect.acquireRelease( + Effect.sync(() => new AbortController()), + (controller) => Effect.sync(() => controller.abort()), + ) + return Stream.fromAsyncIterable( + client.doGet(ticket, { signal: controller.signal }), + (cause) => new ArrowFlightError({ cause, method: "doGet" }), + ) + })).pipe( Stream.map((data) => { // NOTE: This is a hack to forward the app metadata through the stream. meta = data.appMetadata