From 98599e072173c52bf613a7159c936be3f88f7699 Mon Sep 17 00:00:00 2001
From: Aleksandr Lesnenko
Date: Wed, 22 Apr 2026 10:32:28 -0400
Subject: [PATCH 1/9] upload-metadata command

---
 bin/cli.test.ts             |  96 ++++++++++
 bin/cli.ts                  |  82 ++++++++-
 bun.lock                    |   5 +
 package.json                |   1 +
 src/index.ts                |   6 +
 src/ndjson.ts               | 120 +++++++++++++
 src/stream-json.ts          |  22 +++
 src/upload-metadata.test.ts | 350 ++++++++++++++++++++++++++++++++++++
 src/upload-metadata.ts      | 340 +++++++++++++++++++++++++++++++++++
 9 files changed, 1020 insertions(+), 2 deletions(-)
 create mode 100644 src/ndjson.ts
 create mode 100644 src/stream-json.ts
 create mode 100644 src/upload-metadata.test.ts
 create mode 100644 src/upload-metadata.ts

diff --git a/bin/cli.test.ts b/bin/cli.test.ts
index f3af7c2..7470185 100644
--- a/bin/cli.test.ts
+++ b/bin/cli.test.ts
@@ -14,6 +14,8 @@ type RunResult = {
   exitCode: number;
 };
 
+type UploadLine = { id: number };
+
 function runCli(args: string[]): RunResult {
   const proc = Bun.spawnSync({
     cmd: ["bun", "run", CLI, ...args],
@@ -121,6 +123,100 @@ describe("cli", () => {
     });
   });
 
+  describe("upload-metadata", () => {
+    it("errors when arguments are missing", () => {
+      const { stderr, exitCode } = runCli(["upload-metadata"]);
+      expect(exitCode).toBe(1);
+      expect(stderr).toContain(
+        "<metadata-file> and <instance-url> arguments are required",
+      );
+    });
+
+    it("errors when no api key is set", () => {
+      const proc = Bun.spawnSync({
+        cmd: [
+          "bun",
+          "run",
+          CLI,
+          "upload-metadata",
+          EXAMPLE_INPUT,
+          "http://127.0.0.1:1",
+        ],
+        cwd: REPO_ROOT,
+        env: { ...process.env, METABASE_API_KEY: "" },
+      });
+      expect(proc.exitCode).toBe(1);
+      expect(proc.stderr.toString()).toContain("API key is required");
+    });
+
+    it("uploads against a mock server end-to-end", async () => {
+
+      const server = Bun.serve({
+        port: 0,
+        async fetch(request) {
+          const url = new URL(request.url);
+          const body = await request.text();
+          const inLines = body
+            .split("\n")
+            .map((line) => line.trim())
+            .filter((line) => line.length > 0);
+
+          let response = "";
+          switch (url.pathname) {
+            case "/api/database/metadata/databases":
+            case "/api/database/metadata/tables":
+            case "/api/database/metadata/fields":
+              for (const line of inLines) {
+                const { id } = JSON.parse(line) as UploadLine;
+                response += JSON.stringify({ old_id: id, new_id: id }) + "\n";
+              }
+              break;
+            case "/api/database/metadata/fields/finalize":
+              for (const line of inLines) {
+                const { id } = JSON.parse(line) as UploadLine;
+                response += JSON.stringify({ id, ok: true }) + "\n";
+              }
+              break;
+            default:
+              return new Response("not found", { status: 404 });
+          }
+          return new Response(response, {
+            headers: { "Content-Type": "application/x-ndjson" },
+          });
+        },
+      });
+      try {
+        // NB: must use async Bun.spawn — spawnSync would block the parent
+        // event loop and deadlock with the in-process mock server.
+        const proc = Bun.spawn({
+          cmd: [
+            "bun",
+            "run",
+            CLI,
+            "upload-metadata",
+            EXAMPLE_INPUT,
+            `http://127.0.0.1:${server.port}`,
+          ],
+          cwd: REPO_ROOT,
+          env: { ...process.env, METABASE_API_KEY: "ci-key" },
+          stdout: "pipe",
+          stderr: "pipe",
+        });
+        const [stdoutText, stderrText, exitCode] = await Promise.all([
+          new Response(proc.stdout).text(),
+          new Response(proc.stderr).text(),
+          proc.exited,
+        ]);
+        expect(exitCode).toBe(0);
+        expect(stdoutText).toContain("Databases:");
+        expect(stdoutText).toContain("Finalized:");
+        expect(stderrText).toBe("");
+      } finally {
+        await server.stop();
+      }
+    });
+  });
+
   describe("extract-spec", () => {
     let workdir: string;
 
diff --git a/bin/cli.ts b/bin/cli.ts
index 9145591..e59ee46 100644
--- a/bin/cli.ts
+++ b/bin/cli.ts
@@ -5,10 +5,17 @@ import { parseArgs } from "node:util";
 import { extractFieldValues } from "../src/extract-field-values.js";
 import { extractMetadata } from "../src/extract-metadata.js";
 import { extractSpec } from "../src/extract-spec.js";
+import {
+  uploadMetadata,
+  type UploadMetadataResult,
+  type UploadStepStats,
+} from "../src/upload-metadata.js";
 
 type ParsedValues = {
   file?: string;
   help?: boolean;
+  "field-values"?: string;
+  "api-key"?: string;
 };
 
 const HELP = `Usage: database-metadata <command> [arguments] [options]
@@ -26,6 +33,12 @@ Commands:
   extract-spec           Copy the bundled spec.md into a target file
     --file <file>        Destination file (default: ./spec.md)
 
+  upload-metadata <metadata-file> <instance-url>
+                         Stream metadata (and optional field values)
+                         to a target Metabase instance via NDJSON.
+    --field-values <file>  Optional field-values JSON file to upload after metadata
+    --api-key <key>        API key. Defaults to METABASE_API_KEY env var.
+
 Options:
   -h, --help             Show this help message`;
 
@@ -35,6 +48,8 @@ function parseArguments() {
     options: {
       file: { type: "string" },
       help: { type: "boolean", short: "h", default: false },
+      "field-values": { type: "string" },
+      "api-key": { type: "string" },
     },
   });
 }
@@ -80,13 +95,71 @@ function handleExtractFieldValues(positionals: string[]): void {
   process.exit(0);
 }
 
+async function handleUploadMetadata(
+  positionals: string[],
+  values: ParsedValues,
+): Promise<void> {
+  const metadataFile = positionals[1];
+  const instanceUrl = positionals[2];
+
+  if (!metadataFile || !instanceUrl) {
+    console.error(
+      "Error: <metadata-file> and <instance-url> arguments are required",
+    );
+    process.exit(1);
+  }
+
+  const apiKey = values["api-key"] ?? process.env.METABASE_API_KEY;
+  if (!apiKey) {
+    console.error(
+      "Error: API key is required (pass --api-key or set METABASE_API_KEY)",
+    );
+    process.exit(1);
+  }
+
+  const fieldValuesFile = values["field-values"];
+  const stats = await uploadMetadata({
+    metadataFile,
+    fieldValuesFile,
+    instanceUrl,
+    apiKey,
+  });
+  console.log(formatUploadReport(stats, Boolean(fieldValuesFile)));
+  process.exit(hasAnyErrors(stats) ? 1 : 0);
+}
+
+function formatStepLine(label: string, step: UploadStepStats): string {
+  const total = step.mapped + step.errors;
+  return `${label} ${step.mapped}/${total} mapped (${step.errors} errors)`;
+}
+
+function formatUploadReport(
+  stats: UploadMetadataResult,
+  fieldValuesRan: boolean,
+): string {
+  const lines = [
+    formatStepLine("Databases: ", stats.databases),
+    formatStepLine("Tables: ", stats.tables),
+    formatStepLine("Fields: ", stats.fieldsInsert),
+    formatStepLine("Finalized: ", stats.fieldsFinalize),
+  ];
+  if (fieldValuesRan) {
+    lines.push(formatStepLine("Values: ", stats.fieldValues));
+  }
+  return lines.join("\n");
+}
+
+function hasAnyErrors(stats: UploadMetadataResult): boolean {
+  return Object.values(stats).some((step) => step.errors > 0);
+}
+
 function handleExtractSpec(values: ParsedValues): void {
   const { target } = extractSpec({ file: values.file ?? "spec.md" });
   console.log(`Spec extracted to ${target}`);
   process.exit(0);
 }
 
-function main(): void {
+async function main(): Promise<void> {
   const { values, positionals } = parseArguments();
   const command = positionals[0];
 
@@ -102,10 +175,15 @@ function main(): void {
       return handleExtractFieldValues(positionals);
     case "extract-spec":
       return handleExtractSpec(values);
+    case "upload-metadata":
+      return handleUploadMetadata(positionals, values);
     default:
       console.error(`Unknown command: ${command}`);
       process.exit(1);
   }
 }
 
-main();
+main().catch((error) => {
+  console.error(error instanceof Error ? error.message : String(error));
+  process.exit(1);
+});
diff --git a/bun.lock b/bun.lock
index de5ae96..5b9342d 100644
--- a/bun.lock
+++ b/bun.lock
@@ -5,6 +5,7 @@
     "": {
       "name": "@metabase/database-metadata",
       "dependencies": {
+        "@streamparser/json-node": "^0.0.22",
         "js-yaml": "^4.1.0",
       },
       "devDependencies": {
@@ -84,6 +85,10 @@
 
     "@oxfmt/binding-win32-x64-msvc": ["@oxfmt/binding-win32-x64-msvc@0.45.0", "", { "os": "win32", "cpu": "x64" }, "sha512-w5MMTRCK1dpQeRA+HHqXQXyN33DlG/N2LOYxJmaT4fJjcmZrbNnqw7SmIk7I2/a2493PPLZ+2E/Ar6t2iKVMug=="],
 
+    "@streamparser/json": ["@streamparser/json@0.0.22", "", {}, "sha512-b6gTSBjJ8G8SuO3Gbbj+zXbVx8NSs1EbpbMKpzGLWMdkR+98McH9bEjSz3+0mPJf68c5nxa3CrJHp5EQNXM6zQ=="],
+
+    "@streamparser/json-node": ["@streamparser/json-node@0.0.22", "", { "dependencies": { "@streamparser/json": "^0.0.22" } }, "sha512-sJT2ptNRwqB1lIsQrQlCoWk5rF4tif9wDh+7yluAGijJamAhrHGYpFB/Zg3hJeceoZypi74ftXk8DHzwYpbZSg=="],
+
     "@types/bun": ["@types/bun@1.3.12", "", { "dependencies": { "bun-types": "1.3.12" } }, "sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A=="],
 
     "@types/esrecurse": ["@types/esrecurse@4.3.1", "", {}, "sha512-xJBAbDifo5hpffDBuHl0Y8ywswbiAp/Wi7Y/GtAgSlZyIABppyurxVueOPE8LUQOxdlgi6Zqce7uoEpqNTeiUw=="],
diff --git a/package.json b/package.json
index ac9fc58..4d4495c 100644
--- a/package.json
+++ b/package.json
@@ -39,6 +39,7 @@
     "test": "bun test"
   },
   "dependencies": {
+    "@streamparser/json-node": "^0.0.22",
     "js-yaml": "^4.1.0"
   },
   "devDependencies": {
diff --git a/src/index.ts b/src/index.ts
index c904ea4..7162500 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -13,3 +13,9 @@ export {
   type ExtractSpecOptions,
   type ExtractSpecResult,
 } from "./extract-spec.js";
+export {
+  uploadMetadata,
+  type UploadMetadataOptions,
+  type UploadMetadataResult,
+  type UploadStepStats,
+} from "./upload-metadata.js";
diff --git a/src/ndjson.ts b/src/ndjson.ts
new file mode 100644
index 0000000..cbf0664
--- /dev/null
+++ b/src/ndjson.ts
@@ -0,0 +1,120 @@
+export type PostNdjsonOptions<Req, Res> = {
+  url: string;
+  apiKey: string;
+  requests: AsyncIterable<Req>;
+  onResponse: (response: Res, index: number) => void | Promise<void>;
+};
+
+// Node's fetch RequestInit does not type `duplex` in lib.dom, but it is required
+// when sending a streaming body.
+type StreamingRequestInit = RequestInit & { duplex: "half" };
+
+export async function postNdjson<Req, Res>({
+  url,
+  apiKey,
+  requests,
+  onResponse,
+}: PostNdjsonOptions<Req, Res>): Promise<void> {
+  const iterator = requests[Symbol.asyncIterator]();
+  const encoder = new TextEncoder();
+
+  const body = new ReadableStream({
+    async pull(controller) {
+      try {
+        const { value, done } = await iterator.next();
+        if (done) {
+          controller.close();
+          return;
+        }
+        controller.enqueue(encoder.encode(JSON.stringify(value) + "\n"));
+      } catch (error) {
+        controller.error(error);
+      }
+    },
+    async cancel(reason) {
+      if (typeof iterator.return === "function") {
+        await iterator.return(reason);
+      }
+    },
+  });
+
+  const init: StreamingRequestInit = {
+    method: "POST",
+    headers: {
+      "Content-Type": "application/x-ndjson",
+      "X-API-Key": apiKey,
+    },
+    body,
+    duplex: "half",
+  };
+  const response = await fetch(url, init);
+
+  if (!response.ok) {
+    const text = await response.text().catch(() => "");
+    throw new Error(
+      `POST ${url} failed: ${response.status} ${response.statusText} ${text}`.trim(),
+    );
+  }
+  if (!response.body) {
+    throw new Error(`POST ${url} returned an empty body`);
+  }
+
+  let index = 0;
+  for await (const parsed of parseNdjsonStream<Res>(response.body)) {
+    await onResponse(parsed, index);
+    index += 1;
+  }
+}
+
+export async function* parseNdjsonStream<T>(
+  stream: ReadableStream<Uint8Array>,
+): AsyncGenerator<T> {
+  const reader = stream.getReader();
+  const decoder = new TextDecoder();
+  // Slice once per chunk (not per line) to keep parsing O(n) on large responses.
+  let pending = "";
+
+  try {
+    while (true) {
+      const { value, done } = await reader.read();
+      if (value) {
+        const chunk = pending + decoder.decode(value, { stream: true });
+        const lastNewline = chunk.lastIndexOf("\n");
+        if (lastNewline === -1) {
+          pending = chunk;
+        } else {
+          for (const line of splitLines(chunk.slice(0, lastNewline))) {
+            yield JSON.parse(line) as T;
+          }
+          pending = chunk.slice(lastNewline + 1);
+        }
+      }
+      if (done) {
+        break;
+      }
+    }
+  } finally {
+    reader.releaseLock();
+  }
+
+  const trailing = pending.trim();
+  if (trailing.length > 0) {
+    yield JSON.parse(trailing) as T;
+  }
+}
+
+function* splitLines(block: string): Generator<string> {
+  let start = 0;
+  while (start <= block.length) {
+    const newlineIndex = block.indexOf("\n", start);
+    const end = newlineIndex === -1 ? block.length : newlineIndex;
+    const line = block.slice(start, end).trim();
+    if (line.length > 0) {
+      yield line;
+    }
+    if (newlineIndex === -1) {
+      return;
+    }
+    start = newlineIndex + 1;
+  }
+}
diff --git a/src/stream-json.ts b/src/stream-json.ts
new file mode 100644
index 0000000..1fe696b
--- /dev/null
+++ b/src/stream-json.ts
@@ -0,0 +1,22 @@
+import { createReadStream } from "node:fs";
+import { JSONParser } from "@streamparser/json-node";
+
+type ParsedElement<T> = { value: T };
+
+export async function* streamJsonElements<T>(
+  filePath: string,
+  jsonPath: string,
+): AsyncGenerator<T> {
+  const parser = new JSONParser({ paths: [jsonPath], keepStack: false });
+  const fileStream = createReadStream(filePath);
+  fileStream.pipe(parser);
+  try {
+    for await (const chunk of parser) {
+      yield (chunk as ParsedElement<T>).value;
+    }
+  } finally {
+    fileStream.unpipe(parser);
+    fileStream.destroy();
+    parser.destroy();
+  }
+}
diff --git a/src/upload-metadata.test.ts b/src/upload-metadata.test.ts
new file mode 100644
index 0000000..bf99f35
--- /dev/null
+++ b/src/upload-metadata.test.ts
@@ -0,0 +1,350 @@
+import { afterEach, beforeEach, describe, expect, it } from "bun:test";
+import { join, resolve } from "path";
+
+import { parseNdjsonStream } from "./ndjson.js";
+import { uploadMetadata } from "./upload-metadata.js";
+
+const REPO_ROOT = resolve(import.meta.dirname, "..");
+const EXAMPLE_METADATA = join(REPO_ROOT, "examples/v1/metadata.json");
+const EXAMPLE_FIELD_VALUES = join(REPO_ROOT, "examples/v1/field-values.json");
+
+const DB_OFFSET = 1000;
+const TABLE_OFFSET = 2000;
+const FIELD_OFFSET = 3000;
+
+type RecordedCall = {
+  path: string;
+  contentType: string;
+  transferEncoding: string | null;
+  apiKey: string | null;
+  lines: unknown[];
+};
+
+type MockServerControl = {
+  baseUrl: string;
+  calls: RecordedCall[];
+  stop: () => Promise<void>;
+  setFieldInsertBehavior: (behavior: FieldInsertBehavior) => void;
+  setFieldFailure: (oldId: number) => void;
+};
+
+type FieldInsertBehavior = "new" | "existing" | "alternate";
+
+type IdLine = { id: number };
+type TableLine = { id: number; db_id: number };
+type FieldInsertLine = Record<string, unknown> & { table_id: number };
+type FinalizeLine = {
+  id: number;
+  parent_id: number | null;
+  fk_target_field_id: number | null;
+};
+type FieldValuesLine = { field_id: number };
+
+async function readNdjsonLines(
+  stream: ReadableStream<Uint8Array>,
+): Promise<unknown[]> {
+  const lines: unknown[] = [];
+  for await (const line of parseNdjsonStream(stream)) {
+    lines.push(line);
+  }
+  return lines;
+}
+
+function ndjsonStreamResponse(
+  responses: AsyncIterable<unknown>,
+): Response {
+  const encoder = new TextEncoder();
+  const body = new ReadableStream({
+    async start(controller) {
+      try {
+        for await (const response of responses) {
+          controller.enqueue(
+            encoder.encode(JSON.stringify(response) + "\n"),
+          );
+        }
+        controller.close();
+      } catch (error) {
+        controller.error(error);
+      }
+    },
+  });
+  return new Response(body, {
+    headers: { "Content-Type": "application/x-ndjson" },
+  });
+}
+
+function startMockServer(): MockServerControl {
+  const calls: RecordedCall[] = [];
+  let fieldInsertBehavior: FieldInsertBehavior = "new";
+  const fieldFailures = new Set<number>();
+  let fieldInsertCounter = 0;
+
+  const server = Bun.serve({
+    port: 0,
+    async fetch(request) {
+      const url = new URL(request.url);
+      const path = url.pathname;
+      const contentType = request.headers.get("Content-Type") ?? "";
+      const transferEncoding = request.headers.get("Transfer-Encoding");
+      const apiKey = request.headers.get("X-API-Key");
+      if (!request.body) {
+        return new Response("missing body", { status: 400 });
+      }
+      const lines = await readNdjsonLines(request.body);
+      calls.push({ path, contentType, transferEncoding, apiKey, lines });
+
+      switch (path) {
+        case "/api/database/metadata/databases": {
+          async function* responses() {
+            for (const line of lines as IdLine[]) {
+              yield { old_id: line.id, new_id: line.id + DB_OFFSET };
+            }
+          }
+          return ndjsonStreamResponse(responses());
+        }
+        case "/api/database/metadata/tables": {
+          async function* responses() {
+            for (const line of lines as IdLine[]) {
+              yield { old_id: line.id, new_id: line.id + TABLE_OFFSET };
+            }
+          }
+          return ndjsonStreamResponse(responses());
+        }
+        case "/api/database/metadata/fields": {
+          async function* responses() {
+            for (const line of lines as IdLine[]) {
+              if (fieldFailures.has(line.id)) {
+                yield {
+                  old_id: line.id,
+                  error: "invalid_table_id",
+                  detail: "test failure",
+                };
+                continue;
+              }
+              const newId = line.id + FIELD_OFFSET;
+              const inserted =
+                fieldInsertBehavior === "new" ||
+                (fieldInsertBehavior === "alternate" &&
+                  fieldInsertCounter++ % 2 === 0);
+              yield inserted
+                ? { old_id: line.id, new_id: newId }
+                : { old_id: line.id, existing_id: newId };
+            }
+          }
+          return ndjsonStreamResponse(responses());
+        }
+        case "/api/database/metadata/fields/finalize": {
+          async function* responses() {
+            for (const line of lines as IdLine[]) {
+              yield { id: line.id, ok: true };
+            }
+          }
+          return ndjsonStreamResponse(responses());
+        }
+        case "/api/database/field-values": {
+          async function* responses() {
+            for (const line of lines as FieldValuesLine[]) {
+              yield { field_id: line.field_id, created: true };
+            }
+          }
+          return ndjsonStreamResponse(responses());
+        }
+        default:
+          return new Response("not found", { status: 404 });
+      }
+    },
+  });
+
+  return {
+    baseUrl: `http://127.0.0.1:${server.port}`,
+    calls,
+    stop: () => server.stop(),
+    setFieldInsertBehavior: (behavior) => {
+      fieldInsertBehavior = behavior;
+      fieldInsertCounter = 0;
+    },
+    setFieldFailure: (oldId) => {
+      fieldFailures.add(oldId);
+    },
+  };
+}
+
+describe("uploadMetadata", () => {
+  let mock: MockServerControl;
+
+  beforeEach(() => {
+    mock = startMockServer();
+  });
+
+  afterEach(async () => {
+    await mock.stop();
+  });
+
+  it("runs the full pipeline and remaps ids across passes", async () => {
+    const stats = await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      fieldValuesFile: EXAMPLE_FIELD_VALUES,
+      instanceUrl: mock.baseUrl,
+      apiKey: "test-key",
+      onWarning: () => {},
+    });
+
+    expect(stats).toEqual({
+      databases: { mapped: 1, errors: 0 },
+      tables: { mapped: 8, errors: 0 },
+      fieldsInsert: { mapped: 71, errors: 0 },
+      fieldsFinalize: { mapped: 71, errors: 0 },
+      fieldValues: { mapped: 4, errors: 0 },
+    });
+
+    const paths = mock.calls.map((call) => call.path);
+    // The first three steps are strictly sequential (each feeds the next's
+    // id map); finalize and field-values are kicked off concurrently once
+    // the field id map is populated.
+    expect(paths.slice(0, 3)).toEqual([
+      "/api/database/metadata/databases",
+      "/api/database/metadata/tables",
+      "/api/database/metadata/fields",
+    ]);
+    expect(paths.slice(3).sort()).toEqual([
+      "/api/database/field-values",
+      "/api/database/metadata/fields/finalize",
+    ]);
+
+    for (const call of mock.calls) {
+      expect(call.contentType).toBe("application/x-ndjson");
+      expect(call.apiKey).toBe("test-key");
+    }
+  });
+
+  it("rewrites db_id on tables using the step-1 mapping", async () => {
+    await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+    const tableCall = mock.calls.find(
+      (call) => call.path === "/api/database/metadata/tables",
+    )!;
+    const sampleDbNewId = 1 + DB_OFFSET;
+    for (const line of tableCall.lines as TableLine[]) {
+      expect(line.db_id).toBe(sampleDbNewId);
+    }
+  });
+
+  it("rewrites table_id on fields using the step-2 mapping and strips fk/parent on insert", async () => {
+    await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+    const fieldsCall = mock.calls.find(
+      (call) => call.path === "/api/database/metadata/fields",
+    )!;
+    for (const line of fieldsCall.lines as FieldInsertLine[]) {
+      expect(line.table_id).toBeGreaterThanOrEqual(TABLE_OFFSET + 1);
+      expect(line.table_id).toBeLessThanOrEqual(TABLE_OFFSET + 8);
+      expect(line).not.toHaveProperty("parent_id");
+      expect(line).not.toHaveProperty("fk_target_field_id");
+    }
+  });
+
+  it("sends remapped parent_id and fk_target_field_id in finalize", async () => {
+    await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+    const finalizeCall = mock.calls.find(
+      (call) => call.path === "/api/database/metadata/fields/finalize",
+    )!;
+    const lines = finalizeCall.lines as FinalizeLine[];
+
+    for (const line of lines) {
+      expect(line.id).toBeGreaterThanOrEqual(FIELD_OFFSET + 1);
+      if (line.fk_target_field_id !== null) {
+        expect(line.fk_target_field_id).toBeGreaterThanOrEqual(
+          FIELD_OFFSET + 1,
+        );
+      }
+    }
+
+    const fkCount = lines.filter(
+      (line) => line.fk_target_field_id !== null,
+    ).length;
+    expect(fkCount).toBeGreaterThan(0);
+  });
+
+  it("skips non-inserted rows in finalize (existing_id responses)", async () => {
+    mock.setFieldInsertBehavior("existing");
+    const stats = await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+
+    expect(stats.fieldsInsert.mapped).toBe(71);
+    expect(stats.fieldsFinalize.mapped).toBe(0);
+    expect(stats.fieldsFinalize.errors).toBe(0);
+  });
+
+  it("rewrites field_id on field-values using the step-3 mapping", async () => {
+    await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      fieldValuesFile: EXAMPLE_FIELD_VALUES,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+    const valuesCall = mock.calls.find(
+      (call) => call.path === "/api/database/field-values",
+    )!;
+    for (const line of valuesCall.lines as FieldValuesLine[]) {
+      expect(line.field_id).toBeGreaterThanOrEqual(FIELD_OFFSET + 1);
+    }
+  });
+
+  it("counts per-row errors without aborting the pipeline", async () => {
+    mock.setFieldFailure(1);
+    const warnings: string[] = [];
+    const stats = await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      fieldValuesFile: EXAMPLE_FIELD_VALUES,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: (message) => warnings.push(message),
+    });
+
+    expect(stats.fieldsInsert.errors).toBe(1);
+    expect(stats.fieldsInsert.mapped).toBe(70);
+    expect(stats.fieldsFinalize.mapped).toBe(70);
+    expect(warnings.some((w) => w.includes("Field 1"))).toBe(true);
+  });
+
+  it("streams the request body with chunked transfer encoding", async () => {
+    await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+    for (const call of mock.calls) {
+      expect(call.transferEncoding).toBe("chunked");
+    }
+  });
+
+  it("skips the field-values step when the file is not provided", async () => {
+    await uploadMetadata({
+      metadataFile: EXAMPLE_METADATA,
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      onWarning: () => {},
+    });
+    const paths = mock.calls.map((call) => call.path);
+    expect(paths).not.toContain("/api/database/field-values");
+  });
+});
diff --git a/src/upload-metadata.ts b/src/upload-metadata.ts
new file mode 100644
index 0000000..215dd72
--- /dev/null
+++ b/src/upload-metadata.ts
@@ -0,0 +1,340 @@
+import { postNdjson } from "./ndjson.js";
+import { streamJsonElements } from "./stream-json.js";
+
+export const API_PATHS = {
+  databases: "/api/database/metadata/databases",
+  tables: "/api/database/metadata/tables",
+  fields: "/api/database/metadata/fields",
+  fieldsFinalize: "/api/database/metadata/fields/finalize",
+  fieldValues: "/api/database/field-values",
+} as const;
+
+const JSON_PATHS = {
+  databases: "$.databases.*",
+  tables: "$.tables.*",
+  fields: "$.fields.*",
+  fieldValues: "$.field_values.*",
+} as const;
+
+export type UploadMetadataOptions = {
+  metadataFile: string;
+  fieldValuesFile?: string;
+  instanceUrl: string;
+  apiKey: string;
+  onWarning?: (message: string) => void;
+};
+
+export type UploadStepStats = {
+  mapped: number;
+  errors: number;
+};
+
+export type UploadMetadataResult = {
+  databases: UploadStepStats;
+  tables: UploadStepStats;
+  fieldsInsert: UploadStepStats;
+  fieldsFinalize: UploadStepStats;
+  fieldValues: UploadStepStats;
+};
+
+type DatabaseEntry = {
+  id: number;
+  name: string;
+  engine: string;
+};
+
+type TableEntry = {
+  id: number;
+  db_id: number;
+  name: string;
+  schema: string | null;
+  description?: string;
+};
+
+type FieldEntry = {
+  id: number;
+  table_id: number;
+  name: string;
+  base_type?: string;
+  database_type?: string;
+  description?: string | null;
+  semantic_type?: string | null;
+  effective_type?: string | null;
+  coercion_strategy?: string | null;
+  parent_id?: number | null;
+  fk_target_field_id?: number | null;
+};
+
+type FieldInsertRequest = Omit<FieldEntry, "parent_id" | "fk_target_field_id">;
+
+type FieldFinalizeRequest = {
+  id: number;
+  parent_id: number | null;
+  fk_target_field_id: number | null;
+};
+
+type FieldValuesEntry = {
+  field_id: number;
+  values: unknown[];
+  has_more_values?: boolean;
+  human_readable_values?: string[];
+};
+
+type IdMapResponse =
+  | { old_id: number; new_id: number }
+  | { old_id: number; existing_id: number }
+  | { old_id: number; error: string; detail?: string };
+
+type FieldFinalizeResponse =
+  | { id: number; ok: true }
+  | { id: number; error: string; detail?: string };
+
+type FieldValuesResponse =
+  | { field_id: number; created: true }
+  | { field_id: number; updated: true }
+  | { field_id: number; error: string; detail?: string };
+
+type RecordIdMapResponseOptions = {
+  response: IdMapResponse;
+  stats: UploadStepStats;
+  idMap: Map<number, number>;
+  label: string;
+  onInserted?: (oldId: number) => void;
+};
+
+function joinUrl(baseUrl: string, path: string): string {
+  return `${baseUrl.replace(/\/$/, "")}${path}`;
+}
+
+function emptyStats(): UploadStepStats {
+  return { mapped: 0, errors: 0 };
+}
+
+function formatError(
+  label: string,
+  id: number,
+  response: { error?: string; detail?: string },
+): string {
+  const suffix = response.detail ? ` — ${response.detail}` : "";
+  return `${label} ${id}: ${response.error}${suffix}`;
+}
+
+export async function uploadMetadata({
+  metadataFile,
+  fieldValuesFile,
+  instanceUrl,
+  apiKey,
+  onWarning,
+}: UploadMetadataOptions): Promise<UploadMetadataResult> {
+  const warn = onWarning ?? ((message: string) => console.warn(message));
+
+  const databaseIdMap = new Map<number, number>();
+  const tableIdMap = new Map<number, number>();
+  const fieldIdMap = new Map<number, number>();
+  const insertedFieldIds = new Set<number>();
+
+  const result: UploadMetadataResult = {
+    databases: emptyStats(),
+    tables: emptyStats(),
+    fieldsInsert: emptyStats(),
+    fieldsFinalize: emptyStats(),
+    fieldValues: emptyStats(),
+  };
+
+  function recordIdMapResponse({
+    response,
+    stats,
+    idMap,
+    label,
+    onInserted,
+  }: RecordIdMapResponseOptions): void {
+    if ("new_id" in response) {
+      idMap.set(response.old_id, response.new_id);
+      onInserted?.(response.old_id);
+      stats.mapped += 1;
+      return;
+    }
+    if ("existing_id" in response) {
+      idMap.set(response.old_id, response.existing_id);
+      stats.mapped += 1;
+      return;
+    }
+    stats.errors += 1;
+    warn(formatError(label, response.old_id, response));
+  }
+
+  async function* remapForeignKey<In, Out>(opts: {
+    jsonPath: string;
+    sourceFile: string;
+    getKey: (entry: In) => number;
+    idMap: Map<number, number>;
+    transform: (entry: In, newKey: number) => Out;
+    describeSkip: (entry: In, missingKey: number) => string;
+  }): AsyncGenerator<Out> {
+    for await (const entry of streamJsonElements<In>(
+      opts.sourceFile,
+      opts.jsonPath,
+    )) {
+      const oldKey = opts.getKey(entry);
+      const newKey = opts.idMap.get(oldKey);
+      if (newKey === undefined) {
+        warn(opts.describeSkip(entry, oldKey));
+        continue;
+      }
+      yield opts.transform(entry, newKey);
+    }
+  }
+
+  function remapFieldReference(
+    oldId: number | null | undefined,
+    ownerFieldId: number,
+    referenceName: "parent_id" | "fk_target_field_id",
+  ): number | null {
+    if (oldId == null) {
+      return null;
+    }
+    const newId = fieldIdMap.get(oldId);
+    if (newId === undefined) {
+      warn(
+        `Field ${ownerFieldId}: dropping ${referenceName} → ${oldId} (referenced field was not mapped)`,
+      );
+      return null;
+    }
+    return newId;
+  }
+
+  async function* fieldFinalizeRequests(): AsyncGenerator<FieldFinalizeRequest> {
+    for await (const field of streamJsonElements<FieldEntry>(
+      metadataFile,
+      JSON_PATHS.fields,
+    )) {
+      if (!insertedFieldIds.has(field.id)) {
+        continue;
+      }
+      const newId = fieldIdMap.get(field.id);
+      if (newId === undefined) {
+        continue;
+      }
+      yield {
+        id: newId,
+        parent_id: remapFieldReference(field.parent_id, field.id, "parent_id"),
+        fk_target_field_id: remapFieldReference(
+          field.fk_target_field_id,
+          field.id,
+          "fk_target_field_id",
+        ),
+      };
+    }
+  }
+
+  await postNdjson<DatabaseEntry, IdMapResponse>({
+    url: joinUrl(instanceUrl, API_PATHS.databases),
+    apiKey,
+    requests: streamJsonElements<DatabaseEntry>(
+      metadataFile,
+      JSON_PATHS.databases,
+    ),
+    onResponse: (response) =>
+      recordIdMapResponse({
+        response,
+        stats: result.databases,
+        idMap: databaseIdMap,
+        label: "Database",
+      }),
+  });
+
+  await postNdjson<TableEntry, IdMapResponse>({
+    url: joinUrl(instanceUrl, API_PATHS.tables),
+    apiKey,
+    requests: remapForeignKey<TableEntry, TableEntry>({
+      jsonPath: JSON_PATHS.tables,
+      sourceFile: metadataFile,
+      getKey: (table) => table.db_id,
+      idMap: databaseIdMap,
+      transform: (table, newDbId) => ({ ...table, db_id: newDbId }),
+      describeSkip: (table, oldDbId) =>
+        `Skipping table ${table.id} (${table.name}): source db_id ${oldDbId} did not map to a target database`,
+    }),
+    onResponse: (response) =>
+      recordIdMapResponse({
+        response,
+        stats: result.tables,
+        idMap: tableIdMap,
+        label: "Table",
+      }),
+  });
+
+  await postNdjson<FieldInsertRequest, IdMapResponse>({
+    url: joinUrl(instanceUrl, API_PATHS.fields),
+    apiKey,
+    requests: remapForeignKey<FieldEntry, FieldInsertRequest>({
+      jsonPath: JSON_PATHS.fields,
+      sourceFile: metadataFile,
+      getKey: (field) => field.table_id,
+      idMap: tableIdMap,
+      transform: (field, newTableId) => {
+        const {
+          parent_id: _parent_id,
+          fk_target_field_id: _fk_target_field_id,
+          ...rest
+        } = field;
+        return { ...rest, table_id: newTableId };
+      },
+      describeSkip: (field, oldTableId) =>
+        `Skipping field ${field.id} (${field.name}): source table_id ${oldTableId} did not map to a target table`,
+    }),
+    onResponse: (response) =>
+      recordIdMapResponse({
+        response,
+        stats: result.fieldsInsert,
+        idMap: fieldIdMap,
+        label: "Field",
+        onInserted: (oldId) => insertedFieldIds.add(oldId),
+      }),
+  });
+
+  const finalizePass = postNdjson<FieldFinalizeRequest, FieldFinalizeResponse>({
+    url: joinUrl(instanceUrl, API_PATHS.fieldsFinalize),
+    apiKey,
+    requests: fieldFinalizeRequests(),
+    onResponse: (response) => {
+      if ("ok" in response) {
+        result.fieldsFinalize.mapped += 1;
+        return;
+      }
+      result.fieldsFinalize.errors += 1;
+      warn(formatError("Finalize", response.id, response));
+    },
+  });
+
+  const fieldValuesPass = fieldValuesFile
+    ? postNdjson<FieldValuesEntry, FieldValuesResponse>({
+        url: joinUrl(instanceUrl, API_PATHS.fieldValues),
+        apiKey,
+        requests: remapForeignKey<FieldValuesEntry, FieldValuesEntry>({
+          jsonPath: JSON_PATHS.fieldValues,
+          sourceFile: fieldValuesFile,
+          getKey: (entry) => entry.field_id,
+          idMap: fieldIdMap,
+          transform: (entry, newFieldId) => ({
+            ...entry,
+            field_id: newFieldId,
+          }),
+          describeSkip: (entry, oldId) =>
+            `Skipping field values for field_id ${oldId}: no mapping from source field to target`,
+        }),
+        onResponse: (response) => {
+          if ("error" in response) {
+            result.fieldValues.errors += 1;
+            warn(formatError("Field values", response.field_id, response));
+            return;
+          }
+          result.fieldValues.mapped += 1;
+        },
+      })
+    : Promise.resolve();
+
+  await Promise.all([finalizePass, fieldValuesPass]);
+
+  return result;
+}

From e291916589de77cdb4e354ad68d6410cd5fde5a9 Mon Sep 17 00:00:00 2001
From: Aleksandr Lesnenko
Date: Wed, 22 Apr 2026 10:43:44 -0400
Subject: [PATCH 2/9] download-metadata command

---
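Note (not part of the diff): a usage sketch of the new library entry point added below. The URL, key, and paths are illustrative; only `instanceUrl`, `apiKey`, and `metadataFile` are required, and the two optional paths switch the field-values download and the YAML extraction on or off.

```ts
import { downloadMetadata } from "@metabase/database-metadata";

// Mirrors the CLI defaults: fetch metadata.json and field-values.json,
// then extract the YAML tree under .metabase/databases.
const result = await downloadMetadata({
  instanceUrl: "https://metabase.example.com", // illustrative
  apiKey: process.env.METABASE_API_KEY ?? "",
  metadataFile: ".metabase/metadata.json",
  fieldValuesFile: ".metabase/field-values.json", // omit to skip field values
  extractFolder: ".metabase/databases", // omit to skip YAML extraction
});
console.log(`metadata written to ${result.metadataFile}`);
```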
 bin/cli.test.ts               | 151 ++++++++++++++++++++++
 bin/cli.ts                    |  72 ++++++++++++
 package.json                  |   2 +-
 src/download-metadata.test.ts | 180 ++++++++++++++++++++++++++++
 src/download-metadata.ts      |  89 +++++++++++++++
 src/index.ts                  |   5 +
 6 files changed, 498 insertions(+), 1 deletion(-)
 create mode 100644 src/download-metadata.test.ts
 create mode 100644 src/download-metadata.ts

diff --git a/bin/cli.test.ts b/bin/cli.test.ts
index 7470185..037ff2d 100644
--- a/bin/cli.test.ts
+++ b/bin/cli.test.ts
@@ -217,6 +217,157 @@ describe("cli", () => {
     });
   });
 
+  describe("download-metadata", () => {
+    let workdir: string;
+
+    beforeEach(() => {
+      workdir = mkdtempSync(join(tmpdir(), "download-metadata-cli-"));
+    });
+
+    afterEach(() => {
+      rmSync(workdir, { recursive: true, force: true });
+    });
+
+    it("errors when <instance-url> is missing", () => {
+      const { stderr, exitCode } = runCli(["download-metadata"]);
+      expect(exitCode).toBe(1);
+      expect(stderr).toContain("<instance-url>");
+    });
+
+    async function runDownloadCli(
+      serverPort: number,
+      extraArgs: string[],
+      cwd: string,
+    ): Promise<{ stdout: string; stderr: string; exitCode: number | null }> {
+      const proc = Bun.spawn({
+        cmd: [
+          "bun",
+          "run",
+          join(REPO_ROOT, CLI),
+          "download-metadata",
+          `http://127.0.0.1:${serverPort}`,
+          ...extraArgs,
+        ],
+        cwd,
+        env: { ...process.env, METABASE_API_KEY: "ci-key" },
+        stdout: "pipe",
+        stderr: "pipe",
+      });
+      const [stdout, stderr, exitCode] = await Promise.all([
+        new Response(proc.stdout).text(),
+        new Response(proc.stderr).text(),
+        proc.exited,
+      ]);
+      return { stdout, stderr, exitCode };
+    }
+
+    function startMockServer() {
+      const EXAMPLE_METADATA_PATH = join(REPO_ROOT, EXAMPLE_INPUT);
+      const EXAMPLE_VALUES_PATH = join(REPO_ROOT, EXAMPLE_FIELD_VALUES);
+      return Bun.serve({
+        port: 0,
+        fetch(request) {
+          const url = new URL(request.url);
+          if (url.pathname === "/api/database/metadata") {
+            return new Response(Bun.file(EXAMPLE_METADATA_PATH));
+          }
+          if (url.pathname === "/api/database/field-values") {
+            return new Response(Bun.file(EXAMPLE_VALUES_PATH));
+          }
+          return new Response("not found", { status: 404 });
+        },
+      });
+    }
+
+    it("defaults paths to .metabase/ when no flags are given", async () => {
+      const server = startMockServer();
+      try {
+        const { stdout, stderr, exitCode } = await runDownloadCli(
+          server.port,
+          [],
+          workdir,
+        );
+        expect(stderr).toBe("");
+        expect(exitCode).toBe(0);
+        expect(stdout).toContain(".metabase/metadata.json");
+        expect(stdout).toContain(".metabase/field-values.json");
+        expect(stdout).toContain(".metabase/databases");
+
+        expect(existsSync(join(workdir, ".metabase/metadata.json"))).toBe(true);
+        expect(existsSync(join(workdir, ".metabase/field-values.json"))).toBe(
+          true,
+        );
+        expect(
+          existsSync(
+            join(
+              workdir,
+              ".metabase/databases/Sample Database/schemas/PUBLIC/tables/ORDERS.yaml",
+            ),
+          ),
+        ).toBe(true);
+      } finally {
+        await server.stop();
+      }
+    });
+
+    it("honors --no-field-values and --no-extract", async () => {
+      const server = startMockServer();
+      try {
+        const { stderr, exitCode, stdout } = await runDownloadCli(
+          server.port,
+          ["--no-field-values", "--no-extract"],
+          workdir,
+        );
+        expect(stderr).toBe("");
+        expect(exitCode).toBe(0);
+        expect(stdout).not.toContain("Field values:");
+        expect(stdout).not.toContain("Extracted to:");
+        expect(existsSync(join(workdir, ".metabase/metadata.json"))).toBe(true);
+        expect(existsSync(join(workdir, ".metabase/field-values.json"))).toBe(
+          false,
+        );
+        expect(existsSync(join(workdir, ".metabase/databases"))).toBe(false);
+      } finally {
+        await server.stop();
+      }
+    });
+
+    it("overrides paths via flags", async () => {
+      const server = startMockServer();
+      try {
+        const metadataFile = join(workdir, "custom-metadata.json");
+        const fieldValuesFile = join(workdir, "custom-values.json");
+        const extractFolder = join(workdir, "custom-databases");
+        const { stderr, exitCode } = await runDownloadCli(
+          server.port,
+          [
+            "--metadata",
+            metadataFile,
+            "--field-values",
+            fieldValuesFile,
+            "--extract",
+            extractFolder,
+          ],
+          REPO_ROOT,
+        );
+        expect(stderr).toBe("");
+        expect(exitCode).toBe(0);
+        expect(existsSync(metadataFile)).toBe(true);
+        expect(existsSync(fieldValuesFile)).toBe(true);
+        expect(
+          existsSync(
+            join(
+              extractFolder,
+              "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml",
+            ),
+          ),
+        ).toBe(true);
+      } finally {
+        await server.stop();
+      }
+    });
+  });
+
   describe("extract-spec", () => {
     let workdir: string;
 
diff --git a/bin/cli.ts b/bin/cli.ts
index e59ee46..a8a0e44 100644
--- a/bin/cli.ts
+++ b/bin/cli.ts
@@ -2,6 +2,7 @@
 
 import { parseArgs } from "node:util";
 
+import { downloadMetadata } from "../src/download-metadata.js";
 import { extractFieldValues } from "../src/extract-field-values.js";
 import { extractMetadata } from "../src/extract-metadata.js";
 import { extractSpec } from "../src/extract-spec.js";
@@ -14,10 +15,20 @@ import {
 type ParsedValues = {
   file?: string;
   help?: boolean;
+  metadata?: string;
   "field-values"?: string;
+  extract?: string;
+  "no-field-values"?: boolean;
+  "no-extract"?: boolean;
   "api-key"?: string;
 };
 
+const DEFAULT_DOWNLOAD_PATHS = {
+  metadata: ".metabase/metadata.json",
+  fieldValues: ".metabase/field-values.json",
+  extract: ".metabase/databases",
+} as const;
+
 const HELP = `Usage: database-metadata <command> [arguments] [options]
 
 Commands:
@@ -39,6 +50,16 @@ Commands:
     --field-values <file>  Optional field-values JSON file to upload after metadata
     --api-key <key>        API key. Defaults to METABASE_API_KEY env var.
 
+  download-metadata <instance-url>  Stream metadata + field values from a
+                         Metabase instance into .metabase/ and
+                         extract the YAML tree by default.
+    --metadata <file>      Override metadata.json path (default: .metabase/metadata.json)
+    --field-values <file>  Override field-values.json path (default: .metabase/field-values.json)
+    --extract <folder>     Override YAML extract folder (default: .metabase/databases)
+    --no-field-values      Skip downloading field values
+    --no-extract           Skip YAML extraction
+    --api-key <key>        API key. Defaults to METABASE_API_KEY env var.
+
 Options:
   -h, --help             Show this help message`;
 
@@ -48,7 +69,11 @@ function parseArguments() {
     options: {
       file: { type: "string" },
       help: { type: "boolean", short: "h", default: false },
+      metadata: { type: "string" },
       "field-values": { type: "string" },
+      extract: { type: "string" },
+      "no-field-values": { type: "boolean", default: false },
+      "no-extract": { type: "boolean", default: false },
       "api-key": { type: "string" },
     },
   });
@@ -153,6 +178,51 @@ function hasAnyErrors(stats: UploadMetadataResult): boolean {
   return Object.values(stats).some((step) => step.errors > 0);
 }
 
+async function handleDownloadMetadata(
+  positionals: string[],
+  values: ParsedValues,
+): Promise<void> {
+  const instanceUrl = positionals[1];
+
+  if (!instanceUrl) {
+    console.error("Error: <instance-url> argument is required");
+    process.exit(1);
+  }
+
+  const apiKey = values["api-key"] ?? process.env.METABASE_API_KEY;
+  if (!apiKey) {
+    console.error(
+      "Error: API key is required (pass --api-key or set METABASE_API_KEY)",
+    );
+    process.exit(1);
+  }
+
+  const metadataFile = values.metadata ?? DEFAULT_DOWNLOAD_PATHS.metadata;
+  const fieldValuesFile = values["no-field-values"]
+    ? undefined
+    : (values["field-values"] ?? DEFAULT_DOWNLOAD_PATHS.fieldValues);
+  const extractFolder = values["no-extract"]
+    ? undefined
+    : (values.extract ?? DEFAULT_DOWNLOAD_PATHS.extract);
+
+  const result = await downloadMetadata({
+    instanceUrl,
+    apiKey,
+    metadataFile,
+    fieldValuesFile,
+    extractFolder,
+  });
+  const lines = [`Metadata: ${result.metadataFile}`];
+  if (result.fieldValuesFile) {
+    lines.push(`Field values: ${result.fieldValuesFile}`);
+  }
+  if (result.extractFolder) {
+    lines.push(`Extracted to: ${result.extractFolder}`);
+  }
+  console.log(lines.join("\n"));
+  process.exit(0);
+}
+
 function handleExtractSpec(values: ParsedValues): void {
   const { target } = extractSpec({ file: values.file ?? "spec.md" });
   console.log(`Spec extracted to ${target}`);
@@ -177,6 +247,8 @@ async function main(): Promise<void> {
       return handleExtractSpec(values);
     case "upload-metadata":
       return handleUploadMetadata(positionals, values);
+    case "download-metadata":
+      return handleDownloadMetadata(positionals, values);
     default:
       console.error(`Unknown command: ${command}`);
       process.exit(1);
diff --git a/package.json b/package.json
index 4d4495c..743dea8 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@metabase/database-metadata",
-  "version": "1.0.2",
+  "version": "1.1.0",
   "description": "CLI tool to extract Metabase database metadata into YAML files",
   "license": "SEE LICENSE IN LICENSE.txt",
   "repository": {
diff --git a/src/download-metadata.test.ts b/src/download-metadata.test.ts
new file mode 100644
index 0000000..c154831
--- /dev/null
+++ b/src/download-metadata.test.ts
@@ -0,0 +1,180 @@
+import { afterEach, beforeEach, describe, expect, it } from "bun:test";
+import {
+  existsSync,
+  mkdtempSync,
+  readFileSync,
+  rmSync,
+  statSync,
+} from "fs";
+import { tmpdir } from "os";
+import { join, resolve } from "path";
+
+import { downloadMetadata } from "./download-metadata.js";
+
+const REPO_ROOT = resolve(import.meta.dirname, "..");
+const EXAMPLE_METADATA = join(REPO_ROOT, "examples/v1/metadata.json");
+const EXAMPLE_FIELD_VALUES = join(REPO_ROOT, "examples/v1/field-values.json");
+
+type MockServerControl = {
+  baseUrl: string;
+  apiKeysSeen: string[];
+  stop: () => Promise<void>;
+};
+
+type MockServerOptions = {
+  metadataStatus?: number;
+  fieldValuesStatus?: number;
+};
+
+function startMockServer(options: MockServerOptions = {}): MockServerControl {
+  const apiKeysSeen: string[] = [];
+  const metadataStatus = options.metadataStatus ?? 200;
+  const fieldValuesStatus = options.fieldValuesStatus ?? 200;
+
+  const server = Bun.serve({
+    port: 0,
+    fetch(request) {
+      const url = new URL(request.url);
+      apiKeysSeen.push(request.headers.get("X-API-Key") ?? "");
+      if (url.pathname === "/api/database/metadata") {
+        if (metadataStatus !== 200) {
+          return new Response("boom", { status: metadataStatus });
+        }
+        return new Response(Bun.file(EXAMPLE_METADATA));
+      }
+      if (url.pathname === "/api/database/field-values") {
+        if (fieldValuesStatus !== 200) {
+          return new Response("boom", { status: fieldValuesStatus });
+        }
+        return new Response(Bun.file(EXAMPLE_FIELD_VALUES));
+      }
+      return new Response("not found", { status: 404 });
+    },
+  });
+
+  return {
+    baseUrl: `http://127.0.0.1:${server.port}`,
+    apiKeysSeen,
+    stop: () => server.stop(),
+  };
+}
+
+describe("downloadMetadata", () => {
+  let workdir: string;
+  let mock: MockServerControl;
+
+  beforeEach(() => {
+    workdir = mkdtempSync(join(tmpdir(), "download-metadata-"));
+    mock = startMockServer();
+  });
+
+  afterEach(async () => {
+    await mock.stop();
+    rmSync(workdir, { recursive: true, force: true });
+  });
+
+  it("streams metadata.json to the configured path", async () => {
+    const metadataFile = join(workdir, "nested", "metadata.json");
+    const result = await downloadMetadata({
+      instanceUrl: mock.baseUrl,
+      apiKey: "test-key",
+      metadataFile,
+    });
+    expect(result.metadataFile).toBe(metadataFile);
+    expect(existsSync(metadataFile)).toBe(true);
+    const downloaded = readFileSync(metadataFile, "utf8");
+    const expected = readFileSync(EXAMPLE_METADATA, "utf8");
+    expect(downloaded).toBe(expected);
+    expect(mock.apiKeysSeen).toEqual(["test-key"]);
+  });
+
+  it("downloads field-values only when a path is given", async () => {
+    const metadataFile = join(workdir, "metadata.json");
+    const fieldValuesFile = join(workdir, "values.json");
+    const result = await downloadMetadata({
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      metadataFile,
+      fieldValuesFile,
+    });
+    expect(result.fieldValuesFile).toBe(fieldValuesFile);
+    expect(existsSync(fieldValuesFile)).toBe(true);
+    expect(statSync(fieldValuesFile).size).toBeGreaterThan(0);
+  });
+
+  it("extracts YAML when an extract folder is given", async () => {
+    const metadataFile = join(workdir, "metadata.json");
+    const fieldValuesFile = join(workdir, "values.json");
+    const extractFolder = join(workdir, "databases");
+    const result = await downloadMetadata({
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      metadataFile,
+      fieldValuesFile,
+      extractFolder,
+    });
+    expect(result.extractFolder).toBe(extractFolder);
+
+    const ordersYaml = join(
+      extractFolder,
+      "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml",
+    );
+    expect(existsSync(ordersYaml)).toBe(true);
+
+    const stateValues = join(
+      extractFolder,
+      "Sample Database/schemas/PUBLIC/tables/PEOPLE/STATE.yaml",
+    );
+    expect(existsSync(stateValues)).toBe(true);
+  });
+
+  it("skips field-values extraction when no field-values path is given", async () => {
+    const metadataFile = join(workdir, "metadata.json");
+    const extractFolder = join(workdir, "databases");
+    await downloadMetadata({
+      instanceUrl: mock.baseUrl,
+      apiKey: "k",
+      metadataFile,
+      extractFolder,
+    });
+    const ordersYaml = join(
+      extractFolder,
+      "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml",
+    );
+    expect(existsSync(ordersYaml)).toBe(true);
+    const stateValues = join(
+      extractFolder,
+      "Sample Database/schemas/PUBLIC/tables/PEOPLE/STATE.yaml",
+    );
+    expect(existsSync(stateValues)).toBe(false);
+  });
+
+  it("throws on non-200 metadata response and does not write the file", async () => {
+    await mock.stop();
+    mock = startMockServer({ metadataStatus: 401 });
+    const metadataFile = join(workdir, "metadata.json");
+    await expect(
+      downloadMetadata({
+        instanceUrl: mock.baseUrl,
+        apiKey: "k",
+        metadataFile,
+      }),
+    ).rejects.toThrow(/401/);
+    expect(existsSync(metadataFile)).toBe(false);
+  });
+
+  it("throws on non-200 field-values response", async () => {
+    await mock.stop();
+    mock = startMockServer({ fieldValuesStatus: 500 });
+    const metadataFile = join(workdir, "metadata.json");
+    const fieldValuesFile = join(workdir, "values.json");
+    await expect(
+      downloadMetadata({
+        instanceUrl: mock.baseUrl,
+        apiKey: "k",
+        metadataFile,
+        fieldValuesFile,
+      }),
+    ).rejects.toThrow(/500/);
+  });
+});
diff --git a/src/download-metadata.ts b/src/download-metadata.ts
new file mode 100644
index 0000000..3d6f34b
--- /dev/null
+++ b/src/download-metadata.ts
@@ -0,0 +1,89 @@
+import { createWriteStream, mkdirSync } from "node:fs";
+import { dirname } from "node:path";
+import { Readable } from "node:stream";
+import { pipeline } from "node:stream/promises";
+
+import { extractFieldValues } from "./extract-field-values.js";
+import { extractMetadata } from "./extract-metadata.js";
+
+export const DOWNLOAD_PATHS = {
+  metadata: "/api/database/metadata",
+  fieldValues: "/api/database/field-values",
+} as const;
+
+export type DownloadMetadataOptions = {
+  instanceUrl: string;
+  apiKey: string;
+  metadataFile: string;
+  fieldValuesFile?: string;
+  extractFolder?: string;
+};
+
+export type DownloadMetadataResult = {
+  metadataFile: string;
+  fieldValuesFile?: string;
+  extractFolder?: string;
+};
+
+function joinUrl(baseUrl: string, path: string): string {
+  return `${baseUrl.replace(/\/$/, "")}${path}`;
+}
+
+async function streamDownload(
+  url: string,
+  apiKey: string,
+  destination: string,
+): Promise<void> {
+  const response = await fetch(url, {
+    headers: { "X-API-Key": apiKey },
+  });
+  if (!response.ok) {
+    const text = await response.text().catch(() => "");
+    throw new Error(
+      `GET ${url} failed: ${response.status} ${response.statusText} ${text}`.trim(),
+    );
+  }
+  if (!response.body) {
+    throw new Error(`GET ${url} returned an empty body`);
+  }
+  mkdirSync(dirname(destination), { recursive: true });
+  await pipeline(
+    Readable.fromWeb(response.body as Parameters<typeof Readable.fromWeb>[0]),
+    createWriteStream(destination),
+  );
+}
+
+export async function downloadMetadata({
+  instanceUrl,
+  apiKey,
+  metadataFile,
+  fieldValuesFile,
+  extractFolder,
+}: DownloadMetadataOptions): Promise<DownloadMetadataResult> {
+  await streamDownload(
+    joinUrl(instanceUrl, DOWNLOAD_PATHS.metadata),
+    apiKey,
+    metadataFile,
+  );
+
+  if (fieldValuesFile) {
+    await streamDownload(
+      joinUrl(instanceUrl, DOWNLOAD_PATHS.fieldValues),
+      apiKey,
+      fieldValuesFile,
+    );
+  }
+
+  if (extractFolder) {
+    extractMetadata({ inputFile: metadataFile, outputFolder: extractFolder });
+    if (fieldValuesFile) {
+      extractFieldValues({
+        metadataFile,
+        fieldValuesFile,
+        outputFolder: extractFolder,
+      });
+    }
+  }
+
+  return { metadataFile, fieldValuesFile, extractFolder };
+}
diff --git a/src/index.ts b/src/index.ts
index 7162500..e0e9b87 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -19,3 +19,8 @@ export {
   type UploadMetadataResult,
   type UploadStepStats,
 } from "./upload-metadata.js";
+export {
+  downloadMetadata,
+  type DownloadMetadataOptions,
+  type DownloadMetadataResult,
+} from "./download-metadata.js";

From 5068e4b19ae2cdf24858a869bb7a4bfb575e3dff Mon Sep 17 00:00:00 2001
From: Aleksandr Lesnenko
Date: Wed, 22 Apr 2026 10:57:55 -0400
Subject: [PATCH 3/9] document download-metadata and align upload-metadata
 defaults

---
 README.md       |  74 ++++++++++++++------
 bin/cli.test.ts | 138 ++++++++++----------------------------
 bin/cli.ts      |  32 +++++------
 3 files changed, 102 insertions(+), 142 deletions(-)

diff --git a/README.md b/README.md
index 9e76dac..3d17130 100644
--- a/README.md
+++ b/README.md
@@ -22,17 +22,39 @@ Reference output for the Sample Database lives in **[examples/v1/](examples/v1/)**
 
 Metadata is fetched on demand from a running Metabase instance via `GET /api/database/metadata`. The response is a flat JSON document with three arrays — `databases`, `tables`, and `fields` — streamed so that even warehouses with very large schemas can be exported without exhausting server memory.
 
-Authenticate with either a session token (`X-Metabase-Session`) or an API key (`X-API-Key`):
+Authenticate with an API key (`X-API-Key`) or session token (`X-Metabase-Session`).
+
+### Downloading metadata
+
+The CLI can fetch `metadata.json`, `field-values.json`, and extract the YAML tree in one streaming pass:
 
 ```sh
-curl "$METABASE_URL/api/database/metadata" \
-  -H "X-API-Key: $METABASE_API_KEY" \
-  -o metadata.json
+export METABASE_API_KEY=...
+bunx @metabase/database-metadata download-metadata "$METABASE_URL"
 ```
 
+With no flags, the command writes:
+
+- `.metabase/metadata.json`
+- `.metabase/field-values.json`
+- `.metabase/databases/` — extracted YAML tree
+
+Flags override any default or opt out of individual steps:
+
+| Flag | Default | Purpose |
+|------|---------|---------|
+| `--metadata <file>` | `.metabase/metadata.json` | Where to write the raw metadata JSON |
+| `--field-values <file>` | `.metabase/field-values.json` | Where to write the raw field-values JSON |
+| `--extract <folder>` | `.metabase/databases` | Where to extract the YAML tree |
+| `--no-field-values` | — | Skip downloading field values |
+| `--no-extract` | — | Skip YAML extraction |
+| `--api-key <key>` | `METABASE_API_KEY` env var | API key |
+
+Files are streamed to disk directly — responses are never fully buffered in memory, so multi-GB exports stay lean.
+
 ### Extracting metadata to YAML
 
-The CLI turns that JSON into the human- and agent-friendly YAML tree described in the spec:
+If you already have a `metadata.json` on disk (e.g. downloaded via `curl`), you can skip the download and extract directly:
 
 ```sh
 bunx @metabase/database-metadata extract-metadata
@@ -43,13 +65,9 @@ bunx @metabase/database-metadata extract-metadata
 
 ### Extracting field values
 
-Metabase keeps a sampled list of distinct values for each field that's low-cardinality enough to enumerate (the same list that powers filter dropdowns in the UI). Fetch it and extract it alongside the metadata:
+Metabase keeps a sampled list of distinct values for each field that's low-cardinality enough to enumerate (the same list that powers filter dropdowns in the UI).
 
 ```sh
-curl "$METABASE_URL/api/database/field-values" \
-  -H "X-API-Key: $METABASE_API_KEY" \
-  -o field-values.json
-
 bunx @metabase/database-metadata extract-field-values
 ```
 
@@ -59,6 +77,26 @@ bunx @metabase/database-metadata extract-field-values
 
+### Uploading metadata
+
+The reverse direction streams a previously downloaded `metadata.json` (and optionally `field-values.json`) into a target Metabase instance via NDJSON:
+
+```sh
+export METABASE_API_KEY=...
+bunx @metabase/database-metadata upload-metadata "$METABASE_URL"
+```
+
+| Flag | Default | Purpose |
+|------|---------|---------|
+| `--metadata <file>` | `.metabase/metadata.json` | Path to the metadata JSON to upload |
+| `--field-values <file>` | `.metabase/field-values.json` | Path to the field-values JSON |
+| `--no-field-values` | — | Skip uploading field values |
+| `--api-key <key>` | `METABASE_API_KEY` env var | API key |
+
+Exits non-zero if any step reports row-level errors so CI can catch partial imports. The command is designed to stream 5 GB+ files without loading them into memory.
+
 ### Extracting the spec
 
 The bundled spec can be extracted to any file — convenient for agents that need to read it locally:
 
@@ -112,25 +150,17 @@ cp .env.template .env
 ```
 
 ### 4. Fetch and extract on demand
 
-With `.env` populated, the end-to-end flow is:
+With `.env` populated, the end-to-end flow is a single command:
 
 ```sh
 set -a; source .env; set +a
 
-mkdir -p .metabase
-curl -sf "$METABASE_URL/api/database/metadata" \
-  -H "X-API-Key: $METABASE_API_KEY" \
-  -o .metabase/metadata.json
-
-curl -sf "$METABASE_URL/api/database/field-values" \
-  -H "X-API-Key: $METABASE_API_KEY" \
-  -o .metabase/field-values.json
-
 rm -rf .metabase/databases
-bunx @metabase/database-metadata extract-metadata .metabase/metadata.json .metabase/databases
-bunx @metabase/database-metadata extract-field-values .metabase/metadata.json .metabase/field-values.json .metabase/databases
+bunx @metabase/database-metadata download-metadata "$METABASE_URL"
 ```
 
+That downloads `.metabase/metadata.json`, `.metabase/field-values.json`, and extracts the YAML tree into `.metabase/databases/`. Use `--no-field-values` or `--no-extract` to skip parts of the pipeline.
+
 After this, tools and agents should read the YAML tree under `.metabase/databases/` — not `metadata.json` or `field-values.json`, which exist only as input to the extractors.
 
 ## Publishing to NPM
diff --git a/bin/cli.test.ts b/bin/cli.test.ts
index 037ff2d..aaa6f0d 100644
--- a/bin/cli.test.ts
+++ b/bin/cli.test.ts
@@ -124,24 +124,15 @@ describe("cli", () => {
   });
 
   describe("upload-metadata", () => {
-    it("errors when arguments are missing", () => {
+    it("errors when <instance-url> is missing", () => {
       const { stderr, exitCode } = runCli(["upload-metadata"]);
       expect(exitCode).toBe(1);
-      expect(stderr).toContain(
-        "<metadata-file> and <instance-url> arguments are required",
-      );
+      expect(stderr).toContain("<instance-url>");
     });
 
     it("errors when no api key is set", () => {
       const proc = Bun.spawnSync({
-        cmd: [
-          "bun",
-          "run",
-          CLI,
-          "upload-metadata",
-          EXAMPLE_INPUT,
-          "http://127.0.0.1:1",
-        ],
+        cmd: ["bun", "run", CLI, "upload-metadata", "http://127.0.0.1:1"],
         cwd: REPO_ROOT,
         env: { ...process.env, METABASE_API_KEY: "" },
       });
@@ -150,7 +141,6 @@ describe("cli", () => {
     });
 
     it("uploads against a mock server end-to-end", async () => {
-
       const server = Bun.serve({
         port: 0,
         async fetch(request) {
@@ -194,8 +184,10 @@ describe("cli", () => {
           "run",
           CLI,
           "upload-metadata",
-          EXAMPLE_INPUT,
           `http://127.0.0.1:${server.port}`,
+          "--metadata",
+          EXAMPLE_INPUT,
+          "--no-field-values",
         ],
         cwd: REPO_ROOT,
         env: { ...process.env, METABASE_API_KEY: "ci-key" },
@@ -210,6 +202,7 @@ describe("cli", () => {
       expect(exitCode).toBe(0);
       expect(stdoutText).toContain("Databases:");
       expect(stdoutText).toContain("Finalized:");
+      expect(stdoutText).not.toContain("Values:");
       expect(stderrText).toBe("");
     } finally {
       await server.stop();
@@ -234,37 +227,14 @@ describe("cli", () => {
       expect(stderr).toContain("<instance-url>");
     });
 
-    async function runDownloadCli(
-      serverPort: number,
-      extraArgs: string[],
-      cwd: string,
-    ): Promise<{ stdout: string; stderr: string; exitCode: number | null }> {
-      const proc = Bun.spawn({
-        cmd: [
-          "bun",
-          "run",
-          join(REPO_ROOT, CLI),
-          "download-metadata",
-          `http://127.0.0.1:${serverPort}`,
-          ...extraArgs,
-        ],
-        cwd,
-        env: { ...process.env, METABASE_API_KEY: "ci-key" },
-        stdout: "pipe",
-        stderr: "pipe",
-      });
-      const [stdout, stderr, exitCode] = await Promise.all([
-        new Response(proc.stdout).text(),
-        new Response(proc.stderr).text(),
-        proc.exited,
-      ]);
-      return { stdout, stderr, exitCode };
-    }
-
-    function startMockServer() {
+    // End-to-end streaming + path-override via a spawned CLI against a mock
+    // server. Defaults-in-cwd behaviour is covered by library tests in
+    // src/download-metadata.test.ts — attempting the same with cwd=tmpdir
+    // inside bun:test reliably hangs Bun.spawn (unrelated to CLI logic).
+    it("overrides output paths via flags and writes each file", async () => {
       const EXAMPLE_METADATA_PATH = join(REPO_ROOT, EXAMPLE_INPUT);
       const EXAMPLE_VALUES_PATH = join(REPO_ROOT, EXAMPLE_FIELD_VALUES);
-      return Bun.serve({
+      const server = Bun.serve({
         port: 0,
         fetch(request) {
           const url = new URL(request.url);
           if (url.pathname === "/api/database/metadata") {
             return new Response(Bun.file(EXAMPLE_METADATA_PATH));
           }
           if (url.pathname === "/api/database/field-values") {
             return new Response(Bun.file(EXAMPLE_VALUES_PATH));
           }
           return new Response("not found", { status: 404 });
         },
       });
-    }
-
-    it("defaults paths to .metabase/ when no flags are given", async () => {
-      const server = startMockServer();
-      try {
-        const { stdout, stderr, exitCode } = await runDownloadCli(
-          server.port,
-          [],
-          workdir,
-        );
-        expect(stderr).toBe("");
-        expect(exitCode).toBe(0);
-        expect(stdout).toContain(".metabase/metadata.json");
-        expect(stdout).toContain(".metabase/field-values.json");
-        expect(stdout).toContain(".metabase/databases");
-
-        expect(existsSync(join(workdir, ".metabase/metadata.json"))).toBe(true);
-        expect(existsSync(join(workdir, ".metabase/field-values.json"))).toBe(
-          true,
-        );
-        expect(
-          existsSync(
-            join(
-              workdir,
-              ".metabase/databases/Sample Database/schemas/PUBLIC/tables/ORDERS.yaml",
-            ),
-          ),
-        ).toBe(true);
-      } finally {
-        await server.stop();
-      }
-    });
-
-    it("honors --no-field-values and --no-extract", async () => {
-      const server = startMockServer();
-      try {
-        const { stderr, exitCode, stdout } = await runDownloadCli(
-          server.port,
-          ["--no-field-values", "--no-extract"],
-          workdir,
-        );
-        expect(stderr).toBe("");
-        expect(exitCode).toBe(0);
-        expect(stdout).not.toContain("Field values:");
-        expect(stdout).not.toContain("Extracted to:");
-        expect(existsSync(join(workdir, ".metabase/metadata.json"))).toBe(true);
-        expect(existsSync(join(workdir, ".metabase/field-values.json"))).toBe(
-          false,
-        );
-        expect(existsSync(join(workdir, ".metabase/databases"))).toBe(false);
-      } finally {
-        await server.stop();
-      }
-    });
-
-    it("overrides paths via flags", async () => {
-      const server = startMockServer();
       try {
         const metadataFile = join(workdir, "custom-metadata.json");
         const fieldValuesFile = join(workdir, "custom-values.json");
         const extractFolder = join(workdir, "custom-databases");
-        const { stderr, exitCode } = await runDownloadCli(
-          server.port,
-          [
+        const proc = Bun.spawn({
+          cmd: [
+            "bun",
+            "run",
+            CLI,
+            "download-metadata",
+            `http://127.0.0.1:${server.port}`,
             "--metadata",
             metadataFile,
             "--field-values",
             fieldValuesFile,
             "--extract",
             extractFolder,
           ],
-          REPO_ROOT,
-        );
+          cwd: REPO_ROOT,
+          env: { ...process.env, METABASE_API_KEY: "ci-key" },
+          stdout: "pipe",
+          stderr: "pipe",
+        });
+        const [stdout, stderr, exitCode] = await Promise.all([
+          new Response(proc.stdout).text(),
+          new Response(proc.stderr).text(),
+          proc.exited,
+        ]);
         expect(stderr).toBe("");
         expect(exitCode).toBe(0);
+        expect(stdout).toContain("Metadata:");
+        expect(stdout).toContain("Field values:");
+        expect(stdout).toContain("Extracted to:");
         expect(existsSync(metadataFile)).toBe(true);
         expect(existsSync(fieldValuesFile)).toBe(true);
         expect(
           existsSync(
             join(
               extractFolder,
               "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml",
             ),
           ),
         ).toBe(true);
       } finally {
         await server.stop();
       }
     });
   });
 
diff --git a/bin/cli.ts b/bin/cli.ts
index a8a0e44..757b899 100644
--- a/bin/cli.ts
+++ b/bin/cli.ts
@@ -23,7 +23,7 @@ type ParsedValues = {
   "api-key"?: string;
 };
 
-const DEFAULT_DOWNLOAD_PATHS = {
+const DEFAULT_PATHS = {
   metadata: ".metabase/metadata.json",
   fieldValues: ".metabase/field-values.json",
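Note (not part of the diff): a quick sketch of the invariant the new guard in this patch defends. `JSON.stringify` escapes control characters inside string values, so a serialized NDJSON line should never contain a raw `\n` or `\r`; if one shows up anyway, something upstream (per the in-code comment, e.g. a custom `toJSON()`) produced malformed output, and the sanitizer warns rather than letting the server split the line mid-object.

```ts
// A newline inside a value is serialized as the two characters \ and n:
const line = JSON.stringify({ description: "multi\nline value" });
console.log(line.includes("\n")); // false — no raw newline in the output
console.log(line); // {"description":"multi\nline value"}
```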
extract: ".metabase/databases", @@ -44,10 +44,11 @@ Commands: extract-spec Copy the bundled spec.md into a target file --file Destination file (default: ./spec.md) - upload-metadata - Stream metadata (and optional field values) - to a target Metabase instance via NDJSON. - --field-values Optional field-values JSON file to upload after metadata + upload-metadata Stream metadata + field values to a target + Metabase instance via NDJSON. + --metadata Override metadata.json path (default: .metabase/metadata.json) + --field-values Override field-values.json path (default: .metabase/field-values.json) + --no-field-values Skip uploading field values --api-key API key. Defaults to METABASE_API_KEY env var. download-metadata Stream metadata + field values from a @@ -124,13 +125,10 @@ async function handleUploadMetadata( positionals: string[], values: ParsedValues, ): Promise { - const metadataFile = positionals[1]; - const instanceUrl = positionals[2]; + const instanceUrl = positionals[1]; - if (!metadataFile || !instanceUrl) { - console.error( - "Error: and arguments are required", - ); + if (!instanceUrl) { + console.error("Error: argument is required"); process.exit(1); } @@ -142,7 +140,11 @@ async function handleUploadMetadata( process.exit(1); } - const fieldValuesFile = values["field-values"]; + const metadataFile = values.metadata ?? DEFAULT_PATHS.metadata; + const fieldValuesFile = values["no-field-values"] + ? undefined + : (values["field-values"] ?? DEFAULT_PATHS.fieldValues); + const stats = await uploadMetadata({ metadataFile, fieldValuesFile, @@ -197,13 +199,13 @@ async function handleDownloadMetadata( process.exit(1); } - const metadataFile = values.metadata ?? DEFAULT_DOWNLOAD_PATHS.metadata; + const metadataFile = values.metadata ?? DEFAULT_PATHS.metadata; const fieldValuesFile = values["no-field-values"] ? undefined - : (values["field-values"] ?? DEFAULT_DOWNLOAD_PATHS.fieldValues); + : (values["field-values"] ?? DEFAULT_PATHS.fieldValues); const extractFolder = values["no-extract"] ? undefined - : (values.extract ?? DEFAULT_DOWNLOAD_PATHS.extract); + : (values.extract ?? 
DEFAULT_PATHS.extract); const result = await downloadMetadata({ instanceUrl, From df7356bf33a1705bef94abb840d74d81393ce934 Mon Sep 17 00:00:00 2001 From: Aleksandr Lesnenko Date: Wed, 22 Apr 2026 11:18:39 -0400 Subject: [PATCH 4/9] fix --- bin/cli.ts | 20 +++- src/ndjson.ts | 36 ++++++- src/upload-metadata.test.ts | 4 +- src/upload-metadata.ts | 183 +++++++++++++++++++++++++++++------- 4 files changed, 204 insertions(+), 39 deletions(-) diff --git a/bin/cli.ts b/bin/cli.ts index 757b899..b74361c 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -160,6 +160,22 @@ function formatStepLine(label: string, step: UploadStepStats): string { return `${label} ${step.mapped}/${total} mapped (${step.errors} errors)`; } +function formatFieldsLine(stats: UploadMetadataResult["fieldsInsert"]): string { + const total = stats.mapped + stats.errors; + return `Fields: ${stats.mapped}/${total} mapped (${stats.inserted} inserted, ${stats.matched} matched, ${stats.errors} errors)`; +} + +function formatFinalizeLine( + finalize: UploadStepStats, + insertedCount: number, +): string { + const base = formatStepLine("Finalized: ", finalize); + if (insertedCount === 0 && finalize.errors === 0) { + return `${base} — no newly-inserted fields to finalize`; + } + return base; +} + function formatUploadReport( stats: UploadMetadataResult, fieldValuesRan: boolean, @@ -167,8 +183,8 @@ function formatUploadReport( const lines = [ formatStepLine("Databases: ", stats.databases), formatStepLine("Tables: ", stats.tables), - formatStepLine("Fields: ", stats.fieldsInsert), - formatStepLine("Finalized: ", stats.fieldsFinalize), + formatFieldsLine(stats.fieldsInsert), + formatFinalizeLine(stats.fieldsFinalize, stats.fieldsInsert.inserted), ]; if (fieldValuesRan) { lines.push(formatStepLine("Values: ", stats.fieldValues)); diff --git a/src/ndjson.ts b/src/ndjson.ts index cbf0664..43f8a47 100644 --- a/src/ndjson.ts +++ b/src/ndjson.ts @@ -3,20 +3,31 @@ export type PostNdjsonOptions = { apiKey: string; requests: AsyncIterable; onResponse: (response: Res, index: number) => void | Promise; + onWarning?: (message: string) => void; }; // Node's fetch RequestInit does not type `duplex` in lib.dom, but it is required // when sending a streaming body. type StreamingRequestInit = RequestInit & { duplex: "half" }; +// The server (Clojure line-seq) splits requests on raw \n, \r, or \r\n. If any +// of those bytes escape into a serialized line, it gets cut mid-object and +// returns a malformed-json error against the tail fragment. `JSON.stringify` +// escapes control chars in strings, so a positive hit here means a value's +// toJSON() produced raw control chars — we sanitize defensively and warn so +// the user can chase the upstream bug. +const RAW_NEWLINE_PATTERN = /[\n\r]/g; + export async function postNdjson({ url, apiKey, requests, onResponse, + onWarning, }: PostNdjsonOptions): Promise { const iterator = requests[Symbol.asyncIterator](); const encoder = new TextEncoder(); + let sentIndex = 0; const body = new ReadableStream({ async pull(controller) { @@ -26,7 +37,12 @@ export async function postNdjson({ controller.close(); return; } - controller.enqueue(encoder.encode(JSON.stringify(value) + "\n")); + const raw = JSON.stringify(value); + const safe = raw.includes("\n") || raw.includes("\r") + ? 
sanitizeRawNewlines(raw, sentIndex, onWarning) + : raw; + sentIndex += 1; + controller.enqueue(encoder.encode(safe + "\n")); } catch (error) { controller.error(error); } @@ -103,6 +119,24 @@ export async function* parseNdjsonStream( } } +function sanitizeRawNewlines( + raw: string, + index: number, + onWarning?: (message: string) => void, +): string { + const firstOffset = raw.search(RAW_NEWLINE_PATTERN); + const context = raw.slice( + Math.max(0, firstOffset - 40), + Math.min(raw.length, firstOffset + 40), + ); + onWarning?.( + `Request #${index} had a raw \\n or \\r in its serialized JSON (first at offset ${firstOffset}); sanitizing. Context: …${context}…`, + ); + return raw.replace(RAW_NEWLINE_PATTERN, (char) => + char === "\n" ? "\\n" : "\\r", + ); +} + function* splitLines(block: string): Generator { let start = 0; while (start <= block.length) { diff --git a/src/upload-metadata.test.ts b/src/upload-metadata.test.ts index bf99f35..e92f9a5 100644 --- a/src/upload-metadata.test.ts +++ b/src/upload-metadata.test.ts @@ -192,7 +192,7 @@ describe("uploadMetadata", () => { expect(stats).toEqual({ databases: { mapped: 1, errors: 0 }, tables: { mapped: 8, errors: 0 }, - fieldsInsert: { mapped: 71, errors: 0 }, + fieldsInsert: { mapped: 71, errors: 0, inserted: 71, matched: 0 }, fieldsFinalize: { mapped: 71, errors: 0 }, fieldValues: { mapped: 4, errors: 0 }, }); @@ -288,6 +288,8 @@ describe("uploadMetadata", () => { }); expect(stats.fieldsInsert.mapped).toBe(71); + expect(stats.fieldsInsert.inserted).toBe(0); + expect(stats.fieldsInsert.matched).toBe(71); expect(stats.fieldsFinalize.mapped).toBe(0); expect(stats.fieldsFinalize.errors).toBe(0); }); diff --git a/src/upload-metadata.ts b/src/upload-metadata.ts index 215dd72..509fe63 100644 --- a/src/upload-metadata.ts +++ b/src/upload-metadata.ts @@ -29,10 +29,15 @@ export type UploadStepStats = { errors: number; }; +export type UploadFieldInsertStats = UploadStepStats & { + inserted: number; + matched: number; +}; + export type UploadMetadataResult = { databases: UploadStepStats; tables: UploadStepStats; - fieldsInsert: UploadStepStats; + fieldsInsert: UploadFieldInsertStats; fieldsFinalize: UploadStepStats; fieldValues: UploadStepStats; }; @@ -48,7 +53,7 @@ type TableEntry = { db_id: number; name: string; schema: string | null; - description?: string; + description?: string | null; }; type FieldEntry = { @@ -65,7 +70,38 @@ type FieldEntry = { fk_target_field_id?: number | null; }; -type FieldInsertRequest = Omit; +type FieldValuesEntry = { + field_id: number; + values: unknown[]; + has_more_values?: boolean; + human_readable_values?: string[]; +}; + +type DatabaseRequest = { + id: number; + name: string; + engine: string; +}; + +type TableRequest = { + id: number; + db_id: number; + name: string; + schema: string | null; + description?: string | null; +}; + +type FieldInsertRequest = { + id: number; + table_id: number; + name: string; + base_type?: string; + database_type?: string; + description?: string | null; + semantic_type?: string | null; + effective_type?: string | null; + coercion_strategy?: string | null; +}; type FieldFinalizeRequest = { id: number; @@ -73,26 +109,26 @@ type FieldFinalizeRequest = { fk_target_field_id: number | null; }; -type FieldValuesEntry = { +type FieldValuesRequest = { field_id: number; values: unknown[]; - has_more_values?: boolean; + has_more_values: boolean; human_readable_values?: string[]; }; type IdMapResponse = | { old_id: number; new_id: number } | { old_id: number; existing_id: number } - | { old_id: 
number; error: string; detail?: string }; + | { old_id?: number; error: string; detail?: string }; type FieldFinalizeResponse = | { id: number; ok: true } - | { id: number; error: string; detail?: string }; + | { id?: number; error: string; detail?: string }; type FieldValuesResponse = | { field_id: number; created: true } | { field_id: number; updated: true } - | { field_id: number; error: string; detail?: string }; + | { field_id?: number; error: string; detail?: string }; type RecordIdMapResponseOptions = { response: IdMapResponse; @@ -100,6 +136,7 @@ type RecordIdMapResponseOptions = { idMap: Map; label: string; onInserted?: (oldId: number) => void; + onMatched?: (oldId: number) => void; }; function joinUrl(baseUrl: string, path: string): string { @@ -110,13 +147,80 @@ function emptyStats(): UploadStepStats { return { mapped: 0, errors: 0 }; } +function emptyFieldInsertStats(): UploadFieldInsertStats { + return { mapped: 0, errors: 0, inserted: 0, matched: 0 }; +} + function formatError( label: string, - id: number, + id: number | undefined, response: { error?: string; detail?: string }, ): string { - const suffix = response.detail ? ` — ${response.detail}` : ""; - return `${label} ${id}: ${response.error}${suffix}`; + const idSuffix = id === undefined ? "" : ` ${id}`; + const detailSuffix = response.detail ? ` — ${response.detail}` : ""; + return `${label}${idSuffix}: ${response.error ?? "unknown error"}${detailSuffix}`; +} + +function pickDatabaseRequest(db: DatabaseEntry): DatabaseRequest { + return { id: db.id, name: db.name, engine: db.engine }; +} + +function pickTableRequest(table: TableEntry, dbId: number): TableRequest { + const request: TableRequest = { + id: table.id, + db_id: dbId, + name: table.name, + schema: table.schema, + }; + if (table.description !== undefined) { + request.description = table.description; + } + return request; +} + +function pickFieldInsertRequest( + field: FieldEntry, + tableId: number, +): FieldInsertRequest { + const request: FieldInsertRequest = { + id: field.id, + table_id: tableId, + name: field.name, + }; + if (field.base_type !== undefined) { + request.base_type = field.base_type; + } + if (field.database_type !== undefined) { + request.database_type = field.database_type; + } + if (field.description !== undefined) { + request.description = field.description; + } + if (field.semantic_type !== undefined) { + request.semantic_type = field.semantic_type; + } + if (field.effective_type !== undefined) { + request.effective_type = field.effective_type; + } + if (field.coercion_strategy !== undefined) { + request.coercion_strategy = field.coercion_strategy; + } + return request; +} + +function pickFieldValuesRequest( + entry: FieldValuesEntry, + fieldId: number, +): FieldValuesRequest { + const request: FieldValuesRequest = { + field_id: fieldId, + values: entry.values, + has_more_values: entry.has_more_values ?? 
false, + }; + if (entry.human_readable_values !== undefined) { + request.human_readable_values = entry.human_readable_values; + } + return request; } export async function uploadMetadata({ @@ -136,7 +240,7 @@ export async function uploadMetadata({ const result: UploadMetadataResult = { databases: emptyStats(), tables: emptyStats(), - fieldsInsert: emptyStats(), + fieldsInsert: emptyFieldInsertStats(), fieldsFinalize: emptyStats(), fieldValues: emptyStats(), }; @@ -147,6 +251,7 @@ export async function uploadMetadata({ idMap, label, onInserted, + onMatched, }: RecordIdMapResponseOptions): void { if ("new_id" in response) { idMap.set(response.old_id, response.new_id); @@ -156,6 +261,7 @@ export async function uploadMetadata({ } if ("existing_id" in response) { idMap.set(response.old_id, response.existing_id); + onMatched?.(response.old_id); stats.mapped += 1; return; } @@ -227,13 +333,20 @@ export async function uploadMetadata({ } } - await postNdjson({ - url: joinUrl(instanceUrl, API_PATHS.databases), - apiKey, - requests: streamJsonElements( + async function* streamDatabaseRequests(): AsyncGenerator { + for await (const database of streamJsonElements( metadataFile, JSON_PATHS.databases, - ), + )) { + yield pickDatabaseRequest(database); + } + } + + await postNdjson({ + url: joinUrl(instanceUrl, API_PATHS.databases), + apiKey, + requests: streamDatabaseRequests(), + onWarning: warn, onResponse: (response) => recordIdMapResponse({ response, @@ -243,15 +356,16 @@ export async function uploadMetadata({ }), }); - await postNdjson({ + await postNdjson({ url: joinUrl(instanceUrl, API_PATHS.tables), apiKey, - requests: remapForeignKey({ + onWarning: warn, + requests: remapForeignKey({ jsonPath: JSON_PATHS.tables, sourceFile: metadataFile, getKey: (table) => table.db_id, idMap: databaseIdMap, - transform: (table, newDbId) => ({ ...table, db_id: newDbId }), + transform: pickTableRequest, describeSkip: (table, oldDbId) => `Skipping table ${table.id} (${table.name}): source db_id ${oldDbId} did not map to a target database`, }), @@ -267,19 +381,13 @@ export async function uploadMetadata({ await postNdjson({ url: joinUrl(instanceUrl, API_PATHS.fields), apiKey, + onWarning: warn, requests: remapForeignKey({ jsonPath: JSON_PATHS.fields, sourceFile: metadataFile, getKey: (field) => field.table_id, idMap: tableIdMap, - transform: (field, newTableId) => { - const { - parent_id: _parent_id, - fk_target_field_id: _fk_target_field_id, - ...rest - } = field; - return { ...rest, table_id: newTableId }; - }, + transform: pickFieldInsertRequest, describeSkip: (field, oldTableId) => `Skipping field ${field.id} (${field.name}): source table_id ${oldTableId} did not map to a target table`, }), @@ -289,13 +397,20 @@ export async function uploadMetadata({ stats: result.fieldsInsert, idMap: fieldIdMap, label: "Field", - onInserted: (oldId) => insertedFieldIds.add(oldId), + onInserted: (oldId) => { + insertedFieldIds.add(oldId); + result.fieldsInsert.inserted += 1; + }, + onMatched: () => { + result.fieldsInsert.matched += 1; + }, }), }); const finalizePass = postNdjson({ url: joinUrl(instanceUrl, API_PATHS.fieldsFinalize), apiKey, + onWarning: warn, requests: fieldFinalizeRequests(), onResponse: (response) => { if ("ok" in response) { @@ -308,18 +423,16 @@ export async function uploadMetadata({ }); const fieldValuesPass = fieldValuesFile - ? postNdjson({ + ? 
postNdjson({ url: joinUrl(instanceUrl, API_PATHS.fieldValues), apiKey, - requests: remapForeignKey({ + onWarning: warn, + requests: remapForeignKey({ jsonPath: JSON_PATHS.fieldValues, sourceFile: fieldValuesFile, getKey: (entry) => entry.field_id, idMap: fieldIdMap, - transform: (entry, newFieldId) => ({ - ...entry, - field_id: newFieldId, - }), + transform: pickFieldValuesRequest, describeSkip: (entry, oldId) => `Skipping field values for field_id ${oldId}: no mapping from source field to target`, }), From b8551fff9a147789e0325f0f4add041e420ac6e8 Mon Sep 17 00:00:00 2001 From: Aleksandr Lesnenko Date: Wed, 22 Apr 2026 15:06:48 -0400 Subject: [PATCH 5/9] working --- bin/cli.ts | 5 + bun.lock | 63 ++++++++ src/ndjson.ts | 311 ++++++++++++++++++++++++++++-------- src/upload-metadata.test.ts | 25 ++- src/upload-metadata.ts | 83 ++++++++-- 5 files changed, 398 insertions(+), 89 deletions(-) diff --git a/bin/cli.ts b/bin/cli.ts index b74361c..3c15cfd 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -21,6 +21,7 @@ type ParsedValues = { "no-field-values"?: boolean; "no-extract"?: boolean; "api-key"?: string; + "debug-dump"?: string; }; const DEFAULT_PATHS = { @@ -50,6 +51,8 @@ Commands: --field-values Override field-values.json path (default: .metabase/field-values.json) --no-field-values Skip uploading field values --api-key API key. Defaults to METABASE_API_KEY env var. + --debug-dump Write every outgoing NDJSON line to /.ndjson + (useful for debugging malformed-json server errors). download-metadata Stream metadata + field values from a Metabase instance into .metabase/ and @@ -76,6 +79,7 @@ function parseArguments() { "no-field-values": { type: "boolean", default: false }, "no-extract": { type: "boolean", default: false }, "api-key": { type: "string" }, + "debug-dump": { type: "string" }, }, }); } @@ -150,6 +154,7 @@ async function handleUploadMetadata( fieldValuesFile, instanceUrl, apiKey, + requestDumpFolder: values["debug-dump"], }); console.log(formatUploadReport(stats, Boolean(fieldValuesFile))); process.exit(hasAnyErrors(stats) ? 
1 : 0); diff --git a/bun.lock b/bun.lock index 5b9342d..9d047e0 100644 --- a/bun.lock +++ b/bun.lock @@ -15,12 +15,65 @@ "@types/node": "^25.6.0", "eslint": "^10.2.1", "oxfmt": "^0.45.0", + "tsx": "^4.21.0", "typescript": "^6.0.3", "typescript-eslint": "^8.58.2", }, }, }, "packages": { + "@esbuild/aix-ppc64": ["@esbuild/aix-ppc64@0.27.7", "", { "os": "aix", "cpu": "ppc64" }, "sha512-EKX3Qwmhz1eMdEJokhALr0YiD0lhQNwDqkPYyPhiSwKrh7/4KRjQc04sZ8db+5DVVnZ1LmbNDI1uAMPEUBnQPg=="], + + "@esbuild/android-arm": ["@esbuild/android-arm@0.27.7", "", { "os": "android", "cpu": "arm" }, "sha512-jbPXvB4Yj2yBV7HUfE2KHe4GJX51QplCN1pGbYjvsyCZbQmies29EoJbkEc+vYuU5o45AfQn37vZlyXy4YJ8RQ=="], + + "@esbuild/android-arm64": ["@esbuild/android-arm64@0.27.7", "", { "os": "android", "cpu": "arm64" }, "sha512-62dPZHpIXzvChfvfLJow3q5dDtiNMkwiRzPylSCfriLvZeq0a1bWChrGx/BbUbPwOrsWKMn8idSllklzBy+dgQ=="], + + "@esbuild/android-x64": ["@esbuild/android-x64@0.27.7", "", { "os": "android", "cpu": "x64" }, "sha512-x5VpMODneVDb70PYV2VQOmIUUiBtY3D3mPBG8NxVk5CogneYhkR7MmM3yR/uMdITLrC1ml/NV1rj4bMJuy9MCg=="], + + "@esbuild/darwin-arm64": ["@esbuild/darwin-arm64@0.27.7", "", { "os": "darwin", "cpu": "arm64" }, "sha512-5lckdqeuBPlKUwvoCXIgI2D9/ABmPq3Rdp7IfL70393YgaASt7tbju3Ac+ePVi3KDH6N2RqePfHnXkaDtY9fkw=="], + + "@esbuild/darwin-x64": ["@esbuild/darwin-x64@0.27.7", "", { "os": "darwin", "cpu": "x64" }, "sha512-rYnXrKcXuT7Z+WL5K980jVFdvVKhCHhUwid+dDYQpH+qu+TefcomiMAJpIiC2EM3Rjtq0sO3StMV/+3w3MyyqQ=="], + + "@esbuild/freebsd-arm64": ["@esbuild/freebsd-arm64@0.27.7", "", { "os": "freebsd", "cpu": "arm64" }, "sha512-B48PqeCsEgOtzME2GbNM2roU29AMTuOIN91dsMO30t+Ydis3z/3Ngoj5hhnsOSSwNzS+6JppqWsuhTp6E82l2w=="], + + "@esbuild/freebsd-x64": ["@esbuild/freebsd-x64@0.27.7", "", { "os": "freebsd", "cpu": "x64" }, "sha512-jOBDK5XEjA4m5IJK3bpAQF9/Lelu/Z9ZcdhTRLf4cajlB+8VEhFFRjWgfy3M1O4rO2GQ/b2dLwCUGpiF/eATNQ=="], + + "@esbuild/linux-arm": ["@esbuild/linux-arm@0.27.7", "", { "os": "linux", "cpu": "arm" }, "sha512-RkT/YXYBTSULo3+af8Ib0ykH8u2MBh57o7q/DAs3lTJlyVQkgQvlrPTnjIzzRPQyavxtPtfg0EopvDyIt0j1rA=="], + + "@esbuild/linux-arm64": ["@esbuild/linux-arm64@0.27.7", "", { "os": "linux", "cpu": "arm64" }, "sha512-RZPHBoxXuNnPQO9rvjh5jdkRmVizktkT7TCDkDmQ0W2SwHInKCAV95GRuvdSvA7w4VMwfCjUiPwDi0ZO6Nfe9A=="], + + "@esbuild/linux-ia32": ["@esbuild/linux-ia32@0.27.7", "", { "os": "linux", "cpu": "ia32" }, "sha512-GA48aKNkyQDbd3KtkplYWT102C5sn/EZTY4XROkxONgruHPU72l+gW+FfF8tf2cFjeHaRbWpOYa/uRBz/Xq1Pg=="], + + "@esbuild/linux-loong64": ["@esbuild/linux-loong64@0.27.7", "", { "os": "linux", "cpu": "none" }, "sha512-a4POruNM2oWsD4WKvBSEKGIiWQF8fZOAsycHOt6JBpZ+JN2n2JH9WAv56SOyu9X5IqAjqSIPTaJkqN8F7XOQ5Q=="], + + "@esbuild/linux-mips64el": ["@esbuild/linux-mips64el@0.27.7", "", { "os": "linux", "cpu": "none" }, "sha512-KabT5I6StirGfIz0FMgl1I+R1H73Gp0ofL9A3nG3i/cYFJzKHhouBV5VWK1CSgKvVaG4q1RNpCTR2LuTVB3fIw=="], + + "@esbuild/linux-ppc64": ["@esbuild/linux-ppc64@0.27.7", "", { "os": "linux", "cpu": "ppc64" }, "sha512-gRsL4x6wsGHGRqhtI+ifpN/vpOFTQtnbsupUF5R5YTAg+y/lKelYR1hXbnBdzDjGbMYjVJLJTd2OFmMewAgwlQ=="], + + "@esbuild/linux-riscv64": ["@esbuild/linux-riscv64@0.27.7", "", { "os": "linux", "cpu": "none" }, "sha512-hL25LbxO1QOngGzu2U5xeXtxXcW+/GvMN3ejANqXkxZ/opySAZMrc+9LY/WyjAan41unrR3YrmtTsUpwT66InQ=="], + + "@esbuild/linux-s390x": ["@esbuild/linux-s390x@0.27.7", "", { "os": "linux", "cpu": "s390x" }, "sha512-2k8go8Ycu1Kb46vEelhu1vqEP+UeRVj2zY1pSuPdgvbd5ykAw82Lrro28vXUrRmzEsUV0NzCf54yARIK8r0fdw=="], + + "@esbuild/linux-x64": ["@esbuild/linux-x64@0.27.7", "", { "os": "linux", 
"cpu": "x64" }, "sha512-hzznmADPt+OmsYzw1EE33ccA+HPdIqiCRq7cQeL1Jlq2gb1+OyWBkMCrYGBJ+sxVzve2ZJEVeePbLM2iEIZSxA=="], + + "@esbuild/netbsd-arm64": ["@esbuild/netbsd-arm64@0.27.7", "", { "os": "none", "cpu": "arm64" }, "sha512-b6pqtrQdigZBwZxAn1UpazEisvwaIDvdbMbmrly7cDTMFnw/+3lVxxCTGOrkPVnsYIosJJXAsILG9XcQS+Yu6w=="], + + "@esbuild/netbsd-x64": ["@esbuild/netbsd-x64@0.27.7", "", { "os": "none", "cpu": "x64" }, "sha512-OfatkLojr6U+WN5EDYuoQhtM+1xco+/6FSzJJnuWiUw5eVcicbyK3dq5EeV/QHT1uy6GoDhGbFpprUiHUYggrw=="], + + "@esbuild/openbsd-arm64": ["@esbuild/openbsd-arm64@0.27.7", "", { "os": "openbsd", "cpu": "arm64" }, "sha512-AFuojMQTxAz75Fo8idVcqoQWEHIXFRbOc1TrVcFSgCZtQfSdc1RXgB3tjOn/krRHENUB4j00bfGjyl2mJrU37A=="], + + "@esbuild/openbsd-x64": ["@esbuild/openbsd-x64@0.27.7", "", { "os": "openbsd", "cpu": "x64" }, "sha512-+A1NJmfM8WNDv5CLVQYJ5PshuRm/4cI6WMZRg1by1GwPIQPCTs1GLEUHwiiQGT5zDdyLiRM/l1G0Pv54gvtKIg=="], + + "@esbuild/openharmony-arm64": ["@esbuild/openharmony-arm64@0.27.7", "", { "os": "none", "cpu": "arm64" }, "sha512-+KrvYb/C8zA9CU/g0sR6w2RBw7IGc5J2BPnc3dYc5VJxHCSF1yNMxTV5LQ7GuKteQXZtspjFbiuW5/dOj7H4Yw=="], + + "@esbuild/sunos-x64": ["@esbuild/sunos-x64@0.27.7", "", { "os": "sunos", "cpu": "x64" }, "sha512-ikktIhFBzQNt/QDyOL580ti9+5mL/YZeUPKU2ivGtGjdTYoqz6jObj6nOMfhASpS4GU4Q/Clh1QtxWAvcYKamA=="], + + "@esbuild/win32-arm64": ["@esbuild/win32-arm64@0.27.7", "", { "os": "win32", "cpu": "arm64" }, "sha512-7yRhbHvPqSpRUV7Q20VuDwbjW5kIMwTHpptuUzV+AA46kiPze5Z7qgt6CLCK3pWFrHeNfDd1VKgyP4O+ng17CA=="], + + "@esbuild/win32-ia32": ["@esbuild/win32-ia32@0.27.7", "", { "os": "win32", "cpu": "ia32" }, "sha512-SmwKXe6VHIyZYbBLJrhOoCJRB/Z1tckzmgTLfFYOfpMAx63BJEaL9ExI8x7v0oAO3Zh6D/Oi1gVxEYr5oUCFhw=="], + + "@esbuild/win32-x64": ["@esbuild/win32-x64@0.27.7", "", { "os": "win32", "cpu": "x64" }, "sha512-56hiAJPhwQ1R4i+21FVF7V8kSD5zZTdHcVuRFMW0hn753vVfQN8xlx4uOPT4xoGH0Z/oVATuR82AiqSTDIpaHg=="], + "@eslint-community/eslint-utils": ["@eslint-community/eslint-utils@4.9.1", "", { "dependencies": { "eslint-visitor-keys": "^3.4.3" }, "peerDependencies": { "eslint": "^6.0.0 || ^7.0.0 || >=8.0.0" } }, "sha512-phrYmNiYppR7znFEdqgfWHXR6NCkZEK7hwWDHZUjit/2/U0r6XvkDl0SYnoM51Hq7FhCGdLDT6zxCCOY1hexsQ=="], "@eslint-community/regexpp": ["@eslint-community/regexpp@4.12.2", "", {}, "sha512-EriSTlt5OC9/7SXkRSCAhfSxxoSUgBm33OH+IkwbdpgoqsSsUg7y3uh+IICI/Qg4BBWr3U2i39RpmycbxMq4ew=="], @@ -141,6 +194,8 @@ "deep-is": ["deep-is@0.1.4", "", {}, "sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ=="], + "esbuild": ["esbuild@0.27.7", "", { "optionalDependencies": { "@esbuild/aix-ppc64": "0.27.7", "@esbuild/android-arm": "0.27.7", "@esbuild/android-arm64": "0.27.7", "@esbuild/android-x64": "0.27.7", "@esbuild/darwin-arm64": "0.27.7", "@esbuild/darwin-x64": "0.27.7", "@esbuild/freebsd-arm64": "0.27.7", "@esbuild/freebsd-x64": "0.27.7", "@esbuild/linux-arm": "0.27.7", "@esbuild/linux-arm64": "0.27.7", "@esbuild/linux-ia32": "0.27.7", "@esbuild/linux-loong64": "0.27.7", "@esbuild/linux-mips64el": "0.27.7", "@esbuild/linux-ppc64": "0.27.7", "@esbuild/linux-riscv64": "0.27.7", "@esbuild/linux-s390x": "0.27.7", "@esbuild/linux-x64": "0.27.7", "@esbuild/netbsd-arm64": "0.27.7", "@esbuild/netbsd-x64": "0.27.7", "@esbuild/openbsd-arm64": "0.27.7", "@esbuild/openbsd-x64": "0.27.7", "@esbuild/openharmony-arm64": "0.27.7", "@esbuild/sunos-x64": "0.27.7", "@esbuild/win32-arm64": "0.27.7", "@esbuild/win32-ia32": "0.27.7", "@esbuild/win32-x64": "0.27.7" }, "bin": { "esbuild": "bin/esbuild" } }, 
"sha512-IxpibTjyVnmrIQo5aqNpCgoACA/dTKLTlhMHihVHhdkxKyPO1uBBthumT0rdHmcsk9uMonIWS0m4FljWzILh3w=="], + "escape-string-regexp": ["escape-string-regexp@4.0.0", "", {}, "sha512-TtpcNJ3XAzx3Gq8sWRzJaVajRs0uVxA2YAkdb1jm2YkPz4G6egUFAyA3n5vtEIZefPk5Wa4UXbKuS5fKkJWdgA=="], "eslint": ["eslint@10.2.1", "", { "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.2", "@eslint/config-array": "^0.23.5", "@eslint/config-helpers": "^0.5.5", "@eslint/core": "^1.2.1", "@eslint/plugin-kit": "^0.7.1", "@humanfs/node": "^0.16.6", "@humanwhocodes/module-importer": "^1.0.1", "@humanwhocodes/retry": "^0.4.2", "@types/estree": "^1.0.6", "ajv": "^6.14.0", "cross-spawn": "^7.0.6", "debug": "^4.3.2", "escape-string-regexp": "^4.0.0", "eslint-scope": "^9.1.2", "eslint-visitor-keys": "^5.0.1", "espree": "^11.2.0", "esquery": "^1.7.0", "esutils": "^2.0.2", "fast-deep-equal": "^3.1.3", "file-entry-cache": "^8.0.0", "find-up": "^5.0.0", "glob-parent": "^6.0.2", "ignore": "^5.2.0", "imurmurhash": "^0.1.4", "is-glob": "^4.0.0", "json-stable-stringify-without-jsonify": "^1.0.1", "minimatch": "^10.2.4", "natural-compare": "^1.4.0", "optionator": "^0.9.3" }, "peerDependencies": { "jiti": "*" }, "optionalPeers": ["jiti"], "bin": { "eslint": "bin/eslint.js" } }, "sha512-wiyGaKsDgqXvF40P8mDwiUp/KQjE1FdrIEJsM8PZ3XCiniTMXS3OHWWUe5FI5agoCnr8x4xPrTDZuxsBlNHl+Q=="], @@ -175,6 +230,10 @@ "flatted": ["flatted@3.4.2", "", {}, "sha512-PjDse7RzhcPkIJwy5t7KPWQSZ9cAbzQXcafsetQoD7sOJRQlGikNbx7yZp2OotDnJyrDcbyRq3Ttb18iYOqkxA=="], + "fsevents": ["fsevents@2.3.3", "", { "os": "darwin" }, "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw=="], + + "get-tsconfig": ["get-tsconfig@4.14.0", "", { "dependencies": { "resolve-pkg-maps": "^1.0.0" } }, "sha512-yTb+8DXzDREzgvYmh6s9vHsSVCHeC0G3PI5bEXNBHtmshPnO+S5O7qgLEOn0I5QvMy6kpZN8K1NKGyilLb93wA=="], + "glob-parent": ["glob-parent@6.0.2", "", { "dependencies": { "is-glob": "^4.0.3" } }, "sha512-XxwI8EOhVQgWp6iDL+3b0r86f4d6AX6zSU55HfB4ydCEuXLXc5FcYeOu+nnGftS4TEju/11rt4KJPTMgbfmv4A=="], "ignore": ["ignore@5.3.2", "", {}, "sha512-hsBTNUqQTDwkWtcdYI2i06Y/nUBEsNEDJKjWdigLvegy8kDuJAS8uRlpkkcQpyEXL0Z/pjDy5HBmMjRCJ2gq+g=="], @@ -225,6 +284,8 @@ "punycode": ["punycode@2.3.1", "", {}, "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg=="], + "resolve-pkg-maps": ["resolve-pkg-maps@1.0.0", "", {}, "sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw=="], + "semver": ["semver@7.7.4", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA=="], "shebang-command": ["shebang-command@2.0.0", "", { "dependencies": { "shebang-regex": "^3.0.0" } }, "sha512-kHxr2zZpYtdmrN1qDjrrX/Z1rR1kG8Dx+gkpK1G4eXmvXswmcE1hTWBWYUzlraYw1/yZp6YuDY77YtvbN0dmDA=="], @@ -237,6 +298,8 @@ "ts-api-utils": ["ts-api-utils@2.5.0", "", { "peerDependencies": { "typescript": ">=4.8.4" } }, "sha512-OJ/ibxhPlqrMM0UiNHJ/0CKQkoKF243/AEmplt3qpRgkW8VG7IfOS41h7V8TjITqdByHzrjcS/2si+y4lIh8NA=="], + "tsx": ["tsx@4.21.0", "", { "dependencies": { "esbuild": "~0.27.0", "get-tsconfig": "^4.7.5" }, "optionalDependencies": { "fsevents": "~2.3.3" }, "bin": { "tsx": "dist/cli.mjs" } }, "sha512-5C1sg4USs1lfG0GFb2RLXsdpXqBSEhAaA/0kPL01wxzpMqLILNxIxIOKiILz+cdg/pLnOUxFYOR5yhHU666wbw=="], + "type-check": ["type-check@0.4.0", "", { "dependencies": { "prelude-ls": "^1.2.1" } }, 
"sha512-XleUoc9uwGXqjWwXaUTZAmzMcFZ5858QA2vvx1Ur5xIcixXIP+8LnFDgRplU30us6teqdlskFfu+ae4K79Ooew=="], "typescript": ["typescript@6.0.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-y2TvuxSZPDyQakkFRPZHKFm+KKVqIisdg9/CZwm9ftvKXLP8NRWj38/ODjNbr43SsoXqNuAisEf1GdCxqWcdBw=="], diff --git a/src/ndjson.ts b/src/ndjson.ts index 43f8a47..acc57ea 100644 --- a/src/ndjson.ts +++ b/src/ndjson.ts @@ -1,15 +1,23 @@ +import { Buffer } from "node:buffer"; +import { + Agent as HttpAgent, + request as httpRequest, + type ClientRequest, + type IncomingMessage, +} from "node:http"; +import { Agent as HttpsAgent, request as httpsRequest } from "node:https"; + export type PostNdjsonOptions = { url: string; apiKey: string; requests: AsyncIterable; onResponse: (response: Res, index: number) => void | Promise; + onRequestSent?: (line: string, index: number) => void; onWarning?: (message: string) => void; + /** Max rows per HTTP request. Default 1000. */ + batchSize?: number; }; -// Node's fetch RequestInit does not type `duplex` in lib.dom, but it is required -// when sending a streaming body. -type StreamingRequestInit = RequestInit & { duplex: "half" }; - // The server (Clojure line-seq) splits requests on raw \n, \r, or \r\n. If any // of those bytes escape into a serialized line, it gets cut mid-object and // returns a malformed-json error against the tail fragment. `JSON.stringify` @@ -18,99 +26,266 @@ type StreamingRequestInit = RequestInit & { duplex: "half" }; // the user can chase the upstream bug. const RAW_NEWLINE_PATTERN = /[\n\r]/g; +// Coalesce outgoing lines into larger socket writes to amortize syscall cost. +const CHUNK_FLUSH_BYTES = 65536; + +// Cap the rows-per-HTTP-POST so each request stays within one server-side DB +// transaction. The Metabase server batches inserts in groups of 2000 per DB +// transaction; sending more than that in one POST forces multiple transactions +// in a single request, during which the server stops reading body bytes long +// enough for Jetty's idle timeout to fire and drop the tail with EofException. +// 1000 leaves a comfortable margin below that threshold. +const DEFAULT_BATCH_SIZE = 1000; + +const DEFAULT_PORTS = { "http:": 80, "https:": 443 } as const; + +const httpKeepAlive = new HttpAgent({ keepAlive: true }); +const httpsKeepAlive = new HttpsAgent({ keepAlive: true }); + export async function postNdjson({ url, apiKey, requests, onResponse, + onRequestSent, onWarning, + batchSize = DEFAULT_BATCH_SIZE, }: PostNdjsonOptions): Promise { - const iterator = requests[Symbol.asyncIterator](); - const encoder = new TextEncoder(); - let sentIndex = 0; - - const body = new ReadableStream({ - async pull(controller) { - try { - const { value, done } = await iterator.next(); - if (done) { - controller.close(); - return; - } - const raw = JSON.stringify(value); - const safe = raw.includes("\n") || raw.includes("\r") - ? sanitizeRawNewlines(raw, sentIndex, onWarning) - : raw; - sentIndex += 1; - controller.enqueue(encoder.encode(safe + "\n")); - } catch (error) { - controller.error(error); - } - }, - async cancel(reason) { - if (typeof iterator.return === "function") { - await iterator.return(reason); - } - }, - }); + let globalIndex = 0; + + for await (const batch of batchAsyncIterable(requests, batchSize)) { + const batchOffset = globalIndex; + await postNdjsonBatch({ + url, + apiKey, + batch, + onResponse: (response, localIndex) => + onResponse(response, batchOffset + localIndex), + onRequestSent: onRequestSent + ? 
(line, localIndex) => onRequestSent(line, batchOffset + localIndex) + : undefined, + onWarning, + }); + globalIndex += batch.length; + } +} + +async function* batchAsyncIterable( + source: AsyncIterable, + size: number, +): AsyncGenerator { + let current: T[] = []; + for await (const item of source) { + current.push(item); + if (current.length >= size) { + yield current; + current = []; + } + } + if (current.length > 0) { + yield current; + } +} + +type PostBatchOptions = { + url: string; + apiKey: string; + batch: Req[]; + onResponse: (response: Res, index: number) => void | Promise; + onRequestSent?: (line: string, index: number) => void; + onWarning?: (message: string) => void; +}; - const init: StreamingRequestInit = { +async function postNdjsonBatch({ + url, + apiKey, + batch, + onResponse, + onRequestSent, + onWarning, +}: PostBatchOptions): Promise { + if (batch.length === 0) { + return; + } + + const parsed = new URL(url); + const makeRequest = parsed.protocol === "https:" ? httpsRequest : httpRequest; + const agent = parsed.protocol === "https:" ? httpsKeepAlive : httpKeepAlive; + const port = parsed.port + ? Number(parsed.port) + : DEFAULT_PORTS[parsed.protocol as keyof typeof DEFAULT_PORTS]; + + const req = makeRequest({ method: "POST", + hostname: parsed.hostname, + port, + path: (parsed.pathname || "/") + (parsed.search || ""), headers: { "Content-Type": "application/x-ndjson", "X-API-Key": apiKey, }, - body, - duplex: "half", - }; - const response = await fetch(url, init); + agent, + }); + + const responsePromise = new Promise((resolve, reject) => { + req.once("response", resolve); + req.once("error", reject); + }); - if (!response.ok) { - const text = await response.text().catch(() => ""); + const counters = { sent: 0 }; + const writePromise = writeBatch( + req, + batch, + counters, + onRequestSent, + onWarning, + ); + + let response: IncomingMessage; + try { + response = await responsePromise; + } catch (error) { + await writePromise.catch(() => {}); + throw error; + } + + const status = response.statusCode ?? 0; + if (status < 200 || status >= 300) { + const text = await readAllText(response); + await writePromise.catch(() => {}); + req.destroy(); throw new Error( - `POST ${url} failed: ${response.status} ${response.statusText} ${text}`.trim(), + `POST ${url} failed: ${status} ${response.statusMessage ?? ""} ${text}`.trim(), ); } - if (!response.body) { - throw new Error(`POST ${url} returned an empty body`); + + let received = 0; + try { + for await (const parsedLine of parseNdjsonStream(response)) { + await onResponse(parsedLine, received); + received += 1; + } + } catch (error) { + response.destroy(); + req.destroy(); + throw error; } - let index = 0; - for await (const parsed of parseNdjsonStream(response.body)) { - await onResponse(parsed, index); - index += 1; + await writePromise; + + if (received < counters.sent) { + throw new Error( + `POST ${url}: server acknowledged ${received} of ${counters.sent} sent rows — ${counters.sent - received} rows dropped by the server (likely a per-row error terminated the stream)`, + ); } } +async function writeBatch( + req: ClientRequest, + batch: Req[], + counters: { sent: number }, + onRequestSent: ((line: string, index: number) => void) | undefined, + onWarning: ((message: string) => void) | undefined, +): Promise { + const encoder = new TextEncoder(); + let pendingBuffer = ""; + + try { + for (const value of batch) { + const raw = JSON.stringify(value); + const safe = + raw.includes("\n") || raw.includes("\r") + ? 
sanitizeRawNewlines(raw, counters.sent, onWarning) + : raw; + onRequestSent?.(safe, counters.sent); + counters.sent += 1; + pendingBuffer += safe + "\n"; + if (pendingBuffer.length >= CHUNK_FLUSH_BYTES) { + await writeWithBackpressure(req, encoder.encode(pendingBuffer)); + pendingBuffer = ""; + } + } + if (pendingBuffer.length > 0) { + await writeWithCallback(req, encoder.encode(pendingBuffer)); + pendingBuffer = ""; + } + await new Promise((resolve, reject) => { + req.once("finish", resolve); + req.once("error", reject); + req.end(); + }); + } catch (error) { + req.destroy(error instanceof Error ? error : new Error(String(error))); + throw error; + } +} + +async function writeWithBackpressure( + req: ClientRequest, + chunk: Uint8Array, +): Promise { + if (req.write(chunk)) { + return; + } + await new Promise((resolve, reject) => { + const onDrain = (): void => { + req.off("error", onError); + resolve(); + }; + const onError = (error: Error): void => { + req.off("drain", onDrain); + reject(error); + }; + req.once("drain", onDrain); + req.once("error", onError); + }); +} + +async function writeWithCallback( + req: ClientRequest, + chunk: Uint8Array, +): Promise { + await new Promise((resolve, reject) => { + req.write(chunk, (error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); +} + +async function readAllText(stream: AsyncIterable): Promise { + const chunks: Uint8Array[] = []; + try { + for await (const chunk of stream) { + chunks.push(chunk); + } + } catch { + // Best-effort read; if the socket drops mid-response, return what we have. + } + return Buffer.concat(chunks).toString("utf8"); +} + export async function* parseNdjsonStream( - stream: ReadableStream, + stream: AsyncIterable, ): AsyncGenerator { - const reader = stream.getReader(); const decoder = new TextDecoder(); // Slice once per chunk (not per line) to keep parsing O(n) on large responses. let pending = ""; - try { - while (true) { - const { value, done } = await reader.read(); - if (value) { - const chunk = pending + decoder.decode(value, { stream: true }); - const lastNewline = chunk.lastIndexOf("\n"); - if (lastNewline === -1) { - pending = chunk; - } else { - for (const line of splitLines(chunk.slice(0, lastNewline))) { - yield JSON.parse(line) as T; - } - pending = chunk.slice(lastNewline + 1); - } - } - if (done) { - break; - } + for await (const chunk of stream) { + const buffer = pending + decoder.decode(chunk, { stream: true }); + const lastNewline = buffer.lastIndexOf("\n"); + if (lastNewline === -1) { + pending = buffer; + continue; + } + for (const line of splitLines(buffer.slice(0, lastNewline))) { + yield JSON.parse(line) as T; } - } finally { - reader.releaseLock(); + pending = buffer.slice(lastNewline + 1); } const trailing = pending.trim(); diff --git a/src/upload-metadata.test.ts b/src/upload-metadata.test.ts index e92f9a5..21c3494 100644 --- a/src/upload-metadata.test.ts +++ b/src/upload-metadata.test.ts @@ -16,6 +16,7 @@ type RecordedCall = { path: string; contentType: string; transferEncoding: string | null; + contentLength: string | null; apiKey: string | null; lines: unknown[]; }; @@ -86,12 +87,17 @@ function startMockServer(): MockServerControl { const path = url.pathname; const contentType = request.headers.get("Content-Type") ?? 
""; const transferEncoding = request.headers.get("Transfer-Encoding"); + const contentLength = request.headers.get("Content-Length"); const apiKey = request.headers.get("X-API-Key"); - if (!request.body) { - return new Response("missing body", { status: 400 }); - } - const lines = await readNdjsonLines(request.body); - calls.push({ path, contentType, transferEncoding, apiKey, lines }); + const lines = request.body ? await readNdjsonLines(request.body) : []; + calls.push({ + path, + contentType, + transferEncoding, + contentLength, + apiKey, + lines, + }); switch (path) { case "/api/database/metadata/databases": { @@ -327,15 +333,20 @@ describe("uploadMetadata", () => { expect(warnings.some((w) => w.includes("Field 1"))).toBe(true); }); - it("streams the request body with chunked transfer encoding", async () => { + it("delivers a framed request body to the server", async () => { await uploadMetadata({ metadataFile: EXAMPLE_METADATA, instanceUrl: mock.baseUrl, apiKey: "k", onWarning: () => {}, }); + // node:http picks Transfer-Encoding: chunked for unknown-length bodies and + // Content-Length for bodies that fit in a single write buffer. Either is + // fine — the point is that the bytes made it to the server intact. for (const call of mock.calls) { - expect(call.transferEncoding).toBe("chunked"); + const hasFraming = + call.transferEncoding === "chunked" || call.contentLength !== null; + expect(hasFraming).toBe(true); } }); diff --git a/src/upload-metadata.ts b/src/upload-metadata.ts index 509fe63..1e3c8fb 100644 --- a/src/upload-metadata.ts +++ b/src/upload-metadata.ts @@ -1,3 +1,6 @@ +import { appendFileSync, mkdirSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; + import { postNdjson } from "./ndjson.js"; import { streamJsonElements } from "./stream-json.js"; @@ -22,6 +25,8 @@ export type UploadMetadataOptions = { instanceUrl: string; apiKey: string; onWarning?: (message: string) => void; + /** When set, dumps every outgoing NDJSON line to `/.ndjson` for debugging. */ + requestDumpFolder?: string; }; export type UploadStepStats = { @@ -119,19 +124,20 @@ type FieldValuesRequest = { type IdMapResponse = | { old_id: number; new_id: number } | { old_id: number; existing_id: number } - | { old_id?: number; error: string; detail?: string }; + | { old_id?: number; line?: number; error: string; detail?: string }; type FieldFinalizeResponse = | { id: number; ok: true } - | { id?: number; error: string; detail?: string }; + | { id?: number; line?: number; error: string; detail?: string }; type FieldValuesResponse = | { field_id: number; created: true } | { field_id: number; updated: true } - | { field_id?: number; error: string; detail?: string }; + | { field_id?: number; line?: number; error: string; detail?: string }; type RecordIdMapResponseOptions = { response: IdMapResponse; + responseIndex: number; stats: UploadStepStats; idMap: Map; label: string; @@ -154,11 +160,29 @@ function emptyFieldInsertStats(): UploadFieldInsertStats { function formatError( label: string, id: number | undefined, - response: { error?: string; detail?: string }, + response: { error?: string; detail?: string; line?: number }, + requestIndex?: number, ): string { - const idSuffix = id === undefined ? "" : ` ${id}`; + const locator = buildErrorLocator(id, response.line, requestIndex); const detailSuffix = response.detail ? ` — ${response.detail}` : ""; - return `${label}${idSuffix}: ${response.error ?? "unknown error"}${detailSuffix}`; + return `${label}${locator}: ${response.error ?? 
"unknown error"}${detailSuffix}`; +} + +function buildErrorLocator( + id: number | undefined, + serverLine: number | undefined, + requestIndex: number | undefined, +): string { + if (id !== undefined) { + return ` ${id}`; + } + if (serverLine !== undefined) { + return ` (source line #${serverLine})`; + } + if (requestIndex !== undefined) { + return ` (response #${requestIndex})`; + } + return ""; } function pickDatabaseRequest(db: DatabaseEntry): DatabaseRequest { @@ -229,9 +253,29 @@ export async function uploadMetadata({ instanceUrl, apiKey, onWarning, + requestDumpFolder, }: UploadMetadataOptions): Promise { const warn = onWarning ?? ((message: string) => console.warn(message)); + if (requestDumpFolder !== undefined) { + mkdirSync(requestDumpFolder, { recursive: true }); + } + + const dumpFor = ( + endpoint: "databases" | "tables" | "fields" | "fields-finalize" | "field-values", + ): ((line: string) => void) | undefined => { + if (requestDumpFolder === undefined) { + return undefined; + } + const file = join(requestDumpFolder, `${endpoint}.ndjson`); + // Truncate on first call so each run starts fresh — avoids cross-run noise + // when bisecting a malformed line. + writeFileSync(file, ""); + return (line: string) => { + appendFileSync(file, line + "\n"); + }; + }; + const databaseIdMap = new Map(); const tableIdMap = new Map(); const fieldIdMap = new Map(); @@ -247,6 +291,7 @@ export async function uploadMetadata({ function recordIdMapResponse({ response, + responseIndex, stats, idMap, label, @@ -266,7 +311,7 @@ export async function uploadMetadata({ return; } stats.errors += 1; - warn(formatError(label, response.old_id, response)); + warn(formatError(label, response.old_id, response, responseIndex)); } async function* remapForeignKey(opts: { @@ -347,9 +392,11 @@ export async function uploadMetadata({ apiKey, requests: streamDatabaseRequests(), onWarning: warn, - onResponse: (response) => + onRequestSent: dumpFor("databases"), + onResponse: (response, responseIndex) => recordIdMapResponse({ response, + responseIndex, stats: result.databases, idMap: databaseIdMap, label: "Database", @@ -360,6 +407,7 @@ export async function uploadMetadata({ url: joinUrl(instanceUrl, API_PATHS.tables), apiKey, onWarning: warn, + onRequestSent: dumpFor("tables"), requests: remapForeignKey({ jsonPath: JSON_PATHS.tables, sourceFile: metadataFile, @@ -369,9 +417,10 @@ export async function uploadMetadata({ describeSkip: (table, oldDbId) => `Skipping table ${table.id} (${table.name}): source db_id ${oldDbId} did not map to a target database`, }), - onResponse: (response) => + onResponse: (response, responseIndex) => recordIdMapResponse({ response, + responseIndex, stats: result.tables, idMap: tableIdMap, label: "Table", @@ -382,6 +431,7 @@ export async function uploadMetadata({ url: joinUrl(instanceUrl, API_PATHS.fields), apiKey, onWarning: warn, + onRequestSent: dumpFor("fields"), requests: remapForeignKey({ jsonPath: JSON_PATHS.fields, sourceFile: metadataFile, @@ -391,9 +441,10 @@ export async function uploadMetadata({ describeSkip: (field, oldTableId) => `Skipping field ${field.id} (${field.name}): source table_id ${oldTableId} did not map to a target table`, }), - onResponse: (response) => + onResponse: (response, responseIndex) => recordIdMapResponse({ response, + responseIndex, stats: result.fieldsInsert, idMap: fieldIdMap, label: "Field", @@ -411,14 +462,15 @@ export async function uploadMetadata({ url: joinUrl(instanceUrl, API_PATHS.fieldsFinalize), apiKey, onWarning: warn, + onRequestSent: 
dumpFor("fields-finalize"), requests: fieldFinalizeRequests(), - onResponse: (response) => { + onResponse: (response, responseIndex) => { if ("ok" in response) { result.fieldsFinalize.mapped += 1; return; } result.fieldsFinalize.errors += 1; - warn(formatError("Finalize", response.id, response)); + warn(formatError("Finalize", response.id, response, responseIndex)); }, }); @@ -427,6 +479,7 @@ export async function uploadMetadata({ url: joinUrl(instanceUrl, API_PATHS.fieldValues), apiKey, onWarning: warn, + onRequestSent: dumpFor("field-values"), requests: remapForeignKey({ jsonPath: JSON_PATHS.fieldValues, sourceFile: fieldValuesFile, @@ -436,10 +489,12 @@ export async function uploadMetadata({ describeSkip: (entry, oldId) => `Skipping field values for field_id ${oldId}: no mapping from source field to target`, }), - onResponse: (response) => { + onResponse: (response, responseIndex) => { if ("error" in response) { result.fieldValues.errors += 1; - warn(formatError("Field values", response.field_id, response)); + warn( + formatError("Field values", response.field_id, response, responseIndex), + ); return; } result.fieldValues.mapped += 1; From b8a4dd29d7d6d71b5127c2dc418d0447c95c428b Mon Sep 17 00:00:00 2001 From: Aleksandr Lesnenko Date: Wed, 22 Apr 2026 15:18:24 -0400 Subject: [PATCH 6/9] finally --- .gitignore | 2 + README.md | 4 +- bin/cli.ts | 5 - src/ndjson.ts | 224 +++++++++-------------------------------- src/upload-metadata.ts | 30 ------ 5 files changed, 54 insertions(+), 211 deletions(-) diff --git a/.gitignore b/.gitignore index 733217c..0ab6dac 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ Desktop.ini node_modules/ dist/ *.tgz +.metabase/ +upload.log diff --git a/README.md b/README.md index 3d17130..e6c4fa0 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,9 @@ With no flags, it reads `.metabase/metadata.json` and `.metabase/field-values.js | `--no-field-values` | — | Skip uploading field values | | `--api-key ` | `METABASE_API_KEY` env var | API key | -Exits non-zero if any step reports row-level errors so CI can catch partial imports. The command is designed to stream 5 GB+ files without loading them into memory. +The source JSON files are streamed through `@streamparser/json-node` — they are never fully loaded into memory, so 100 GB+ exports upload fine. Rows are sent in batches of 2000 per HTTP POST (matching the server's per-transaction batch size) with HTTP keep-alive, so each request is one clean server-side transaction. + +Exits non-zero if any step reports row-level errors, or if the server acknowledges fewer rows than were sent in a batch (so CI can catch partial imports). ### Extracting the spec diff --git a/bin/cli.ts b/bin/cli.ts index 3c15cfd..b74361c 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -21,7 +21,6 @@ type ParsedValues = { "no-field-values"?: boolean; "no-extract"?: boolean; "api-key"?: string; - "debug-dump"?: string; }; const DEFAULT_PATHS = { @@ -51,8 +50,6 @@ Commands: --field-values Override field-values.json path (default: .metabase/field-values.json) --no-field-values Skip uploading field values --api-key API key. Defaults to METABASE_API_KEY env var. - --debug-dump Write every outgoing NDJSON line to /.ndjson - (useful for debugging malformed-json server errors). 
download-metadata Stream metadata + field values from a Metabase instance into .metabase/ and @@ -79,7 +76,6 @@ function parseArguments() { "no-field-values": { type: "boolean", default: false }, "no-extract": { type: "boolean", default: false }, "api-key": { type: "string" }, - "debug-dump": { type: "string" }, }, }); } @@ -154,7 +150,6 @@ async function handleUploadMetadata( fieldValuesFile, instanceUrl, apiKey, - requestDumpFolder: values["debug-dump"], }); console.log(formatUploadReport(stats, Boolean(fieldValuesFile))); process.exit(hasAnyErrors(stats) ? 1 : 0); diff --git a/src/ndjson.ts b/src/ndjson.ts index acc57ea..2339d2a 100644 --- a/src/ndjson.ts +++ b/src/ndjson.ts @@ -1,53 +1,34 @@ -import { Buffer } from "node:buffer"; -import { - Agent as HttpAgent, - request as httpRequest, - type ClientRequest, - type IncomingMessage, -} from "node:http"; -import { Agent as HttpsAgent, request as httpsRequest } from "node:https"; - export type PostNdjsonOptions = { url: string; apiKey: string; requests: AsyncIterable; onResponse: (response: Res, index: number) => void | Promise; - onRequestSent?: (line: string, index: number) => void; onWarning?: (message: string) => void; - /** Max rows per HTTP request. Default 1000. */ + /** Max rows per HTTP request. Default 2000. */ batchSize?: number; }; -// The server (Clojure line-seq) splits requests on raw \n, \r, or \r\n. If any -// of those bytes escape into a serialized line, it gets cut mid-object and -// returns a malformed-json error against the tail fragment. `JSON.stringify` -// escapes control chars in strings, so a positive hit here means a value's -// toJSON() produced raw control chars — we sanitize defensively and warn so -// the user can chase the upstream bug. +// JSON.stringify escapes control characters in string values, so the +// serialized output should never contain a raw \n or \r. If it does (e.g. a +// value's custom toJSON returned raw control chars), the server's line-seq +// would split the line mid-object. Defensive: escape them before sending and +// warn the caller so the upstream bug can be chased. const RAW_NEWLINE_PATTERN = /[\n\r]/g; -// Coalesce outgoing lines into larger socket writes to amortize syscall cost. -const CHUNK_FLUSH_BYTES = 65536; - -// Cap the rows-per-HTTP-POST so each request stays within one server-side DB -// transaction. The Metabase server batches inserts in groups of 2000 per DB -// transaction; sending more than that in one POST forces multiple transactions -// in a single request, during which the server stops reading body bytes long -// enough for Jetty's idle timeout to fire and drop the tail with EofException. -// 1000 leaves a comfortable margin below that threshold. -const DEFAULT_BATCH_SIZE = 1000; - -const DEFAULT_PORTS = { "http:": 80, "https:": 443 } as const; - -const httpKeepAlive = new HttpAgent({ keepAlive: true }); -const httpsKeepAlive = new HttpsAgent({ keepAlive: true }); +// Cap rows-per-HTTP-POST so each request stays within one server-side DB +// transaction. The Metabase NDJSON endpoints partition inserts in groups of +// 2000 per transaction; sending more than that per POST forces multiple +// transactions inside a single request, during which the server stops reading +// body bytes long enough for Jetty's idle timeout to drop the tail. Matching +// the server's 2000 keeps every POST to exactly one transaction with minimum +// round-trips. 
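+// Back-of-envelope (illustrative numbers, not from this changeset): a source
+// export with 150,000 field rows uploads as ceil(150000 / 2000) = 75
+// sequential POSTs, each committing as exactly one server-side transaction.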
+const DEFAULT_BATCH_SIZE = 2000; export async function postNdjson({ url, apiKey, requests, onResponse, - onRequestSent, onWarning, batchSize = DEFAULT_BATCH_SIZE, }: PostNdjsonOptions): Promise { @@ -61,9 +42,6 @@ export async function postNdjson({ batch, onResponse: (response, localIndex) => onResponse(response, batchOffset + localIndex), - onRequestSent: onRequestSent - ? (line, localIndex) => onRequestSent(line, batchOffset + localIndex) - : undefined, onWarning, }); globalIndex += batch.length; @@ -92,7 +70,6 @@ type PostBatchOptions = { apiKey: string; batch: Req[]; onResponse: (response: Res, index: number) => void | Promise; - onRequestSent?: (line: string, index: number) => void; onWarning?: (message: string) => void; }; @@ -101,171 +78,72 @@ async function postNdjsonBatch({ apiKey, batch, onResponse, - onRequestSent, onWarning, }: PostBatchOptions): Promise { if (batch.length === 0) { return; } - const parsed = new URL(url); - const makeRequest = parsed.protocol === "https:" ? httpsRequest : httpRequest; - const agent = parsed.protocol === "https:" ? httpsKeepAlive : httpKeepAlive; - const port = parsed.port - ? Number(parsed.port) - : DEFAULT_PORTS[parsed.protocol as keyof typeof DEFAULT_PORTS]; + const body = serializeBatch(batch, onWarning); - const req = makeRequest({ + const response = await fetch(url, { method: "POST", - hostname: parsed.hostname, - port, - path: (parsed.pathname || "/") + (parsed.search || ""), headers: { "Content-Type": "application/x-ndjson", "X-API-Key": apiKey, }, - agent, + body, }); - const responsePromise = new Promise((resolve, reject) => { - req.once("response", resolve); - req.once("error", reject); - }); - - const counters = { sent: 0 }; - const writePromise = writeBatch( - req, - batch, - counters, - onRequestSent, - onWarning, - ); - - let response: IncomingMessage; - try { - response = await responsePromise; - } catch (error) { - await writePromise.catch(() => {}); - throw error; - } - - const status = response.statusCode ?? 0; - if (status < 200 || status >= 300) { - const text = await readAllText(response); - await writePromise.catch(() => {}); - req.destroy(); + if (!response.ok) { + const text = await response.text().catch(() => ""); throw new Error( - `POST ${url} failed: ${status} ${response.statusMessage ?? 
""} ${text}`.trim(), + `POST ${url} failed: ${response.status} ${response.statusText} ${text}`.trim(), ); } + if (!response.body) { + throw new Error(`POST ${url} returned an empty body`); + } let received = 0; - try { - for await (const parsedLine of parseNdjsonStream(response)) { - await onResponse(parsedLine, received); - received += 1; - } - } catch (error) { - response.destroy(); - req.destroy(); - throw error; + for await (const parsedLine of parseNdjsonStream(response.body)) { + await onResponse(parsedLine, received); + received += 1; } - await writePromise; - - if (received < counters.sent) { + if (received < batch.length) { throw new Error( - `POST ${url}: server acknowledged ${received} of ${counters.sent} sent rows — ${counters.sent - received} rows dropped by the server (likely a per-row error terminated the stream)`, + `POST ${url}: server acknowledged ${received} of ${batch.length} sent rows — ${batch.length - received} rows dropped by the server (likely a per-row error terminated the stream)`, ); } } -async function writeBatch( - req: ClientRequest, +function serializeBatch( batch: Req[], - counters: { sent: number }, - onRequestSent: ((line: string, index: number) => void) | undefined, onWarning: ((message: string) => void) | undefined, -): Promise { +): Uint8Array { const encoder = new TextEncoder(); - let pendingBuffer = ""; - - try { - for (const value of batch) { - const raw = JSON.stringify(value); - const safe = - raw.includes("\n") || raw.includes("\r") - ? sanitizeRawNewlines(raw, counters.sent, onWarning) - : raw; - onRequestSent?.(safe, counters.sent); - counters.sent += 1; - pendingBuffer += safe + "\n"; - if (pendingBuffer.length >= CHUNK_FLUSH_BYTES) { - await writeWithBackpressure(req, encoder.encode(pendingBuffer)); - pendingBuffer = ""; - } - } - if (pendingBuffer.length > 0) { - await writeWithCallback(req, encoder.encode(pendingBuffer)); - pendingBuffer = ""; - } - await new Promise((resolve, reject) => { - req.once("finish", resolve); - req.once("error", reject); - req.end(); - }); - } catch (error) { - req.destroy(error instanceof Error ? error : new Error(String(error))); - throw error; + const chunks: Uint8Array[] = []; + let totalBytes = 0; + + for (let index = 0; index < batch.length; index += 1) { + const raw = JSON.stringify(batch[index]); + const safe = + raw.includes("\n") || raw.includes("\r") + ? 
sanitizeRawNewlines(raw, index, onWarning) + : raw; + const encoded = encoder.encode(safe + "\n"); + chunks.push(encoded); + totalBytes += encoded.length; } -} -async function writeWithBackpressure( - req: ClientRequest, - chunk: Uint8Array, -): Promise { - if (req.write(chunk)) { - return; + const body = new Uint8Array(totalBytes); + let offset = 0; + for (const chunk of chunks) { + body.set(chunk, offset); + offset += chunk.length; } - await new Promise((resolve, reject) => { - const onDrain = (): void => { - req.off("error", onError); - resolve(); - }; - const onError = (error: Error): void => { - req.off("drain", onDrain); - reject(error); - }; - req.once("drain", onDrain); - req.once("error", onError); - }); -} - -async function writeWithCallback( - req: ClientRequest, - chunk: Uint8Array, -): Promise { - await new Promise((resolve, reject) => { - req.write(chunk, (error) => { - if (error) { - reject(error); - return; - } - resolve(); - }); - }); -} - -async function readAllText(stream: AsyncIterable): Promise { - const chunks: Uint8Array[] = []; - try { - for await (const chunk of stream) { - chunks.push(chunk); - } - } catch { - // Best-effort read; if the socket drops mid-response, return what we have. - } - return Buffer.concat(chunks).toString("utf8"); + return body; } export async function* parseNdjsonStream( @@ -300,12 +178,8 @@ function sanitizeRawNewlines( onWarning?: (message: string) => void, ): string { const firstOffset = raw.search(RAW_NEWLINE_PATTERN); - const context = raw.slice( - Math.max(0, firstOffset - 40), - Math.min(raw.length, firstOffset + 40), - ); onWarning?.( - `Request #${index} had a raw \\n or \\r in its serialized JSON (first at offset ${firstOffset}); sanitizing. Context: …${context}…`, + `Request #${index} had a raw \\n or \\r in its serialized JSON (offset ${firstOffset}); escaping before sending.`, ); return raw.replace(RAW_NEWLINE_PATTERN, (char) => char === "\n" ? "\\n" : "\\r", diff --git a/src/upload-metadata.ts b/src/upload-metadata.ts index 1e3c8fb..3b7ac73 100644 --- a/src/upload-metadata.ts +++ b/src/upload-metadata.ts @@ -1,6 +1,3 @@ -import { appendFileSync, mkdirSync, writeFileSync } from "node:fs"; -import { join } from "node:path"; - import { postNdjson } from "./ndjson.js"; import { streamJsonElements } from "./stream-json.js"; @@ -25,8 +22,6 @@ export type UploadMetadataOptions = { instanceUrl: string; apiKey: string; onWarning?: (message: string) => void; - /** When set, dumps every outgoing NDJSON line to `/.ndjson` for debugging. */ - requestDumpFolder?: string; }; export type UploadStepStats = { @@ -253,29 +248,9 @@ export async function uploadMetadata({ instanceUrl, apiKey, onWarning, - requestDumpFolder, }: UploadMetadataOptions): Promise { const warn = onWarning ?? ((message: string) => console.warn(message)); - if (requestDumpFolder !== undefined) { - mkdirSync(requestDumpFolder, { recursive: true }); - } - - const dumpFor = ( - endpoint: "databases" | "tables" | "fields" | "fields-finalize" | "field-values", - ): ((line: string) => void) | undefined => { - if (requestDumpFolder === undefined) { - return undefined; - } - const file = join(requestDumpFolder, `${endpoint}.ndjson`); - // Truncate on first call so each run starts fresh — avoids cross-run noise - // when bisecting a malformed line. 
-    writeFileSync(file, "");
-    return (line: string) => {
-      appendFileSync(file, line + "\n");
-    };
-  };
-
   const databaseIdMap = new Map<number, number>();
   const tableIdMap = new Map<number, number>();
   const fieldIdMap = new Map<number, number>();
@@ -392,7 +367,6 @@ export async function uploadMetadata({
         apiKey,
         requests: streamDatabaseRequests(),
-        onRequestSent: dumpFor("databases"),
         onResponse: (response, responseIndex) =>
           recordIdMapResponse({
             response,
@@ -407,7 +381,6 @@ export async function uploadMetadata({
     url: joinUrl(instanceUrl, API_PATHS.tables),
     apiKey,
     onWarning: warn,
-    onRequestSent: dumpFor("tables"),
     requests: remapForeignKey({
       jsonPath: JSON_PATHS.tables,
       sourceFile: metadataFile,
@@ -431,7 +404,6 @@ export async function uploadMetadata({
     url: joinUrl(instanceUrl, API_PATHS.fields),
     apiKey,
     onWarning: warn,
-    onRequestSent: dumpFor("fields"),
     requests: remapForeignKey({
      jsonPath: JSON_PATHS.fields,
       sourceFile: metadataFile,
@@ -462,7 +434,6 @@ export async function uploadMetadata({
     url: joinUrl(instanceUrl, API_PATHS.fieldsFinalize),
     apiKey,
     onWarning: warn,
-    onRequestSent: dumpFor("fields-finalize"),
     requests: fieldFinalizeRequests(),
     onResponse: (response, responseIndex) => {
       if ("ok" in response) {
@@ -479,7 +450,6 @@ export async function uploadMetadata({
     url: joinUrl(instanceUrl, API_PATHS.fieldValues),
     apiKey,
     onWarning: warn,
-    onRequestSent: dumpFor("field-values"),
     requests: remapForeignKey({
       jsonPath: JSON_PATHS.fieldValues,
       sourceFile: fieldValuesFile,
From 976bee2ebdb86070342a144c2973025f86abb140 Mon Sep 17 00:00:00 2001
From: Aleksandr Lesnenko
Date: Wed, 22 Apr 2026 15:23:09 -0400
Subject: [PATCH 7/9] final

---
 src/download-metadata.test.ts |  8 +---
 src/ndjson.ts                 | 84 ++++------------------------------
 src/upload-metadata.test.ts   |  8 +---
 src/upload-metadata.ts        | 12 ++---
 4 files changed, 18 insertions(+), 94 deletions(-)

diff --git a/src/download-metadata.test.ts b/src/download-metadata.test.ts
index c154831..2d8774a 100644
--- a/src/download-metadata.test.ts
+++ b/src/download-metadata.test.ts
@@ -1,11 +1,5 @@
 import { afterEach, beforeEach, describe, expect, it } from "bun:test";
-import {
-  existsSync,
-  mkdtempSync,
-  readFileSync,
-  rmSync,
-  statSync,
-} from "fs";
+import { existsSync, mkdtempSync, readFileSync, rmSync, statSync } from "fs";
 import { tmpdir } from "os";
 import { join, resolve } from "path";
 
diff --git a/src/ndjson.ts b/src/ndjson.ts
index 2339d2a..5da7e4c 100644
--- a/src/ndjson.ts
+++ b/src/ndjson.ts
@@ -3,18 +3,10 @@ export type PostNdjsonOptions<Req, Res> = {
   apiKey: string;
   requests: AsyncIterable<Req>;
   onResponse: (response: Res, index: number) => void | Promise<void>;
-  onWarning?: (message: string) => void;
   /** Max rows per HTTP request. Default 2000. */
   batchSize?: number;
 };
 
-// JSON.stringify escapes control characters in string values, so the
-// serialized output should never contain a raw \n or \r. If it does (e.g. a
-// value's custom toJSON returned raw control chars), the server's line-seq
-// would split the line mid-object. Defensive: escape them before sending and
-// warn the caller so the upstream bug can be chased.
-const RAW_NEWLINE_PATTERN = /[\n\r]/g;
-
 // Cap rows-per-HTTP-POST so each request stays within one server-side DB
 // transaction. 
The Metabase NDJSON endpoints partition inserts in groups of
 // 2000 per transaction; sending more than that per POST forces multiple
@@ -29,20 +21,18 @@ export async function postNdjson<Req, Res>({
   apiKey,
   requests,
   onResponse,
-  onWarning,
   batchSize = DEFAULT_BATCH_SIZE,
-}: PostNdjsonOptions): Promise {
+}: PostNdjsonOptions): Promise {
   let globalIndex = 0;
 
   for await (const batch of batchAsyncIterable(requests, batchSize)) {
     const batchOffset = globalIndex;
-    await postNdjsonBatch({
+    await postNdjsonBatch({
       url,
       apiKey,
       batch,
       onResponse: (response, localIndex) =>
         onResponse(response, batchOffset + localIndex),
-      onWarning,
     });
     globalIndex += batch.length;
   }
@@ -70,7 +60,6 @@ type PostBatchOptions<Req, Res> = {
   apiKey: string;
   batch: Req[];
   onResponse: (response: Res, index: number) => void | Promise<void>;
-  onWarning?: (message: string) => void;
 };
 
 async function postNdjsonBatch<Req, Res>({
@@ -78,13 +67,14 @@ async function postNdjsonBatch<Req, Res>({
   apiKey,
   batch,
   onResponse,
-  onWarning,
 }: PostBatchOptions<Req, Res>): Promise<void> {
   if (batch.length === 0) {
     return;
   }
 
-  const body = serializeBatch(batch, onWarning);
+  const body = new TextEncoder().encode(
+    batch.map((value) => JSON.stringify(value)).join("\n") + "\n",
+  );
 
   const response = await fetch(url, {
     method: "POST",
@@ -118,39 +108,10 @@ async function postNdjsonBatch<Req, Res>({
   }
 }
 
-function serializeBatch<Req>(
-  batch: Req[],
-  onWarning: ((message: string) => void) | undefined,
-): Uint8Array {
-  const encoder = new TextEncoder();
-  const chunks: Uint8Array[] = [];
-  let totalBytes = 0;
-
-  for (let index = 0; index < batch.length; index += 1) {
-    const raw = JSON.stringify(batch[index]);
-    const safe =
-      raw.includes("\n") || raw.includes("\r")
-        ? sanitizeRawNewlines(raw, index, onWarning)
-        : raw;
-    const encoded = encoder.encode(safe + "\n");
-    chunks.push(encoded);
-    totalBytes += encoded.length;
-  }
-
-  const body = new Uint8Array(totalBytes);
-  let offset = 0;
-  for (const chunk of chunks) {
-    body.set(chunk, offset);
-    offset += chunk.length;
-  }
-  return body;
-}
-
 export async function* parseNdjsonStream<T>(
   stream: AsyncIterable<Uint8Array>,
 ): AsyncGenerator<T> {
   const decoder = new TextDecoder();
-  // Slice once per chunk (not per line) to keep parsing O(n) on large responses.
   let pending = "";
 
   for await (const chunk of stream) {
@@ -160,8 +121,11 @@ export async function* parseNdjsonStream<T>(
       pending = buffer;
       continue;
     }
-    for (const line of splitLines(buffer.slice(0, lastNewline))) {
-      yield JSON.parse(line) as T;
+    for (const line of buffer.slice(0, lastNewline).split("\n")) {
+      const trimmed = line.trim();
+      if (trimmed.length > 0) {
+        yield JSON.parse(trimmed) as T;
+      }
     }
     pending = buffer.slice(lastNewline + 1);
   }
@@ -171,33 +135,3 @@ export async function* parseNdjsonStream<T>(
     yield JSON.parse(trailing) as T;
   }
 }
-
-function sanitizeRawNewlines(
-  raw: string,
-  index: number,
-  onWarning?: (message: string) => void,
-): string {
-  const firstOffset = raw.search(RAW_NEWLINE_PATTERN);
-  onWarning?.(
-    `Request #${index} had a raw \\n or \\r in its serialized JSON (offset ${firstOffset}); escaping before sending.`,
-  );
-  return raw.replace(RAW_NEWLINE_PATTERN, (char) =>
-    char === "\n" ? "\\n" : "\\r",
-  );
-}
-
-function* splitLines(block: string): Generator<string> {
-  let start = 0;
-  while (start <= block.length) {
-    const newlineIndex = block.indexOf("\n", start);
-    const end = newlineIndex === -1 ? 
block.length : newlineIndex;
-    const line = block.slice(start, end).trim();
-    if (line.length > 0) {
-      yield line;
-    }
-    if (newlineIndex === -1) {
-      return;
-    }
-    start = newlineIndex + 1;
-  }
-}
diff --git a/src/upload-metadata.test.ts b/src/upload-metadata.test.ts
index 21c3494..d9f54ee 100644
--- a/src/upload-metadata.test.ts
+++ b/src/upload-metadata.test.ts
@@ -51,17 +51,13 @@ async function readNdjsonLines(
   return lines;
 }
 
-function ndjsonStreamResponse(
-  responses: AsyncIterable<unknown>,
-): Response {
+function ndjsonStreamResponse(responses: AsyncIterable<unknown>): Response {
   const encoder = new TextEncoder();
   const body = new ReadableStream({
     async start(controller) {
       try {
         for await (const response of responses) {
-          controller.enqueue(
-            encoder.encode(JSON.stringify(response) + "\n"),
-          );
+          controller.enqueue(encoder.encode(JSON.stringify(response) + "\n"));
         }
         controller.close();
       } catch (error) {
diff --git a/src/upload-metadata.ts b/src/upload-metadata.ts
index 3b7ac73..acb0542 100644
--- a/src/upload-metadata.ts
+++ b/src/upload-metadata.ts
@@ -366,7 +366,6 @@ export async function uploadMetadata({
     url: joinUrl(instanceUrl, API_PATHS.databases),
     apiKey,
     requests: streamDatabaseRequests(),
-    onWarning: warn,
     onResponse: (response, responseIndex) =>
       recordIdMapResponse({
        response,
@@ -380,7 +379,6 @@ export async function uploadMetadata({
   await postNdjson({
     url: joinUrl(instanceUrl, API_PATHS.tables),
     apiKey,
-    onWarning: warn,
    requests: remapForeignKey({
       jsonPath: JSON_PATHS.tables,
       sourceFile: metadataFile,
@@ -403,7 +401,6 @@ export async function uploadMetadata({
   await postNdjson({
     url: joinUrl(instanceUrl, API_PATHS.fields),
     apiKey,
-    onWarning: warn,
     requests: remapForeignKey({
       jsonPath: JSON_PATHS.fields,
       sourceFile: metadataFile,
@@ -433,7 +430,6 @@ export async function uploadMetadata({
   const finalizePass = postNdjson({
     url: joinUrl(instanceUrl, API_PATHS.fieldsFinalize),
     apiKey,
-    onWarning: warn,
     requests: fieldFinalizeRequests(),
     onResponse: (response, responseIndex) => {
       if ("ok" in response) {
@@ -449,7 +445,6 @@ export async function uploadMetadata({
      ? 
postNdjson({ url: joinUrl(instanceUrl, API_PATHS.fieldValues), apiKey, - onWarning: warn, requests: remapForeignKey({ jsonPath: JSON_PATHS.fieldValues, sourceFile: fieldValuesFile, @@ -463,7 +458,12 @@ export async function uploadMetadata({ if ("error" in response) { result.fieldValues.errors += 1; warn( - formatError("Field values", response.field_id, response, responseIndex), + formatError( + "Field values", + response.field_id, + response, + responseIndex, + ), ); return; } From 0305c276cca8f41b7e23b220d0dfce56afe87e4e Mon Sep 17 00:00:00 2001 From: Aleksandr Lesnenko Date: Wed, 22 Apr 2026 17:05:26 -0400 Subject: [PATCH 8/9] version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 743dea8..a63fd5f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@metabase/database-metadata", - "version": "1.1.0", + "version": "1.0.3", "description": "CLI tool to extract Metabase database metadata into YAML files", "license": "SEE LICENSE IN LICENSE.txt", "repository": { From d2df2cab8a9e216c261f7aa4ac95ec266d9e00ee Mon Sep 17 00:00:00 2001 From: Aleksandr Lesnenko Date: Wed, 22 Apr 2026 17:05:52 -0400 Subject: [PATCH 9/9] version --- README.md | 2 +- core-spec/v1/spec.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e6c4fa0..bd1874d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ This repository contains the specification, examples, and a CLI that converts th ## Specification -The format is defined in **[core-spec/v1/spec.md](core-spec/v1/spec.md)** (v1.1.0). It covers entity keys, field types, folder structure, sampled field values, and the shape of each entity. +The format is defined in **[core-spec/v1/spec.md](core-spec/v1/spec.md)** (v1.0.3). It covers entity keys, field types, folder structure, sampled field values, and the shape of each entity. Reference output for the Sample Database lives in **[examples/v1/](examples/v1/)** — both the raw `metadata.json` returned by the endpoint and the extracted YAML tree. diff --git a/core-spec/v1/spec.md b/core-spec/v1/spec.md index abaa627..3d3633d 100644 --- a/core-spec/v1/spec.md +++ b/core-spec/v1/spec.md @@ -1,6 +1,6 @@ # Metabase Database Metadata Format -**Version:** 1.1.0 +**Version:** 1.0.3 ## Overview
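
---

Two usage sketches for the NDJSON helpers this series lands in src/ndjson.ts follow. Both are illustrative only: the row shapes, endpoint path, and id values are assumptions for demonstration, not the shipped upload-metadata types. `postNdjson` batches an async iterable of requests into NDJSON POSTs (2000 rows per request by default, matching the server's per-transaction insert size) and reports each response line back with a global row index:

    import { postNdjson } from "./ndjson.js"; // same relative import as src/upload-metadata.ts

    // Hypothetical row/ack shapes; the real ones live in src/upload-metadata.ts.
    type Row = { id: number };
    type RowAck = { old_id: number; new_id: number };

    async function* rows(): AsyncGenerator<Row> {
      for (let id = 1; id <= 5000; id += 1) {
        yield { id }; // 5000 rows -> three POSTs of 2000/2000/1000 at the default batchSize
      }
    }

    await postNdjson<Row, RowAck>({
      url: "http://localhost:3000/api/database/metadata/tables", // assumed target instance
      apiKey: process.env.METABASE_API_KEY ?? "",
      requests: rows(),
      // index is global across batches: postNdjson adds each batch's offset.
      onResponse: (ack, index) => {
        console.log(`row ${index}: ${ack.old_id} -> ${ack.new_id}`);
      },
    });

If the server acknowledges fewer rows than a batch sent, postNdjsonBatch throws instead of continuing, so a stream that a per-row error cut short surfaces as a hard failure rather than silently dropped rows.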
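On the read side, `parseNdjsonStream` only parses lines whose terminating newline has arrived, buffering any partial tail across chunks, and flushes a final unterminated line once the stream ends. A self-contained sketch of that boundary behavior (the chunk split points are arbitrary):

    import { parseNdjsonStream } from "./ndjson.js";

    // Simulate a network stream that cuts one JSON line across two chunks and
    // omits the trailing newline on the last line.
    async function* chunks(): AsyncGenerator<Uint8Array> {
      const encoder = new TextEncoder();
      yield encoder.encode('{"id":1}\n{"id"');
      yield encoder.encode(':2}\n{"id":3}');
    }

    for await (const row of parseNdjsonStream<{ id: number }>(chunks())) {
      console.log(row.id); // 1, 2, then 3 once the stream ends
    }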