Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 80 additions & 12 deletions src/api/datasets/snapshot.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
import { API_ENDPOINT } from '../../utils/constants';
import { APIError, BRDError } from '../../utils/errors';
import { request, getDispatcher, assertResponse } from '../../utils/net';
import { parseJSON } from '../../utils/misc';
import {
request,
stream,
getDispatcher,
assertResponse,
throwInvalidStatus,
} from '../../utils/net';
import {
routeDownloadStream,
getFilename,
getAbsAndEnsureDir,
} from '../../utils/files';
import { parseJSON, getRandomInt, sleep } from '../../utils/misc';
import {
SnapshotIdSchema,
SnapshotDownloadOptionsSchema,
SnapshotDownloadOptionsSchemaType,
} from '../../schemas/datasets';
import { assertSchema } from '../../schemas/utils';
import type {
Expand All @@ -13,6 +25,16 @@ import type {
} from '../../types/datasets';
import { BaseAPI, BaseAPIOptions } from './base';

/**
 * Checks the HTTP status of a snapshot download response.
 *
 * Statuses below 202 are accepted. A 202 is reported as a `BRDError` asking
 * the caller to retry later; every other status is handed to
 * `throwInvalidStatus`, which always throws.
 *
 * @param status - HTTP status code of the download response
 * @throws BRDError when the status is exactly 202
 */
const assertDownloadStatus = (status: number) => {
  const accepted = status < 202;

  if (!accepted) {
    if (status === 202) {
      throw new BRDError('snapshot is not ready yet, please try again later');
    }

    throwInvalidStatus(status, 'snapshot download failed');
  }
};

export class SnapshotAPI extends BaseAPI {
constructor(opts: BaseAPIOptions) {
super(opts);
Expand All @@ -36,17 +58,17 @@ export class SnapshotAPI extends BaseAPI {
* Download the data from a dataset snapshot.
* @param snapshotId - The unique identifier of the snapshot
* @param options - Download options including format and compression settings
* @returns A promise that resolves with the full filename where the data is saved
*/
async download(snapshotId: string, options?: SnapshotDownloadOptions) {
  // Validate the id first so an invalid id is reported before the options.
  const safeId = assertSchema(
    SnapshotIdSchema,
    snapshotId,
    'snapshot.download: invalid snapshot id',
  );
  // `options` may be omitted; an empty object is validated in that case.
  const safeOpts = assertSchema(
    SnapshotDownloadOptionsSchema,
    options || {},
    'snapshot.download: invalid options',
  );
  return this.#download(safeId, safeOpts);
}
Expand Down Expand Up @@ -85,27 +107,73 @@ export class SnapshotAPI extends BaseAPI {
}
}

/**
 * Streams the data of a snapshot into a local file.
 *
 * When `options.statusPolling` is set, waits for the snapshot to become
 * ready before the download request is issued. The response body is written
 * to the target file by `routeDownloadStream`; the response status is
 * checked by `assertDownloadStatus`, both passed along through `opaque`.
 *
 * @param snapshotId - Validated snapshot identifier
 * @param options - Validated download options
 * @returns The absolute path of the file the data was streamed into
 * @throws BRDError rethrown unchanged when raised by any step
 * @throws APIError wrapping any other failure
 */
async #download(
  snapshotId: string,
  options: SnapshotDownloadOptionsSchemaType,
): Promise<string> {
  this.logger.info(`fetching snapshot for id ${snapshotId}`);

  const url = API_ENDPOINT.SNAPSHOT_DOWNLOAD.replace(
    '{snapshot_id}',
    snapshotId,
  );

  try {
    if (options.statusPolling) {
      await this.#awaitReady(snapshotId);
    }

    const filename = getFilename(options.filename, options.format);
    const target = await getAbsAndEnsureDir(filename);

    this.logger.info(
      `starting streaming snapshot ${snapshotId} data to ${target}`,
    );

    await stream(
      url,
      {
        method: 'GET',
        headers: this.authHeaders,
        // only these two options are sent to the API as query parameters
        query: {
          format: options.format,
          compress: options.compress,
        },
        opaque: {
          filename: target,
          assertStatus: assertDownloadStatus,
        },
      },
      routeDownloadStream,
    );

    return target;
  } catch (e: unknown) {
    if (e instanceof BRDError) throw e;
    // narrow instead of asserting: a non-Error throwable has no `message`
    const reason = e instanceof Error ? e.message : String(e);
    throw new APIError(`operation failed: ${reason}`);
  }
}

/**
 * Polls the snapshot status until it is reported as `ready`.
 *
 * Between two status checks the method sleeps for a random delay obtained
 * from `getRandomInt(10_000, 30_000)` (milliseconds).
 *
 * @param snapshotId - Identifier of the snapshot to wait for
 * @throws BRDError when the status is reported as `failed`
 */
async #awaitReady(snapshotId: string): Promise<void> {
  this.logger.info(`polling snapshot status for id ${snapshotId}`);

  let current = (await this.#getStatus(snapshotId)).status;

  while (current !== 'ready') {
    if (current === 'failed') {
      throw new BRDError('snapshot generation failed');
    }

    const pauseMs = getRandomInt(10_000, 30_000);
    this.logger.info(
      `snapshot ${snapshotId} is not ready yet, waiting for ${pauseMs}ms`,
    );
    await sleep(pauseMs);

    current = (await this.#getStatus(snapshotId)).status;
  }
}

async #cancel(snapshotId: string) {
this.logger.info(`cancelling snapshot for id ${snapshotId}`);
const url = API_ENDPOINT.SNAPSHOT_CANCEL.replace(
Expand Down
7 changes: 7 additions & 0 deletions src/schemas/datasets.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { z } from 'zod';
import { FilenameSchema } from './shared';

const SnapshotFormatSchema = z
.enum(['json', 'csv', 'ndjson', 'jsonl'])
Expand Down Expand Up @@ -55,8 +56,14 @@ export const ChatGPTInputSchema = z
/**
 * Zod object schema describing the options of a snapshot download.
 */
export const SnapshotDownloadOptionsSchema = z.object({
// output format; the validation rules live in SnapshotFormatSchema
format: SnapshotFormatSchema,
// falls back to false when omitted
compress: z.boolean().default(false),
// optional target filename, validated by FilenameSchema
filename: FilenameSchema.optional(),
// falls back to true when omitted
statusPolling: z.boolean().default(true),
});

/**
 * Static type derived from `SnapshotDownloadOptionsSchema` with `z.infer`.
 */
export type SnapshotDownloadOptionsSchemaType = z.infer<
typeof SnapshotDownloadOptionsSchema
>;

export const SnapshotIdSchema = z
.string()
.trim()
Expand Down
6 changes: 1 addition & 5 deletions src/schemas/misc.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { z } from 'zod';
import { FilenameSchema } from './shared';

const ContentFormatSchema = z
.enum(['json', 'txt', 'JSON', 'TXT'])
.transform((v) => v.toLowerCase() as 'json' | 'txt')
.default('json');

const FilenameSchema = z
.string()
.min(1)
.transform((v) => v.replace(/[<>:"\\|?*]/g, '_'));

export const SaveOptionsSchema = z.object({
filename: FilenameSchema.optional(),
format: ContentFormatSchema,
Expand Down
5 changes: 5 additions & 0 deletions src/schemas/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ export const ZoneNameSchema = z
.refine((val) => !val.endsWith('_'), {
message: 'zone name cannot end with an underscore',
});

/**
 * Schema for a filename: a non-empty string in which every occurrence of the
 * characters `< > : " \ | ? *` is replaced with an underscore.
 */
export const FilenameSchema = z
  .string()
  .min(1)
  .transform((name) => name.replace(/[<>:"\\|?*]/g, '_'));
10 changes: 8 additions & 2 deletions src/types/datasets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ export type DiscoverOptions = Omit<
>;

/**
 * Caller-facing options of a snapshot download.
 *
 * Every field is optional: the options argument itself may be omitted by the
 * caller, so no single field can be mandatory.
 */
export interface SnapshotDownloadOptions {
  /**
   * If set to true, the SDK will poll the snapshot status until it is ready
   * @default true
   */
  statusPolling?: boolean;
  /** Name of the file the snapshot data is written to */
  filename?: string;
  /** Output format of the snapshot data */
  format?: string;
  /** Whether the snapshot data is requested compressed */
  compress?: boolean;
}

export type SnapshotStatus = 'running' | 'ready' | 'failed';
Expand Down
37 changes: 32 additions & 5 deletions src/utils/files.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createWriteStream, type Stats } from 'node:fs';
import fs from 'node:fs/promises';
import type { Stats } from 'node:fs';
import path from 'node:path';
import { type Dispatcher } from 'undici';
import { getLogger } from './logger';
import { BRDError, FSError } from './errors';
import { isStrArray } from './misc';
Expand Down Expand Up @@ -34,24 +35,34 @@ export const stringifyResults = (

/**
 * Builds the filename used for saved content.
 *
 * @param filename - Name requested by the caller, if any
 * @param format - Format name, used as the file extension when one is needed
 * @returns `filename` unchanged when it already has an extension,
 *   `filename.format` when it has none, or a generated
 *   `brightdata_content_<timestamp>.format` name when no filename is given
 */
export const getFilename = (
  filename: string | void,
  format: string,
): string => {
  if (!filename) {
    return `brightdata_content_${Date.now()}.${format}`;
  }
  // keep a caller-provided extension as is, even when it differs from format
  return path.extname(filename) ? filename : `${filename}.${format}`;
};

export const writeContent = async (content: string, filename: string) => {
/**
 * Resolves a filename to an absolute path and creates its parent directory
 * (recursively) when it does not exist yet.
 *
 * @param filename - Relative or absolute path of the file
 * @returns The absolute path of the file
 * @throws FSError when resolving the path or creating the directory fails
 */
export const getAbsAndEnsureDir = async (filename: string) => {
  try {
    const absPath = path.resolve(filename);
    const parentDir = path.dirname(absPath);

    await fs.mkdir(parentDir, { recursive: true });

    return absPath;
  } catch (cause: unknown) {
    const prefix = `failed to create dirs ${filename}:`;
    const reason = (cause as Error).message;
    throw new FSError(`${prefix} ${reason}`);
  }
};

export const writeContent = async (content: string, filename: string) => {
try {
const target = await getAbsAndEnsureDir(filename);
logger.info(`writing ${target}`);

await fs.mkdir(path.dirname(target), { recursive: true });
await fs.writeFile(target, content, 'utf8');
const stats = await statSafe(target);

if (!stats) throw new Error('file was not created successfully');
if (!stats) throw new FSError('file was not created successfully');

logger.info(`written successfully: ${target} (${stats.size} bytes)`);
return target;
Expand All @@ -73,3 +84,19 @@ export const writeContent = async (content: string, filename: string) => {
throw new FSError(`${msg} ${err.message}`);
}
};

/**
 * Opaque payload consumed by `routeDownloadStream`.
 */
export interface WritingOpaque {
// path handed to createWriteStream for the response body
filename: string;
// optional check called with the response status code before the file
// stream is created
assertStatus?: (status: number) => void;
}

/**
 * Stream factory that routes a response body into a file.
 *
 * When the opaque payload carries an `assertStatus` callback, it is called
 * with the response status code first; whatever it throws propagates before
 * any file stream is created.
 *
 * @returns A write stream for `opaque.filename`
 */
export const routeDownloadStream: Dispatcher.StreamFactory<WritingOpaque> = (
  response,
) => {
  const { statusCode, opaque } = response;
  const checkStatus = opaque.assertStatus;
  const destination = opaque.filename;

  if (checkStatus != null) {
    checkStatus(statusCode);
  }

  return createWriteStream(destination);
};
6 changes: 6 additions & 0 deletions src/utils/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ export const dropEmptyKeys = (obj: Record<string, unknown>) => {

export const maskKey = (key: string) =>
key.length > 8 ? `***${key.slice(-4)}` : '***';

/**
 * Returns a promise that resolves after the given delay.
 *
 * @param ms - Delay in milliseconds, passed to `setTimeout`
 */
export const sleep = (ms: number) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });

/**
 * Returns a random integer between two bounds, both inclusive.
 *
 * The bounds may be given in either order. Fractional bounds are narrowed
 * inward (lower bound rounded up, upper bound rounded down) so the result is
 * always an integer; without that step a fractional `min` would leak its
 * fraction into the result. For integer bounds with `min <= max` the result
 * is computed exactly as before.
 *
 * NOTE: when no integer lies between the bounds (e.g. 1.2 and 1.8) the
 * rounded-up lower bound is returned.
 *
 * @param min - One end of the range
 * @param max - The other end of the range
 * @returns An integer taken from the range
 */
export const getRandomInt = (min: number, max: number) => {
  const lo = Math.ceil(Math.min(min, max));
  const hi = Math.floor(Math.max(min, max));
  return Math.floor(Math.random() * (hi - lo + 1)) + lo;
};
61 changes: 38 additions & 23 deletions src/utils/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {
Agent,
interceptors,
request as lib_request,
stream as lib_stream,
} from 'undici';
import type { UrlObject } from 'node:url';
import {
DEFAULT_TIMEOUT,
MAX_RETRIES,
Expand Down Expand Up @@ -33,25 +35,50 @@ export const getDispatcher = (params: GetDispatcherOptions = {}) => {
);
};

export const request: typeof lib_request = async (url, opts) => {
/**
 * Logs an outgoing HTTP request through `logRequest`.
 *
 * @param method - HTTP method; defaults to `GET` when undefined
 * @param url - Request target, logged in its `JSON.stringify` form
 * @param query - Query parameters, logged as JSON when present
 * @param body - Request body; logged only when it is a string
 */
const log = (
  method = 'GET',
  url: string | URL | UrlObject,
  query?: Record<string, unknown>,
  body?: unknown,
) => {
  let meta = '';

  if (query) {
    meta += ` query=${JSON.stringify(query)}`;
  }

  if (typeof body === 'string') {
    // NOTE(review): a body without a query gets no leading space, unlike the
    // query part above — confirm against how logRequest joins its arguments.
    if (meta) meta += ' ';
    meta += `body=${body}`;
  }

  logRequest(method, JSON.stringify(url), meta);
};

/**
 * Drop-in wrapper around undici's `request` (imported as `lib_request`):
 * logs the method, url, query and body of the call, then forwards the
 * arguments unchanged and returns the library's result.
 */
export const request: typeof lib_request = async (url, opts) => {
log(opts?.method, url, opts?.query, opts?.body);
return lib_request(url, opts);
};

/**
 * Drop-in wrapper around undici's `stream` (imported as `lib_stream`):
 * logs the method, url, query and body of the call, then forwards the
 * arguments, including the stream factory, unchanged and returns the
 * library's result.
 */
export const stream: typeof lib_stream = async (url, opts, factory) => {
log(opts?.method, url, opts?.query, opts?.body);
return lib_stream(url, opts, factory);
};

/**
 * Converts an unsuccessful HTTP status into the matching error. Never returns.
 *
 * @param status - HTTP status code of the response
 * @param responseTxt - Response text included in the error
 * @throws AuthenticationError for status 401
 * @throws ValidationError for status 400
 * @throws APIError for every other status
 */
export function throwInvalidStatus(status: number, responseTxt: string): never {
  switch (status) {
    case 401:
      throw new AuthenticationError(
        'invalid API key or insufficient permissions',
      );
    case 400:
      throw new ValidationError(`bad request: ${responseTxt}`);
    default:
      throw new APIError(`request failed`, status, responseTxt);
  }
}

export async function assertResponse(
response: Dispatcher.ResponseData,
parse?: true,
Expand All @@ -68,17 +95,5 @@ export async function assertResponse(
return parse ? await response.body.text() : response.body;
}

if (response.statusCode === 401) {
throw new AuthenticationError(
'invalid API key or insufficient permissions',
);
}

const responseTxt = await response.body.text();

if (response.statusCode === 400) {
throw new ValidationError(`bad request: ${responseTxt}`);
}

throw new APIError(`request failed`, response.statusCode, responseTxt);
throwInvalidStatus(response.statusCode, await response.body.text());
}