diff --git a/src/api/datasets/snapshot.ts b/src/api/datasets/snapshot.ts index d5a21b0..0330983 100644 --- a/src/api/datasets/snapshot.ts +++ b/src/api/datasets/snapshot.ts @@ -1,10 +1,22 @@ import { API_ENDPOINT } from '../../utils/constants'; import { APIError, BRDError } from '../../utils/errors'; -import { request, getDispatcher, assertResponse } from '../../utils/net'; -import { parseJSON } from '../../utils/misc'; +import { + request, + stream, + getDispatcher, + assertResponse, + throwInvalidStatus, +} from '../../utils/net'; +import { + routeDownloadStream, + getFilename, + getAbsAndEnsureDir, +} from '../../utils/files'; +import { parseJSON, getRandomInt, sleep } from '../../utils/misc'; import { SnapshotIdSchema, SnapshotDownloadOptionsSchema, + SnapshotDownloadOptionsSchemaType, } from '../../schemas/datasets'; import { assertSchema } from '../../schemas/utils'; import type { @@ -13,6 +25,16 @@ import type { } from '../../types/datasets'; import { BaseAPI, BaseAPIOptions } from './base'; +const assertDownloadStatus = (status: number) => { + if (status < 202) return; + + if (status === 202) { + throw new BRDError('snapshot is not ready yet, please try again later'); + } + + throwInvalidStatus(status, 'snapshot download failed'); +}; + export class SnapshotAPI extends BaseAPI { constructor(opts: BaseAPIOptions) { super(opts); @@ -36,9 +58,9 @@ export class SnapshotAPI extends BaseAPI { * Download the data from a dataset snapshot. * @param snapshotId - The unique identifier of the snapshot * @param opts - Download options including format and compression settings - * @returns A promise that resolves with the snapshot data + * @returns A promise that resolves with the full filename where the data is saved */ - async download(snapshotId: string, options: SnapshotDownloadOptions = {}) { + async download(snapshotId: string, options?: SnapshotDownloadOptions) { const safeId = assertSchema( SnapshotIdSchema, snapshotId, @@ -46,7 +68,7 @@ export class SnapshotAPI extends BaseAPI { ); const safeOpts = assertSchema( SnapshotDownloadOptionsSchema, - options, + options || {}, 'snapshot.download: invalid options', ); return this.#download(safeId, safeOpts); @@ -85,27 +107,73 @@ export class SnapshotAPI extends BaseAPI { } } - async #download(snapshotId: string, opts: SnapshotDownloadOptions = {}) { + async #download( + snapshotId: string, + options: SnapshotDownloadOptionsSchemaType, + ): Promise { this.logger.info(`fetching snapshot for id ${snapshotId}`); + const url = API_ENDPOINT.SNAPSHOT_DOWNLOAD.replace( '{snapshot_id}', snapshotId, ); try { - const response = await request(url, { - headers: this.authHeaders, - query: opts, - dispatcher: getDispatcher(), - }); + if (options.statusPolling) { + await this.#awaitReady(snapshotId); + } - return await assertResponse(response, false); + const filename = getFilename(options.filename, options.format); + const target = await getAbsAndEnsureDir(filename); + + this.logger.info( + `starting streaming snapshot ${snapshotId} data to ${target}`, + ); + + await stream( + url, + { + method: 'GET', + headers: this.authHeaders, + query: { + format: options.format, + compress: options.compress, + }, + opaque: { + filename: target, + assertStatus: assertDownloadStatus, + }, + }, + routeDownloadStream, + ); + + return target; } catch (e: unknown) { if (e instanceof BRDError) throw e; throw new APIError(`operation failed: ${(e as Error).message}`); } } + async #awaitReady(snapshotId: string): Promise { + this.logger.info(`polling snapshot status for id ${snapshotId}`); + + for (;;) { + const { status } = await this.#getStatus(snapshotId); + + if (status === 'ready') break; + if (status === 'failed') { + throw new BRDError('snapshot generation failed'); + } + + const delayMs = getRandomInt(10_000, 30_000); + this.logger.info( + `snapshot ${snapshotId} is not ready yet, waiting for ${delayMs}ms`, + ); + + await sleep(delayMs); + } + } + async #cancel(snapshotId: string) { this.logger.info(`cancelling snapshot for id ${snapshotId}`); const url = API_ENDPOINT.SNAPSHOT_CANCEL.replace( diff --git a/src/schemas/datasets.ts b/src/schemas/datasets.ts index 19fc680..18b9918 100644 --- a/src/schemas/datasets.ts +++ b/src/schemas/datasets.ts @@ -1,4 +1,5 @@ import { z } from 'zod'; +import { FilenameSchema } from './shared'; const SnapshotFormatSchema = z .enum(['json', 'csv', 'ndjson', 'jsonl']) @@ -55,8 +56,14 @@ export const ChatGPTInputSchema = z export const SnapshotDownloadOptionsSchema = z.object({ format: SnapshotFormatSchema, compress: z.boolean().default(false), + filename: FilenameSchema.optional(), + statusPolling: z.boolean().default(true), }); +export type SnapshotDownloadOptionsSchemaType = z.infer< + typeof SnapshotDownloadOptionsSchema +>; + export const SnapshotIdSchema = z .string() .trim() diff --git a/src/schemas/misc.ts b/src/schemas/misc.ts index 2fff5a6..1c38f20 100644 --- a/src/schemas/misc.ts +++ b/src/schemas/misc.ts @@ -1,15 +1,11 @@ import { z } from 'zod'; +import { FilenameSchema } from './shared'; const ContentFormatSchema = z .enum(['json', 'txt', 'JSON', 'TXT']) .transform((v) => v.toLowerCase() as 'json' | 'txt') .default('json'); -const FilenameSchema = z - .string() - .min(1) - .transform((v) => v.replace(/[<>:"\\|?*]/g, '_')); - export const SaveOptionsSchema = z.object({ filename: FilenameSchema.optional(), format: ContentFormatSchema, diff --git a/src/schemas/shared.ts b/src/schemas/shared.ts index 0838b4f..c59e8e2 100644 --- a/src/schemas/shared.ts +++ b/src/schemas/shared.ts @@ -15,3 +15,8 @@ export const ZoneNameSchema = z .refine((val) => !val.endsWith('_'), { message: 'zone name cannot end with an underscore', }); + +export const FilenameSchema = z + .string() + .min(1) + .transform((v) => v.replace(/[<>:"\\|?*]/g, '_')); diff --git a/src/types/datasets.ts b/src/types/datasets.ts index 6a4b878..d05784b 100644 --- a/src/types/datasets.ts +++ b/src/types/datasets.ts @@ -49,8 +49,14 @@ export type DiscoverOptions = Omit< >; export interface SnapshotDownloadOptions { - format?: SnapshotFormat; - compress?: boolean; + /** + * If set to true, the SDK will poll the snapshot status until it is ready + * @default true + */ + statusPolling: boolean; + filename?: string; + format: string; + compress: boolean; } export type SnapshotStatus = 'running' | 'ready' | 'failed'; diff --git a/src/utils/files.ts b/src/utils/files.ts index d8c613d..b890d42 100644 --- a/src/utils/files.ts +++ b/src/utils/files.ts @@ -1,6 +1,7 @@ +import { createWriteStream, type Stats } from 'node:fs'; import fs from 'node:fs/promises'; -import type { Stats } from 'node:fs'; import path from 'node:path'; +import { type Dispatcher } from 'undici'; import { getLogger } from './logger'; import { BRDError, FSError } from './errors'; import { isStrArray } from './misc'; @@ -34,7 +35,7 @@ export const stringifyResults = ( export const getFilename = ( filename: string | void, - format: ContentFormat, + format: string, ): string => { if (filename) { return path.extname(filename) ? filename : `${filename}.${format}`; @@ -42,16 +43,26 @@ export const getFilename = ( return `brightdata_content_${Date.now()}.${format}`; }; -export const writeContent = async (content: string, filename: string) => { +export const getAbsAndEnsureDir = async (filename: string) => { try { const target = path.resolve(filename); + await fs.mkdir(path.dirname(target), { recursive: true }); + return target; + } catch (e: unknown) { + const msg = `failed to create dirs ${filename}:`; + throw new FSError(`${msg} ${(e as Error).message}`); + } +}; + +export const writeContent = async (content: string, filename: string) => { + try { + const target = await getAbsAndEnsureDir(filename); logger.info(`writing ${target}`); - await fs.mkdir(path.dirname(target), { recursive: true }); await fs.writeFile(target, content, 'utf8'); const stats = await statSafe(target); - if (!stats) throw new Error('file was not created successfully'); + if (!stats) throw new FSError('file was not created successfully'); logger.info(`written successfully: ${target} (${stats.size} bytes)`); return target; @@ -73,3 +84,19 @@ export const writeContent = async (content: string, filename: string) => { throw new FSError(`${msg} ${err.message}`); } }; + +export interface WritingOpaque { + filename: string; + assertStatus?: (status: number) => void; +} + +export const routeDownloadStream: Dispatcher.StreamFactory = ({ + statusCode, + opaque, +}) => { + const { assertStatus, filename } = opaque; + + assertStatus?.(statusCode); + + return createWriteStream(filename); +}; diff --git a/src/utils/misc.ts b/src/utils/misc.ts index da37da5..629ef1d 100644 --- a/src/utils/misc.ts +++ b/src/utils/misc.ts @@ -30,3 +30,9 @@ export const dropEmptyKeys = (obj: Record) => { export const maskKey = (key: string) => key.length > 8 ? `***${key.slice(-4)}` : '***'; + +export const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + +export const getRandomInt = (min: number, max: number) => + Math.floor(Math.random() * (max - min + 1)) + min; diff --git a/src/utils/net.ts b/src/utils/net.ts index 38e5be4..717fb2f 100644 --- a/src/utils/net.ts +++ b/src/utils/net.ts @@ -3,7 +3,9 @@ import { Agent, interceptors, request as lib_request, + stream as lib_stream, } from 'undici'; +import type { UrlObject } from 'node:url'; import { DEFAULT_TIMEOUT, MAX_RETRIES, @@ -33,25 +35,50 @@ export const getDispatcher = (params: GetDispatcherOptions = {}) => { ); }; -export const request: typeof lib_request = async (url, opts) => { +const log = ( + method = 'GET', + url: string | URL | UrlObject, + query?: Record, + body?: unknown, +) => { let meta = ''; - if (opts?.query) { - meta += ` query=${JSON.stringify(opts.query)}`; + if (query) { + meta += ` query=${JSON.stringify(query)}`; } - if (opts?.body) { + + if (typeof body === 'string') { if (meta) meta += ' '; - meta += 'body='; - meta += - typeof opts.body === 'string' - ? opts.body - : JSON.stringify(opts.body); + meta += `body=${body}`; } - logRequest(opts?.method || 'GET', JSON.stringify(url), meta); + logRequest(method, JSON.stringify(url), meta); +}; + +export const request: typeof lib_request = async (url, opts) => { + log(opts?.method, url, opts?.query, opts?.body); return lib_request(url, opts); }; +export const stream: typeof lib_stream = async (url, opts, factory) => { + log(opts?.method, url, opts?.query, opts?.body); + return lib_stream(url, opts, factory); +}; + +export function throwInvalidStatus(status: number, responseTxt: string): never { + if (status === 401) { + throw new AuthenticationError( + 'invalid API key or insufficient permissions', + ); + } + + if (status === 400) { + throw new ValidationError(`bad request: ${responseTxt}`); + } + + throw new APIError(`request failed`, status, responseTxt); +} + export async function assertResponse( response: Dispatcher.ResponseData, parse?: true, @@ -68,17 +95,5 @@ export async function assertResponse( return parse ? await response.body.text() : response.body; } - if (response.statusCode === 401) { - throw new AuthenticationError( - 'invalid API key or insufficient permissions', - ); - } - - const responseTxt = await response.body.text(); - - if (response.statusCode === 400) { - throw new ValidationError(`bad request: ${responseTxt}`); - } - - throw new APIError(`request failed`, response.statusCode, responseTxt); + throwInvalidStatus(response.statusCode, await response.body.text()); }