From df98b144db2ca3d406e549289e574f83c0282e5f Mon Sep 17 00:00:00 2001 From: Artem Shibakov Date: Tue, 7 Oct 2025 11:57:55 +0400 Subject: [PATCH 1/3] feat: add snapshot status polling while downloading --- src/api/datasets/snapshot.ts | 59 ++++++++++++++++++++++++++++++++---- src/schemas/datasets.ts | 6 +++- src/types/datasets.ts | 10 +++++- src/utils/misc.ts | 6 ++++ 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/api/datasets/snapshot.ts b/src/api/datasets/snapshot.ts index d5a21b0..1ef773f 100644 --- a/src/api/datasets/snapshot.ts +++ b/src/api/datasets/snapshot.ts @@ -1,14 +1,17 @@ +import { type Dispatcher } from 'undici'; import { API_ENDPOINT } from '../../utils/constants'; import { APIError, BRDError } from '../../utils/errors'; import { request, getDispatcher, assertResponse } from '../../utils/net'; -import { parseJSON } from '../../utils/misc'; +import { parseJSON, getRandomInt, sleep } from '../../utils/misc'; import { SnapshotIdSchema, + SnapshotDownloadEndpointOptionsSchema, SnapshotDownloadOptionsSchema, } from '../../schemas/datasets'; import { assertSchema } from '../../schemas/utils'; import type { SnapshotDownloadOptions, + SnapshotDownloadEndpointOptions, SnapshotStatusResponse, } from '../../types/datasets'; import { BaseAPI, BaseAPIOptions } from './base'; @@ -38,18 +41,27 @@ export class SnapshotAPI extends BaseAPI { * @param opts - Download options including format and compression settings * @returns A promise that resolves with the snapshot data */ - async download(snapshotId: string, options: SnapshotDownloadOptions = {}) { + async download( + snapshotId: string, + downloadOptions?: SnapshotDownloadEndpointOptions, + options?: SnapshotDownloadOptions, + ) { const safeId = assertSchema( SnapshotIdSchema, snapshotId, 'snapshot.download: invalid snapshot id', ); + const safeEOpts = assertSchema( + SnapshotDownloadEndpointOptionsSchema, + downloadOptions || {}, + 'snapshot.download: invalid options', + ); const safeOpts = assertSchema( SnapshotDownloadOptionsSchema, - options, + options || {}, 'snapshot.download: invalid options', ); - return this.#download(safeId, safeOpts); + return this.#download(safeId, safeEOpts, safeOpts); } /** * Cancel the dataset gathering process. @@ -85,7 +97,11 @@ export class SnapshotAPI extends BaseAPI { } } - async #download(snapshotId: string, opts: SnapshotDownloadOptions = {}) { + async #download( + snapshotId: string, + endpointOpts: SnapshotDownloadEndpointOptions, + options: SnapshotDownloadOptions, + ): Promise { this.logger.info(`fetching snapshot for id ${snapshotId}`); const url = API_ENDPOINT.SNAPSHOT_DOWNLOAD.replace( '{snapshot_id}', @@ -95,10 +111,21 @@ export class SnapshotAPI extends BaseAPI { try { const response = await request(url, { headers: this.authHeaders, - query: opts, + query: endpointOpts, dispatcher: getDispatcher(), }); + if (response.statusCode === 202) { + if (!options.statusPolling) { + throw new BRDError( + 'snapshot is not ready yet, please try again later', + ); + } + + await this.#awaitReady(snapshotId); + return this.#download(snapshotId, endpointOpts, options); + } + return await assertResponse(response, false); } catch (e: unknown) { if (e instanceof BRDError) throw e; @@ -106,6 +133,26 @@ export class SnapshotAPI extends BaseAPI { } } + async #awaitReady(snapshotId: string): Promise { + this.logger.info(`polling snapshot status for id ${snapshotId}`); + + for (;;) { + const { status } = await this.#getStatus(snapshotId); + + if (status === 'ready') break; + if (status === 'failed') { + throw new BRDError('snapshot generation failed'); + } + + const delayMs = getRandomInt(10_000, 30_000); + this.logger.info( + `snapshot ${snapshotId} is not ready yet, waiting for ${delayMs}ms`, + ); + + await sleep(delayMs); + } + } + async #cancel(snapshotId: string) { this.logger.info(`cancelling snapshot for id ${snapshotId}`); const url = API_ENDPOINT.SNAPSHOT_CANCEL.replace( diff --git a/src/schemas/datasets.ts b/src/schemas/datasets.ts index 19fc680..ef26369 100644 --- a/src/schemas/datasets.ts +++ b/src/schemas/datasets.ts @@ -52,11 +52,15 @@ export const ChatGPTInputSchema = z v.map((item) => ({ ...item, url: 'https://chatgpt.com/' })), ); -export const SnapshotDownloadOptionsSchema = z.object({ +export const SnapshotDownloadEndpointOptionsSchema = z.object({ format: SnapshotFormatSchema, compress: z.boolean().default(false), }); +export const SnapshotDownloadOptionsSchema = z.object({ + statusPolling: z.boolean().default(true), +}); + export const SnapshotIdSchema = z .string() .trim() diff --git a/src/types/datasets.ts b/src/types/datasets.ts index 6a4b878..85dccb0 100644 --- a/src/types/datasets.ts +++ b/src/types/datasets.ts @@ -48,11 +48,19 @@ export type DiscoverOptions = Omit< 'async' | 'discoverBy' | 'type' >; -export interface SnapshotDownloadOptions { +export interface SnapshotDownloadEndpointOptions { format?: SnapshotFormat; compress?: boolean; } +export interface SnapshotDownloadOptions { + /** + * If set to true, the SDK will poll the snapshot status until it is ready + * @default true + */ + statusPolling: boolean; +} + export type SnapshotStatus = 'running' | 'ready' | 'failed'; export interface SnapshotMeta { diff --git a/src/utils/misc.ts b/src/utils/misc.ts index da37da5..629ef1d 100644 --- a/src/utils/misc.ts +++ b/src/utils/misc.ts @@ -30,3 +30,9 @@ export const dropEmptyKeys = (obj: Record) => { export const maskKey = (key: string) => key.length > 8 ? `***${key.slice(-4)}` : '***'; + +export const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + +export const getRandomInt = (min: number, max: number) => + Math.floor(Math.random() * (max - min + 1)) + min; From 7cea26855e06a226fd8272e120486da03360c1ac Mon Sep 17 00:00:00 2001 From: Artem Shibakov Date: Tue, 7 Oct 2025 12:22:51 +0400 Subject: [PATCH 2/3] feat: add stream wrapper --- src/utils/net.ts | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/utils/net.ts b/src/utils/net.ts index 38e5be4..24a273e 100644 --- a/src/utils/net.ts +++ b/src/utils/net.ts @@ -3,7 +3,9 @@ import { Agent, interceptors, request as lib_request, + stream as lib_stream, } from 'undici'; +import type { UrlObject } from 'node:url'; import { DEFAULT_TIMEOUT, MAX_RETRIES, @@ -33,25 +35,36 @@ export const getDispatcher = (params: GetDispatcherOptions = {}) => { ); }; -export const request: typeof lib_request = async (url, opts) => { +const log = ( + method = 'GET', + url: string | URL | UrlObject, + query?: Record, + body?: unknown, +) => { let meta = ''; - if (opts?.query) { - meta += ` query=${JSON.stringify(opts.query)}`; + if (query) { + meta += ` query=${JSON.stringify(query)}`; } - if (opts?.body) { + + if (typeof body === 'string') { if (meta) meta += ' '; - meta += 'body='; - meta += - typeof opts.body === 'string' - ? opts.body - : JSON.stringify(opts.body); + meta += `body=${body}`; } - logRequest(opts?.method || 'GET', JSON.stringify(url), meta); + logRequest(method, JSON.stringify(url), meta); +}; + +export const request: typeof lib_request = async (url, opts) => { + log(opts?.method, url, opts?.query, opts?.body); return lib_request(url, opts); }; +export const stream: typeof lib_stream = async (url, opts, factory) => { + log(opts?.method, url, opts?.query, opts?.body); + return lib_stream(url, opts, factory); +}; + export async function assertResponse( response: Dispatcher.ResponseData, parse?: true, From 0a85d6e6fbba98393779179ba997813549ecdbcd Mon Sep 17 00:00:00 2001 From: Artem Shibakov Date: Tue, 7 Oct 2025 18:42:47 +0400 Subject: [PATCH 3/3] feat: saving snapshots via streams --- src/api/datasets/snapshot.ts | 89 ++++++++++++++++++++++-------------- src/schemas/datasets.ts | 11 +++-- src/schemas/misc.ts | 6 +-- src/schemas/shared.ts | 5 ++ src/types/datasets.ts | 8 ++-- src/utils/files.ts | 37 +++++++++++++-- src/utils/net.ts | 28 ++++++------ 7 files changed, 118 insertions(+), 66 deletions(-) diff --git a/src/api/datasets/snapshot.ts b/src/api/datasets/snapshot.ts index 1ef773f..0330983 100644 --- a/src/api/datasets/snapshot.ts +++ b/src/api/datasets/snapshot.ts @@ -1,21 +1,40 @@ -import { type Dispatcher } from 'undici'; import { API_ENDPOINT } from '../../utils/constants'; import { APIError, BRDError } from '../../utils/errors'; -import { request, getDispatcher, assertResponse } from '../../utils/net'; +import { + request, + stream, + getDispatcher, + assertResponse, + throwInvalidStatus, +} from '../../utils/net'; +import { + routeDownloadStream, + getFilename, + getAbsAndEnsureDir, +} from '../../utils/files'; import { parseJSON, getRandomInt, sleep } from '../../utils/misc'; import { SnapshotIdSchema, - SnapshotDownloadEndpointOptionsSchema, SnapshotDownloadOptionsSchema, + SnapshotDownloadOptionsSchemaType, } from '../../schemas/datasets'; import { assertSchema } from '../../schemas/utils'; import type { SnapshotDownloadOptions, - SnapshotDownloadEndpointOptions, SnapshotStatusResponse, } from '../../types/datasets'; import { BaseAPI, BaseAPIOptions } from './base'; +const assertDownloadStatus = (status: number) => { + if (status < 202) return; + + if (status === 202) { + throw new BRDError('snapshot is not ready yet, please try again later'); + } + + throwInvalidStatus(status, 'snapshot download failed'); +}; + export class SnapshotAPI extends BaseAPI { constructor(opts: BaseAPIOptions) { super(opts); @@ -39,29 +58,20 @@ export class SnapshotAPI extends BaseAPI { * Download the data from a dataset snapshot. * @param snapshotId - The unique identifier of the snapshot * @param opts - Download options including format and compression settings - * @returns A promise that resolves with the snapshot data + * @returns A promise that resolves with the full filename where the data is saved */ - async download( - snapshotId: string, - downloadOptions?: SnapshotDownloadEndpointOptions, - options?: SnapshotDownloadOptions, - ) { + async download(snapshotId: string, options?: SnapshotDownloadOptions) { const safeId = assertSchema( SnapshotIdSchema, snapshotId, 'snapshot.download: invalid snapshot id', ); - const safeEOpts = assertSchema( - SnapshotDownloadEndpointOptionsSchema, - downloadOptions || {}, - 'snapshot.download: invalid options', - ); const safeOpts = assertSchema( SnapshotDownloadOptionsSchema, options || {}, 'snapshot.download: invalid options', ); - return this.#download(safeId, safeEOpts, safeOpts); + return this.#download(safeId, safeOpts); } /** * Cancel the dataset gathering process. @@ -99,34 +109,45 @@ export class SnapshotAPI extends BaseAPI { async #download( snapshotId: string, - endpointOpts: SnapshotDownloadEndpointOptions, - options: SnapshotDownloadOptions, - ): Promise { + options: SnapshotDownloadOptionsSchemaType, + ): Promise { this.logger.info(`fetching snapshot for id ${snapshotId}`); + const url = API_ENDPOINT.SNAPSHOT_DOWNLOAD.replace( '{snapshot_id}', snapshotId, ); try { - const response = await request(url, { - headers: this.authHeaders, - query: endpointOpts, - dispatcher: getDispatcher(), - }); - - if (response.statusCode === 202) { - if (!options.statusPolling) { - throw new BRDError( - 'snapshot is not ready yet, please try again later', - ); - } - + if (options.statusPolling) { await this.#awaitReady(snapshotId); - return this.#download(snapshotId, endpointOpts, options); } - return await assertResponse(response, false); + const filename = getFilename(options.filename, options.format); + const target = await getAbsAndEnsureDir(filename); + + this.logger.info( + `starting streaming snapshot ${snapshotId} data to ${target}`, + ); + + await stream( + url, + { + method: 'GET', + headers: this.authHeaders, + query: { + format: options.format, + compress: options.compress, + }, + opaque: { + filename: target, + assertStatus: assertDownloadStatus, + }, + }, + routeDownloadStream, + ); + + return target; } catch (e: unknown) { if (e instanceof BRDError) throw e; throw new APIError(`operation failed: ${(e as Error).message}`); diff --git a/src/schemas/datasets.ts b/src/schemas/datasets.ts index ef26369..18b9918 100644 --- a/src/schemas/datasets.ts +++ b/src/schemas/datasets.ts @@ -1,4 +1,5 @@ import { z } from 'zod'; +import { FilenameSchema } from './shared'; const SnapshotFormatSchema = z .enum(['json', 'csv', 'ndjson', 'jsonl']) @@ -52,15 +53,17 @@ export const ChatGPTInputSchema = z v.map((item) => ({ ...item, url: 'https://chatgpt.com/' })), ); -export const SnapshotDownloadEndpointOptionsSchema = z.object({ +export const SnapshotDownloadOptionsSchema = z.object({ format: SnapshotFormatSchema, compress: z.boolean().default(false), -}); - -export const SnapshotDownloadOptionsSchema = z.object({ + filename: FilenameSchema.optional(), statusPolling: z.boolean().default(true), }); +export type SnapshotDownloadOptionsSchemaType = z.infer< + typeof SnapshotDownloadOptionsSchema +>; + export const SnapshotIdSchema = z .string() .trim() diff --git a/src/schemas/misc.ts b/src/schemas/misc.ts index 2fff5a6..1c38f20 100644 --- a/src/schemas/misc.ts +++ b/src/schemas/misc.ts @@ -1,15 +1,11 @@ import { z } from 'zod'; +import { FilenameSchema } from './shared'; const ContentFormatSchema = z .enum(['json', 'txt', 'JSON', 'TXT']) .transform((v) => v.toLowerCase() as 'json' | 'txt') .default('json'); -const FilenameSchema = z - .string() - .min(1) - .transform((v) => v.replace(/[<>:"\\|?*]/g, '_')); - export const SaveOptionsSchema = z.object({ filename: FilenameSchema.optional(), format: ContentFormatSchema, diff --git a/src/schemas/shared.ts b/src/schemas/shared.ts index 0838b4f..c59e8e2 100644 --- a/src/schemas/shared.ts +++ b/src/schemas/shared.ts @@ -15,3 +15,8 @@ export const ZoneNameSchema = z .refine((val) => !val.endsWith('_'), { message: 'zone name cannot end with an underscore', }); + +export const FilenameSchema = z + .string() + .min(1) + .transform((v) => v.replace(/[<>:"\\|?*]/g, '_')); diff --git a/src/types/datasets.ts b/src/types/datasets.ts index 85dccb0..d05784b 100644 --- a/src/types/datasets.ts +++ b/src/types/datasets.ts @@ -48,17 +48,15 @@ export type DiscoverOptions = Omit< 'async' | 'discoverBy' | 'type' >; -export interface SnapshotDownloadEndpointOptions { - format?: SnapshotFormat; - compress?: boolean; -} - export interface SnapshotDownloadOptions { /** * If set to true, the SDK will poll the snapshot status until it is ready * @default true */ statusPolling: boolean; + filename?: string; + format: string; + compress: boolean; } export type SnapshotStatus = 'running' | 'ready' | 'failed'; diff --git a/src/utils/files.ts b/src/utils/files.ts index d8c613d..b890d42 100644 --- a/src/utils/files.ts +++ b/src/utils/files.ts @@ -1,6 +1,7 @@ +import { createWriteStream, type Stats } from 'node:fs'; import fs from 'node:fs/promises'; -import type { Stats } from 'node:fs'; import path from 'node:path'; +import { type Dispatcher } from 'undici'; import { getLogger } from './logger'; import { BRDError, FSError } from './errors'; import { isStrArray } from './misc'; @@ -34,7 +35,7 @@ export const stringifyResults = ( export const getFilename = ( filename: string | void, - format: ContentFormat, + format: string, ): string => { if (filename) { return path.extname(filename) ? filename : `${filename}.${format}`; @@ -42,16 +43,26 @@ export const getFilename = ( return `brightdata_content_${Date.now()}.${format}`; }; -export const writeContent = async (content: string, filename: string) => { +export const getAbsAndEnsureDir = async (filename: string) => { try { const target = path.resolve(filename); + await fs.mkdir(path.dirname(target), { recursive: true }); + return target; + } catch (e: unknown) { + const msg = `failed to create dirs ${filename}:`; + throw new FSError(`${msg} ${(e as Error).message}`); + } +}; + +export const writeContent = async (content: string, filename: string) => { + try { + const target = await getAbsAndEnsureDir(filename); logger.info(`writing ${target}`); - await fs.mkdir(path.dirname(target), { recursive: true }); await fs.writeFile(target, content, 'utf8'); const stats = await statSafe(target); - if (!stats) throw new Error('file was not created successfully'); + if (!stats) throw new FSError('file was not created successfully'); logger.info(`written successfully: ${target} (${stats.size} bytes)`); return target; @@ -73,3 +84,19 @@ export const writeContent = async (content: string, filename: string) => { throw new FSError(`${msg} ${err.message}`); } }; + +export interface WritingOpaque { + filename: string; + assertStatus?: (status: number) => void; +} + +export const routeDownloadStream: Dispatcher.StreamFactory = ({ + statusCode, + opaque, +}) => { + const { assertStatus, filename } = opaque; + + assertStatus?.(statusCode); + + return createWriteStream(filename); +}; diff --git a/src/utils/net.ts b/src/utils/net.ts index 24a273e..717fb2f 100644 --- a/src/utils/net.ts +++ b/src/utils/net.ts @@ -65,6 +65,20 @@ export const stream: typeof lib_stream = async (url, opts, factory) => { return lib_stream(url, opts, factory); }; +export function throwInvalidStatus(status: number, responseTxt: string): never { + if (status === 401) { + throw new AuthenticationError( + 'invalid API key or insufficient permissions', + ); + } + + if (status === 400) { + throw new ValidationError(`bad request: ${responseTxt}`); + } + + throw new APIError(`request failed`, status, responseTxt); +} + export async function assertResponse( response: Dispatcher.ResponseData, parse?: true, @@ -81,17 +95,5 @@ export async function assertResponse( return parse ? await response.body.text() : response.body; } - if (response.statusCode === 401) { - throw new AuthenticationError( - 'invalid API key or insufficient permissions', - ); - } - - const responseTxt = await response.body.text(); - - if (response.statusCode === 400) { - throw new ValidationError(`bad request: ${responseTxt}`); - } - - throw new APIError(`request failed`, response.statusCode, responseTxt); + throwInvalidStatus(response.statusCode, await response.body.text()); }