Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 66 additions & 53 deletions typescript/amp/src/ManifestBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ export const ManifestBuildResult = Schema.Struct({
})
export type ManifestBuildResult = typeof ManifestBuildResult.Type

export class ManifestBuilderError extends Data.TaggedError("ManifestBuilderError")<{
export class ManifestBuilderError extends Data.TaggedError(
"ManifestBuilderError",
)<{
readonly cause: unknown
readonly message: string
readonly table: string
}> {}

export class ManifestBuilder extends Effect.Service<ManifestBuilder>()("Amp/ManifestBuilder", {
effect: Effect.gen(function*() {
const client = yield* Admin.Admin
const build = (config: Model.DatasetConfig) =>
Effect.gen(function*() {
export class ManifestBuilder extends Effect.Service<ManifestBuilder>()(
"Amp/ManifestBuilder",
{
effect: Effect.gen(function*() {
const client = yield* Admin.Admin

const build = Effect.fn("ManifestBuilder.build")(function*(
config: Model.DatasetConfig,
) {
// Extract metadata
const metadata = new Model.DatasetMetadata({
namespace: config.namespace ?? Model.DatasetNamespace.make("_"),
Expand All @@ -35,44 +41,41 @@ export class ManifestBuilder extends Effect.Service<ManifestBuilder>()("Amp/Mani
sources: config.sources,
})

// Build manifest tables - send all tables in one request
const tables = yield* Effect.gen(function*() {
const configTables = config.tables ?? {}
const configFunctions = config.functions ?? {}

// Build function definitions map from config
const functionsMap = Object.fromEntries(
Object.entries(configFunctions).map(([name, func]) => [
name,
new Model.FunctionDefinition({
source: func.source,
inputTypes: func.inputTypes,
outputType: func.outputType,
}),
]),
)
// Extract configuration values
const configTables = config.tables ?? {}
const configFunctions = config.functions ?? {}

// Build function definitions map from config
const functionsMap: Record<string, Model.FunctionDefinition> = {}
for (const [name, func] of Object.entries(configFunctions)) {
functionsMap[name] = new Model.FunctionDefinition({
source: func.source,
inputTypes: func.inputTypes,
outputType: func.outputType,
})
}

// If no tables and no functions, skip schema request entirely
if (Object.keys(configTables).length === 0 && Object.keys(functionsMap).length === 0) {
return []
}
// Cache length computations
const configTablesLength = Object.keys(configTables).length
const functionsMapLength = Object.keys(functionsMap).length

// If no tables but we have functions, still skip schema request
// (when functions-only validation happens server-side, returns empty schema)
if (Object.keys(configTables).length === 0) {
return []
}
// Set up the table map
const tablesMap: Record<string, Model.Table> = {}

// Only perform the schema request if tables are present - when
// functions-only validation happens server-side, an empty schema is returned
if (configTablesLength !== 0) {
// Prepare all table SQL queries
const tableSqlMap = Object.fromEntries(
Object.entries(configTables).map(([name, table]) => [name, table.sql]),
)
const tableSqlMap: Record<string, string> = {}
for (const [name, table] of Object.entries(configTables)) {
tableSqlMap[name] = table.sql
}

// Call schema endpoint with all tables and functions at once
const request = new Admin.GetOutputSchemaPayload({
tables: tableSqlMap,
dependencies: config.dependencies,
functions: Object.keys(functionsMap).length > 0 ? functionsMap : undefined,
functions: functionsMapLength > 0 ? functionsMap : undefined,
})

const response = yield* client.getOutputSchema(request).pipe(
Expand All @@ -88,18 +91,19 @@ export class ManifestBuilder extends Effect.Service<ManifestBuilder>()("Amp/Mani
)

// Process each table's schema
return Object.entries(configTables).map(([name, table]) => {
for (const [name, table] of Object.entries(configTables)) {
const tableSchema = response.schemas[name]

if (!tableSchema) {
throw new ManifestBuilderError({
return yield* new ManifestBuilderError({
cause: undefined,
message: `No schema returned for table ${name}`,
table: name,
})
}

if (tableSchema.networks.length != 1) {
throw new ManifestBuilderError({
return yield* new ManifestBuilderError({
cause: undefined,
message: `Expected 1 network for SQL query, got ${tableSchema.networks}`,
table: name,
Expand All @@ -108,30 +112,39 @@ export class ManifestBuilder extends Effect.Service<ManifestBuilder>()("Amp/Mani

const network = Model.Network.make(tableSchema.networks[0])
const input = new Model.TableInput({ sql: table.sql })
const output = new Model.Table({ input, schema: tableSchema.schema, network })
const output = new Model.Table({
input,
schema: tableSchema.schema,
network,
})

return [name, output] as const
})
})
tablesMap[name] = output
}
}

// Build manifest functions
const functions = Object.entries(config.functions ?? {}).map(([name, func]) => {
const { inputTypes, outputType, source } = func
const functionManifest = new Model.FunctionManifest({ name, source, inputTypes, outputType })

return [name, functionManifest] as const
})
const functionManifestMap: Record<string, Model.FunctionManifest> = {}
for (const [name, func] of Object.entries(configFunctions)) {
const functionManifest = new Model.FunctionManifest({
name,
source: func.source,
inputTypes: func.inputTypes,
outputType: func.outputType,
})
functionManifestMap[name] = functionManifest
}

const manifest = new Model.DatasetDerived({
kind: "manifest",
dependencies: config.dependencies,
tables: Object.fromEntries(tables),
functions: Object.fromEntries(functions),
tables: tablesMap,
functions: functionManifestMap,
})

return { metadata, manifest }
})

return { build }
}),
}) {}
return { build }
}),
},
) {}
17 changes: 11 additions & 6 deletions typescript/amp/src/api/ArrowFlight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,17 @@ const createResponseStream: (
catch: (cause) => new ArrowFlightError({ cause, method: "getFlightInfo" }),
})

const response = yield* Effect.async<AsyncIterable<Flight.FlightData>>((resume, signal) => {
resume(Effect.sync(() => client.doGet(ticket, { signal })))
})

let meta: Uint8Array
const ipc = Stream.fromAsyncIterable(response, (cause) => new ArrowFlightError({ cause, method: "doGet" })).pipe(
const ipc = Stream.unwrapScoped(Effect.gen(function*() {
const controller = yield* Effect.acquireRelease(
Effect.sync(() => new AbortController()),
(controller) => Effect.sync(() => controller.abort()),
)
return Stream.fromAsyncIterable(
client.doGet(ticket, { signal: controller.signal }),
(cause) => new ArrowFlightError({ cause, method: "doGet" }),
)
})).pipe(
Stream.map((data) => {
// NOTE: This is a hack to forward the app metadata through the stream.
meta = data.appMetadata
Expand All @@ -103,8 +108,8 @@ const createResponseStream: (
)

const reader = yield* Effect.tryPromise({
catch: (cause) => new ArrowFlightError({ cause, method: "doGet" }),
try: () => RecordBatchReader.from(ipc),
catch: (cause) => new ArrowFlightError({ cause, method: "doGet" }),
})

return Stream.fromAsyncIterable(reader, (cause) => new ArrowFlightError({ cause, method: "doGet" })).pipe(
Expand Down