diff --git a/.changeset/retry-transient-errors-js.md b/.changeset/retry-transient-errors-js.md new file mode 100644 index 0000000000..07a46aad35 --- /dev/null +++ b/.changeset/retry-transient-errors-js.md @@ -0,0 +1,5 @@ +--- +'e2b': minor +--- + +feat(js-sdk): automatically retry requests on transient failures. The SDK now retries on connection errors and `408`/`429`/`502`/`503`/`504` responses using exponential backoff with jitter, and honors a server-provided `Retry-After` header (so rate limiting is handled transparently). Idempotent requests are retried on any transient failure; non-idempotent requests (e.g. `Sandbox.create`) are only retried when the server provably did not process them (e.g. throttling). Configure via the new `retries` option (or `E2B_MAX_RETRIES` env var); set `retries: 0` to disable. Defaults to `3` retries. diff --git a/.changeset/retry-transient-errors-python.md b/.changeset/retry-transient-errors-python.md new file mode 100644 index 0000000000..1cb2bfef9e --- /dev/null +++ b/.changeset/retry-transient-errors-python.md @@ -0,0 +1,5 @@ +--- +'@e2b/python-sdk': minor +--- + +feat(python-sdk): automatically retry requests on transient failures. The SDK now retries on connection errors and `408`/`429`/`502`/`503`/`504` responses using exponential backoff with jitter, and honors a server-provided `Retry-After` header (so rate limiting is handled transparently). Idempotent requests are retried on any transient failure; non-idempotent requests (e.g. `Sandbox.create`) are only retried when the server provably did not process them (e.g. throttling). The envd RPC retry now also uses backoff between attempts. Configure via the new `retries` option (or `E2B_MAX_RETRIES` env var); set `retries=0` to disable. Defaults to `3` retries. 
diff --git a/packages/js-sdk/src/api/index.ts b/packages/js-sdk/src/api/index.ts index afb6fe81de..98a4245565 100644 --- a/packages/js-sdk/src/api/index.ts +++ b/packages/js-sdk/src/api/index.ts @@ -6,6 +6,7 @@ import { createApiFetch } from './http2' import { ConnectionConfig } from '../connectionConfig' import { AuthenticationError, RateLimitError, SandboxError } from '../errors' import { createApiLogger } from '../logs' +import { withRetry } from '../retry' const API_KEY_PATTERN = /^e2b_[0-9a-f]+$/ const API_KEY_EXAMPLE = `e2b_${'0'.repeat(40)}` @@ -95,7 +96,7 @@ class ApiClient { this.api = createClient({ baseUrl: config.apiUrl, - fetch: createApiFetch(), + fetch: withRetry(createApiFetch(), config.retries), // In HTTP 1.1, all connections are considered persistent unless declared otherwise // keepalive: true, headers: { diff --git a/packages/js-sdk/src/connectionConfig.ts b/packages/js-sdk/src/connectionConfig.ts index a1004b5c1d..58392c6b9d 100644 --- a/packages/js-sdk/src/connectionConfig.ts +++ b/packages/js-sdk/src/connectionConfig.ts @@ -1,6 +1,7 @@ import { Logger } from './logs' import { getEnvVar, version } from './api/metadata' import { runtime } from './utils' +import { resolveMaxRetries } from './retry' // Remove once all deployments support sandbox subdomains const supportedDomains = ['e2b.app', 'e2b.dev', 'e2b.pro', 'e2b-staging.dev'] @@ -57,6 +58,19 @@ export interface ConnectionOpts { * @default 60_000 // 60 seconds */ requestTimeoutMs?: number + /** + * Number of times to retry a request after a transient failure (e.g. a + * network error, a `429` rate-limit, or a `502`/`503`/`504`). Retries use + * exponential backoff with jitter and honor a server-provided `Retry-After` + * header. Non-idempotent requests (e.g. creating a sandbox) are only retried + * when the server provably did not process the request (e.g. throttling, a + * refused connection, or a DNS failure), avoiding duplicate side effects. + * + * Set to `0` to disable retries. 
+ * + * @default E2B_MAX_RETRIES // environment variable or `3` + */ + retries?: number /** * Logger to use for logging messages. It can accept any object that implements `Logger` interface—for example, {@link console}. */ @@ -180,6 +194,7 @@ export class ConnectionConfig { readonly logger?: Logger readonly requestTimeoutMs: number + readonly retries: number readonly apiKey?: string readonly accessToken?: string @@ -192,6 +207,7 @@ export class ConnectionConfig { this.domain = opts?.domain || ConnectionConfig.domain this.accessToken = opts?.accessToken || ConnectionConfig.accessToken this.requestTimeoutMs = opts?.requestTimeoutMs ?? REQUEST_TIMEOUT_MS + this.retries = resolveMaxRetries(opts?.retries) this.logger = opts?.logger this.headers = opts?.headers || {} this.headers['User-Agent'] = `e2b-js-sdk/${version}` diff --git a/packages/js-sdk/src/retry.ts b/packages/js-sdk/src/retry.ts new file mode 100644 index 0000000000..d51fa7f978 --- /dev/null +++ b/packages/js-sdk/src/retry.ts @@ -0,0 +1,352 @@ +import { parseIntEnv } from './api/metadata' +import { wait } from './utils' + +/** Default number of *retries* (i.e. attempts after the first). */ +export const DEFAULT_MAX_RETRIES = 3 + +/** Base for the exponential backoff, in milliseconds. */ +const DEFAULT_BACKOFF_BASE_MS = 100 +/** Upper bound for a single backoff delay, in milliseconds. */ +const DEFAULT_BACKOFF_CAP_MS = 8_000 +/** + * Upper bound (in bytes) on request bodies we are willing to buffer in memory + * so the request can be replayed across retries. Larger bodies (e.g. file + * uploads) are sent once and not retried. + */ +const MAX_REPLAYABLE_BODY_BYTES = 1024 * 1024 // 1 MiB + +/** + * HTTP methods that are idempotent per the HTTP spec and can therefore be + * retried on any transient failure. + */ +const IDEMPOTENT_METHODS = new Set(['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE']) + +/** + * A transient failure is either: + * - `rejected`: the server demonstrably did NOT process the request (e.g. 
it + * was throttled, or the connection never reached the server). Replaying is + * always safe, even for non-idempotent requests. + * - `ambiguous`: the request may or may not have been processed (e.g. the + * connection dropped after sending). Replaying is only safe for idempotent + * requests. + */ +type FailureKind = 'rejected' | 'ambiguous' + +/** + * HTTP status codes that indicate a transient failure worth retrying, mapped to + * whether the request was definitely not processed (`rejected`) or might have + * been (`ambiguous`). + * + * `500` is intentionally excluded because it is frequently a deterministic + * server-side error rather than a transient one. + * + * Keep in sync with `_RETRYABLE_STATUS` in the Python SDK (`e2b/_retry.py`). + */ +export const RETRYABLE_STATUS: ReadonlyMap = new Map([ + [408, 'ambiguous'], // request timeout + [429, 'rejected'], // throttled — not processed + [502, 'ambiguous'], // bad gateway + [503, 'ambiguous'], // service unavailable — may be returned after processing + [504, 'ambiguous'], // gateway timeout +]) + +/** + * Lower-cased error codes (from `error.code` or `error.cause.code`) that + * indicate a transient, connection-level failure that is safe to retry, mapped + * to whether the request could have reached the server. + */ +export const RETRYABLE_ERROR_CODES: ReadonlyMap = new Map([ + ['econnrefused', 'rejected'], // never accepted the connection + ['enotfound', 'rejected'], // DNS failure — never reached server + ['eai_again', 'rejected'], // DNS failure — never reached server + ['enetunreach', 'rejected'], + ['ehostunreach', 'rejected'], + ['econnreset', 'ambiguous'], // dropped mid-flight + ['epipe', 'ambiguous'], + ['etimedout', 'ambiguous'], +]) + +export interface RetryPolicy { + /** Number of retries (attempts after the first). `0` disables retries. 
*/ + retries: number + backoffBaseMs: number + backoffCapMs: number +} + +/** + * Resolve the configured number of retries, falling back to the + * `E2B_MAX_RETRIES` environment variable and finally + * {@link DEFAULT_MAX_RETRIES}. + */ +export function resolveMaxRetries(retries?: number): number { + if (retries !== undefined) { + if (!Number.isInteger(retries) || retries < 0) { + throw new Error( + `Invalid retries=${retries}: expected a non-negative integer.` + ) + } + return retries + } + + const fromEnv = parseIntEnv('E2B_MAX_RETRIES', DEFAULT_MAX_RETRIES) + if (fromEnv < 0) { + throw new Error( + `Invalid E2B_MAX_RETRIES=${fromEnv}: expected a non-negative integer.` + ) + } + return fromEnv +} + +function defaultPolicy(retries: number): RetryPolicy { + return { + retries, + backoffBaseMs: DEFAULT_BACKOFF_BASE_MS, + backoffCapMs: DEFAULT_BACKOFF_CAP_MS, + } +} + +/** + * Parse a `Retry-After` header value (either delta-seconds or an HTTP date) + * into a delay in milliseconds. Returns `undefined` when the value is missing + * or unparseable. + */ +export function parseRetryAfter( + value: string | null, + now: number = Date.now() +): number | undefined { + if (!value) return undefined + + const trimmed = value.trim() + // delta-seconds form + if (/^\d+$/.test(trimmed)) { + return Number.parseInt(trimmed, 10) * 1000 + } + + // HTTP-date form + const date = Date.parse(trimmed) + if (!Number.isNaN(date)) { + return Math.max(0, date - now) + } + + return undefined +} + +/** + * Compute the delay before the next attempt. A server-provided `Retry-After` + * takes precedence; otherwise an exponential backoff with full jitter is used. + */ +export function computeDelayMs( + attempt: number, + policy: RetryPolicy, + response?: Response +): number { + const retryAfter = parseRetryAfter( + response?.headers.get('retry-after') ?? 
null + ) + if (retryAfter !== undefined) { + return Math.min(retryAfter, policy.backoffCapMs * 4) + } + + const exp = Math.min(policy.backoffCapMs, policy.backoffBaseMs * 2 ** attempt) + // Full jitter: a random value in [0, exp]. + return Math.random() * exp +} + +function retryableStatusKind(status: number): FailureKind | undefined { + return RETRYABLE_STATUS.get(status) +} + +/** + * Classify a thrown error as a transient, connection-level failure (returning + * its {@link FailureKind}) or `undefined` when it is not retryable. User/timeout + * aborts are explicitly not retryable. + */ +export function retryableErrorKind(err: unknown): FailureKind | undefined { + if (isAbortError(err)) return undefined + + const codes: string[] = [] + let current: unknown = err + // Walk the `cause` chain (undici wraps the real error in `cause`). + for (let i = 0; i < 5 && current; i++) { + const e = current as { code?: unknown; cause?: unknown; name?: unknown } + if (typeof e.code === 'string') codes.push(e.code.toLowerCase()) + if (typeof e.name === 'string') codes.push(e.name.toLowerCase()) + current = e.cause + } + + for (const code of codes) { + const kind = RETRYABLE_ERROR_CODES.get(code) + if (kind) return kind + // undici transport errors (`UND_ERR_*` codes) are ambiguous mid-flight drops. + if (code.startsWith('und_err_')) { + return 'ambiguous' + } + } + + return undefined +} + +/** + * Single decision point for both the response and error paths: retry only when + * attempts remain, the failure is transient, and the failure is safe to replay + * for this request. `rejected` failures are always safe; `ambiguous` failures + * are only safe for idempotent requests. 
+ */ +function shouldRetry( + kind: FailureKind | undefined, + attempt: number, + policy: RetryPolicy, + idempotent: boolean +): boolean { + if (kind === undefined || attempt >= policy.retries) return false + return kind === 'rejected' || idempotent +} + +function isAbortError(err: unknown): boolean { + const name = (err as { name?: unknown } | null)?.name + return name === 'AbortError' || name === 'TimeoutError' +} + +function isStreamLike(body: unknown): boolean { + return ( + typeof body === 'object' && + body !== null && + typeof (body as { getReader?: unknown }).getReader === 'function' + ) +} + +/** + * Buffer a request body up to {@link MAX_REPLAYABLE_BODY_BYTES} so the request + * can be replayed across retries. Reads the (cloned) body incrementally and + * bails out as soon as the cap is exceeded so we never buffer large uploads in + * memory. Returns `undefined` body for bodyless requests, and + * `replayable: false` when the body is too large to buffer safely (in which + * case the request is sent once, without retries). + */ +async function bufferBody( + request: Request +): Promise<{ replayable: boolean; body?: Uint8Array }> { + if ( + request.body === null || + request.method === 'GET' || + request.method === 'HEAD' + ) { + return { replayable: true } + } + + try { + const reader = request.clone().body!.getReader() + const chunks: Uint8Array[] = [] + let total = 0 + + for (;;) { + const { done, value } = await reader.read() + if (done) break + total += value.byteLength + if (total > MAX_REPLAYABLE_BODY_BYTES) { + // Too large to buffer for replay — the caller sends the pristine + // original once. We must NOT call `reader.cancel()` here: cancelling + // one branch of a teed body (`request.clone()`) never resolves while + // the other branch is unread, which would hang the request forever. 
+ return { replayable: false } + } + chunks.push(value) + } + + const body = new Uint8Array(total) + let offset = 0 + for (const chunk of chunks) { + body.set(chunk, offset) + offset += chunk.byteLength + } + return { replayable: true, body } + } catch { + // Body could not be buffered (e.g. a one-shot stream) — do not retry. + return { replayable: false } + } +} + +/** + * Wrap a `fetch` implementation with automatic retries for transient failures. + * + * Behavior: + * - Retries on transient HTTP statuses ({@link RETRYABLE_STATUS}) and + * connection-level errors ({@link retryableErrorKind}). + * - Honors a server-provided `Retry-After` header; otherwise uses exponential + * backoff with full jitter. + * - Idempotent methods are retried on any transient failure. Non-idempotent + * methods (e.g. `POST`) are only retried on `rejected` failures where the + * server provably did not process the request. The request body is buffered + * (up to {@link MAX_REPLAYABLE_BODY_BYTES}) so it can be replayed. + * - Respects the request's `AbortSignal`: aborts (including request-timeout + * aborts) stop retrying immediately and the total timeout bounds the whole + * operation, not each attempt. + * + * @param innerFetch the underlying fetch to wrap + * @param retries the number of retries, resolved via {@link resolveMaxRetries} + * by the caller; `0` disables retries (the inner fetch is returned unwrapped). + */ +export function withRetry( + innerFetch: typeof fetch, + retries: number +): typeof fetch { + const policy = defaultPolicy(retries) + + if (policy.retries <= 0) { + return innerFetch + } + + return (async (input, init) => { + // A raw streaming body (e.g. connect-web server/bidi streams) cannot be + // replayed and would throw when constructing a Request without `duplex`. + // Send it once, unwrapped. 
+ if (isStreamLike(init?.body)) { + return innerFetch(input, init) + } + + const request = new Request(input as RequestInfo, init) + const method = request.method.toUpperCase() + const idempotent = IDEMPOTENT_METHODS.has(method) + + const { replayable, body } = await bufferBody(request) + + // If we cannot safely replay the body, fall back to a single attempt. + // Forward the already-constructed `request` (not `input`/`init`), since + // constructing `request` above may have transferred/disturbed the original + // input's body. + if (!replayable) { + return innerFetch(request) + } + + const buildAttempt = (): Request => + body !== undefined ? new Request(request, { body }) : request.clone() + + let attempt = 0 + for (;;) { + try { + const response = await innerFetch(buildAttempt()) + + const kind = retryableStatusKind(response.status) + if (!shouldRetry(kind, attempt, policy, idempotent)) { + return response + } + + // Drain and discard the body so the connection can be reused. + await response.body?.cancel().catch(() => {}) + + const delay = computeDelayMs(attempt, policy, response) + await wait(delay, request.signal) + attempt++ + } catch (err) { + const kind = retryableErrorKind(err) + if (!shouldRetry(kind, attempt, policy, idempotent)) { + throw err + } + + const delay = computeDelayMs(attempt, policy) + await wait(delay, request.signal) + attempt++ + } + } + }) as typeof fetch +} diff --git a/packages/js-sdk/src/sandbox/index.ts b/packages/js-sdk/src/sandbox/index.ts index 92d4c24576..73d098b59b 100644 --- a/packages/js-sdk/src/sandbox/index.ts +++ b/packages/js-sdk/src/sandbox/index.ts @@ -10,6 +10,7 @@ import { import { EnvdApiClient, handleEnvdApiError } from '../envd/api' import { createEnvdFetch, createEnvdRpcFetch } from '../envd/http2' import { createRpcLogger } from '../logs' +import { withRetry } from '../retry' import { Commands, Pty } from './commands' import { Filesystem } from './filesystem' import { Git } from './git' @@ -159,8 +160,17 @@ 
export class Sandbox extends SandboxApi { 'E2b-Sandbox-Id': this.sandboxId, 'E2b-Sandbox-Port': this.envdPort.toString(), } - const envdFetch = createEnvdFetch() - const envdRpcFetch = createEnvdRpcFetch() + // Retries are conservative: idempotent requests retry on any transient + // failure, while non-idempotent ones (all RPCs are POST) only retry on + // `rejected` failures where the request was provably not processed. + const envdFetch = withRetry( + createEnvdFetch(), + this.connectionConfig.retries + ) + const envdRpcFetch = withRetry( + createEnvdRpcFetch(), + this.connectionConfig.retries + ) const rpcTransport = createConnectTransport({ baseUrl: this.envdApiUrl, diff --git a/packages/js-sdk/src/utils.ts b/packages/js-sdk/src/utils.ts index d571e91046..107a608d8a 100644 --- a/packages/js-sdk/src/utils.ts +++ b/packages/js-sdk/src/utils.ts @@ -103,8 +103,27 @@ export function stripAnsi(text: string): string { return text.replace(ansiRegex(), '') } -export async function wait(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)) +/** + * Resolve after `ms` milliseconds. When an `AbortSignal` is provided, reject + * immediately if it is already aborted and reject (clearing the timer) if it + * aborts while waiting, using the signal's `reason`. + */ +export function wait(ms: number, signal?: AbortSignal | null): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason ?? new DOMException('Aborted', 'AbortError')) + return + } + const timer = setTimeout(() => { + signal?.removeEventListener('abort', onAbort) + resolve() + }, ms) + const onAbort = () => { + clearTimeout(timer) + reject(signal?.reason ?? 
new DOMException('Aborted', 'AbortError')) + } + signal?.addEventListener('abort', onAbort, { once: true }) + }) } /** diff --git a/packages/js-sdk/src/volume/client.ts b/packages/js-sdk/src/volume/client.ts index bbe527e158..107c72bca5 100644 --- a/packages/js-sdk/src/volume/client.ts +++ b/packages/js-sdk/src/volume/client.ts @@ -4,6 +4,7 @@ import type { components, paths } from './schema.gen' import { defaultHeaders, getEnvVar } from '../api/metadata' import { buildRequestSignal } from '../connectionConfig' import { createApiLogger, Logger } from '../logs' +import { resolveMaxRetries, withRetry } from '../retry' import type { Volume } from './index' const FILE_TIMEOUT_MS = 3_600_000 // 1 hour @@ -49,6 +50,17 @@ export interface VolumeApiOpts { */ headers?: Record + /** + * Number of times to retry a request after a transient failure (e.g. a + * network error or a `429`/`502`/`503`/`504`). Retries use exponential + * backoff with jitter and honor a server-provided `Retry-After` header. + * + * Set to `0` to disable retries. + * + * @default E2B_MAX_RETRIES // environment variable or `3` + */ + retries?: number + /** * An optional `AbortSignal` that can be used to cancel the in-flight request. 
* When the signal is aborted, the underlying `fetch` is aborted and the @@ -65,6 +77,7 @@ export class VolumeConnectionConfig { readonly headers?: Record readonly logger?: Logger readonly requestTimeoutMs?: number + readonly retries: number readonly signal?: AbortSignal constructor(volume: Volume, opts?: VolumeApiOpts) { @@ -78,6 +91,7 @@ export class VolumeConnectionConfig { this.headers = opts?.headers this.logger = opts?.logger this.requestTimeoutMs = opts?.requestTimeoutMs + this.retries = resolveMaxRetries(opts?.retries) this.signal = opts?.signal } @@ -110,6 +124,9 @@ class VolumeApiClient { constructor(config: VolumeConnectionConfig) { this.api = createClient({ baseUrl: config.apiUrl, + // Volume content uploads/downloads can be large; withRetry only retries + // small, replayable bodies. + fetch: withRetry(fetch, config.retries), headers: { ...defaultHeaders, ...(config.token && { Authorization: `Bearer ${config.token}` }), diff --git a/packages/js-sdk/tests/retry.test.ts b/packages/js-sdk/tests/retry.test.ts new file mode 100644 index 0000000000..ebf11f482f --- /dev/null +++ b/packages/js-sdk/tests/retry.test.ts @@ -0,0 +1,338 @@ +import { assert, test, describe, beforeEach, afterEach, vi } from 'vitest' +import { + parseRetryAfter, + computeDelayMs, + resolveMaxRetries, + retryableErrorKind, + withRetry, + RETRYABLE_STATUS, + RETRYABLE_ERROR_CODES, +} from '../src/retry' + +const policy = { retries: 3, backoffBaseMs: 500, backoffCapMs: 8_000 } + +/** + * Build a fake `fetch` that returns/throws the queued outcomes in order and + * records the requests it received. + */ +function fakeFetch( + outcomes: Array Response | Error)> +): { + fetch: typeof fetch + calls: Request[] +} { + const calls: Request[] = [] + let i = 0 + const fn = (async (input: RequestInfo | URL, init?: RequestInit) => { + const req = input instanceof Request ? 
input : new Request(input, init) + calls.push(req) + let outcome = outcomes[Math.min(i, outcomes.length - 1)] + i++ + if (typeof outcome === 'function') outcome = outcome() + if (outcome instanceof Error) throw outcome + return outcome + }) as typeof fetch + return { fetch: fn, calls } +} + +function err(code: string): Error { + const e = new Error(code) as Error & { code?: string } + e.code = code + return e +} + +describe('parseRetryAfter', () => { + test('parses delta-seconds', () => { + assert.equal(parseRetryAfter('2'), 2000) + assert.equal(parseRetryAfter('0'), 0) + }) + + test('parses HTTP-date', () => { + const now = Date.parse('2020-01-01T00:00:00Z') + const date = new Date(now + 5000).toUTCString() + assert.equal(parseRetryAfter(date, now), 5000) + }) + + test('returns undefined for missing/garbage', () => { + assert.equal(parseRetryAfter(null), undefined) + assert.equal(parseRetryAfter('soon'), undefined) + }) +}) + +describe('computeDelayMs', () => { + test('honors Retry-After over backoff', () => { + const res = new Response(null, { + status: 429, + headers: { 'retry-after': '2' }, + }) + assert.equal(computeDelayMs(0, policy, res), 2000) + }) + + test('exponential backoff with full jitter stays within bounds', () => { + for (let attempt = 0; attempt < 6; attempt++) { + const d = computeDelayMs(attempt, policy) + const exp = Math.min( + policy.backoffCapMs, + policy.backoffBaseMs * 2 ** attempt + ) + assert.ok(d >= 0 && d <= exp, `attempt ${attempt}: ${d} <= ${exp}`) + } + }) +}) + +describe('resolveMaxRetries', () => { + let original: string | undefined + beforeEach(() => { + original = process.env.E2B_MAX_RETRIES + }) + afterEach(() => { + if (original === undefined) delete process.env.E2B_MAX_RETRIES + else process.env.E2B_MAX_RETRIES = original + }) + + test('explicit value wins', () => { + assert.equal(resolveMaxRetries(5), 5) + assert.equal(resolveMaxRetries(0), 0) + }) + + test('falls back to env then default', () => { + delete 
process.env.E2B_MAX_RETRIES + assert.equal(resolveMaxRetries(), 3) + process.env.E2B_MAX_RETRIES = '7' + assert.equal(resolveMaxRetries(), 7) + }) + + test('rejects negative', () => { + assert.throws(() => resolveMaxRetries(-1)) + }) +}) + +describe('retryableErrorKind', () => { + test('classifies rejected vs ambiguous', () => { + assert.equal(retryableErrorKind(err('ECONNREFUSED')), 'rejected') + assert.equal(retryableErrorKind(err('ENOTFOUND')), 'rejected') + assert.equal(retryableErrorKind(err('ECONNRESET')), 'ambiguous') + assert.equal(retryableErrorKind(err('UND_ERR_SOCKET')), 'ambiguous') + }) + + test('walks the cause chain', () => { + const wrapped = new TypeError('fetch failed') + ;(wrapped as { cause?: unknown }).cause = err('ECONNRESET') + assert.equal(retryableErrorKind(wrapped), 'ambiguous') + }) + + test('aborts and unknown errors are not retryable', () => { + const abort = new DOMException('Aborted', 'AbortError') + assert.equal(retryableErrorKind(abort), undefined) + assert.equal(retryableErrorKind(new Error('boom')), undefined) + }) +}) + +describe('withRetry', () => { + beforeEach(() => { + // Make backoff instant so tests don't sleep. 
+ vi.spyOn(Math, 'random').mockReturnValue(0) + }) + afterEach(() => { + vi.restoreAllMocks() + }) + + test('retries=0 returns the inner fetch unwrapped', () => { + const inner = (() => {}) as unknown as typeof fetch + assert.strictEqual(withRetry(inner, 0), inner) + }) + + test('retries a 503 then succeeds (GET, idempotent)', async () => { + const { fetch, calls } = fakeFetch([ + new Response(null, { status: 503 }), + new Response('ok', { status: 200 }), + ]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/sandboxes') + assert.equal(res.status, 200) + assert.equal(calls.length, 2) + }) + + test('honors Retry-After delay on 429', async () => { + const sleeps: number[] = [] + const realSetTimeout = globalThis.setTimeout + vi.spyOn(globalThis, 'setTimeout').mockImplementation((( + cb: () => void, + ms?: number + ) => { + sleeps.push(ms ?? 0) + return realSetTimeout(cb, 0) + }) as typeof setTimeout) + + const { fetch } = fakeFetch([ + new Response(null, { status: 429, headers: { 'retry-after': '2' } }), + new Response('ok', { status: 200 }), + ]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/sandboxes') + assert.equal(res.status, 200) + assert.deepEqual(sleeps, [2000]) + }) + + test('does not retry non-retryable status', async () => { + const { fetch, calls } = fakeFetch([new Response(null, { status: 400 })]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/sandboxes') + assert.equal(res.status, 400) + assert.equal(calls.length, 1) + }) + + test('exhausts retries and returns the last response', async () => { + const { fetch, calls } = fakeFetch([new Response(null, { status: 502 })]) + const wrapped = withRetry(fetch, 2) + const res = await wrapped('https://api.test/sandboxes') + assert.equal(res.status, 502) + assert.equal(calls.length, 3) // 1 + 2 retries + }) + + test('POST: 502 (ambiguous) is NOT retried', async () => { + const { fetch, calls } = 
fakeFetch([new Response(null, { status: 502 })]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/rpc', { + method: 'POST', + body: 'x', + }) + assert.equal(res.status, 502) + assert.equal(calls.length, 1) + }) + + test('POST: 429 (rejected) IS retried', async () => { + const { fetch, calls } = fakeFetch([ + new Response(null, { status: 429 }), + new Response('ok', { status: 200 }), + ]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/rpc', { + method: 'POST', + body: 'x', + }) + assert.equal(res.status, 200) + assert.equal(calls.length, 2) + }) + + test('POST: 503 (ambiguous) is NOT retried', async () => { + const { fetch, calls } = fakeFetch([new Response(null, { status: 503 })]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/rpc', { + method: 'POST', + body: 'x', + }) + assert.equal(res.status, 503) + assert.equal(calls.length, 1) + }) + + test('retries on a transient connection error then succeeds', async () => { + const { fetch, calls } = fakeFetch([ + err('ECONNRESET'), + new Response('ok', { status: 200 }), + ]) + const wrapped = withRetry(fetch, 3) + const res = await wrapped('https://api.test/sandboxes') + assert.equal(res.status, 200) + assert.equal(calls.length, 2) + }) + + test('does not retry on abort', async () => { + const { fetch, calls } = fakeFetch([ + new DOMException('Aborted', 'AbortError'), + ]) + const wrapped = withRetry(fetch, 3) + let threw = false + try { + await wrapped('https://api.test/sandboxes') + } catch { + threw = true + } + assert.ok(threw) + assert.equal(calls.length, 1) + }) + + test('streaming body is passed through without retry', async () => { + const { fetch, calls } = fakeFetch([new Response(null, { status: 503 })]) + const wrapped = withRetry(fetch, 3) + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk')) + controller.close() + }, + }) + const res = await 
wrapped('https://api.test/rpc', { + method: 'POST', + body: stream, + // @ts-expect-error duplex is required for stream bodies in undici + duplex: 'half', + }) + assert.equal(res.status, 503) + assert.equal(calls.length, 1) + }) + + test('large body exceeding the buffer cap is sent once without retry', async () => { + const { fetch, calls } = fakeFetch([ + new Response(null, { status: 429 }), + new Response('ok', { status: 200 }), + ]) + const wrapped = withRetry(fetch, 3) + // 2 MiB > MAX_REPLAYABLE_BODY_BYTES (1 MiB): not buffered, so not replayed + // even though 429 on a POST would normally be retried. + const body = new Uint8Array(2 * 1024 * 1024) + const res = await wrapped('https://api.test/rpc', { method: 'POST', body }) + assert.equal(res.status, 429) + assert.equal(calls.length, 1) + }) + + test('does not retry once the signal aborts between attempts', async () => { + const controller = new AbortController() + const { fetch, calls } = fakeFetch([ + // First attempt fails transiently, then we abort during the backoff. + () => { + controller.abort() + return new Response(null, { status: 429 }) + }, + new Response('ok', { status: 200 }), + ]) + const wrapped = withRetry(fetch, 3) + let threw = false + try { + await wrapped('https://api.test/sandboxes', { signal: controller.signal }) + } catch { + threw = true + } + assert.ok(threw) + assert.equal(calls.length, 1) + }) +}) + +describe('classification tables (parity with Python e2b/_retry.py)', () => { + test('RETRYABLE_STATUS matches the agreed policy', () => { + assert.deepEqual( + [...RETRYABLE_STATUS.entries()].sort((a, b) => a[0] - b[0]), + [ + [408, 'ambiguous'], + [429, 'rejected'], + [502, 'ambiguous'], + [503, 'ambiguous'], + [504, 'ambiguous'], + ] + ) + // 500 is intentionally not retryable. 
+ assert.equal(RETRYABLE_STATUS.has(500), false) + }) + + test('RETRYABLE_ERROR_CODES matches the agreed policy', () => { + assert.deepEqual([...RETRYABLE_ERROR_CODES.entries()].sort(), [ + ['eai_again', 'rejected'], + ['econnrefused', 'rejected'], + ['econnreset', 'ambiguous'], + ['ehostunreach', 'rejected'], + ['enetunreach', 'rejected'], + ['enotfound', 'rejected'], + ['epipe', 'ambiguous'], + ['etimedout', 'ambiguous'], + ]) + }) +}) diff --git a/packages/python-sdk/e2b/_retry.py b/packages/python-sdk/e2b/_retry.py new file mode 100644 index 0000000000..a38ac0c561 --- /dev/null +++ b/packages/python-sdk/e2b/_retry.py @@ -0,0 +1,289 @@ +"""Shared retry primitives for the E2B SDK. + +Provides transient-failure classification, ``Retry-After`` parsing, exponential +backoff with jitter, and retry drivers used by the httpx transports +(control-plane and volume REST). +""" + +import asyncio +import os +import random +import time +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +from typing import Awaitable, Callable, Optional + +import httpx + +#: Default number of *retries* (i.e. attempts after the first). +DEFAULT_MAX_RETRIES = 3 + +_BACKOFF_BASE_SEC = 0.1 +_BACKOFF_CAP_SEC = 8.0 + +#: HTTP methods that are idempotent per the HTTP spec and can be retried on any +#: transient failure. +_IDEMPOTENT_METHODS = frozenset({"GET", "HEAD", "OPTIONS", "PUT", "DELETE"}) + +# A transient failure is either ``rejected`` (the server demonstrably did not +# process the request, so replaying is always safe) or ``ambiguous`` (the +# request may have been processed, so replaying is only safe for idempotent +# requests). +_REJECTED = "rejected" +_AMBIGUOUS = "ambiguous" + +# ``500`` is intentionally excluded as it is frequently a deterministic error. +# Keep in sync with ``RETRYABLE_STATUS`` in the JS SDK (``src/retry.ts``). 
+_RETRYABLE_STATUS = { + 408: _AMBIGUOUS, # request timeout + 429: _REJECTED, # throttled — not processed + 502: _AMBIGUOUS, # bad gateway + 503: _AMBIGUOUS, # service unavailable — may be returned after processing + 504: _AMBIGUOUS, # gateway timeout +} + + +def resolve_max_retries(retries: Optional[int]) -> int: + """Resolve the configured number of retries, falling back to the + ``E2B_MAX_RETRIES`` environment variable and finally + :data:`DEFAULT_MAX_RETRIES`. + """ + if retries is not None: + # ``bool`` is a subclass of ``int`` but is not a valid retry count. + if isinstance(retries, bool) or not isinstance(retries, int) or retries < 0: + raise ValueError( + f"Invalid retries={retries!r}: expected a non-negative integer." + ) + return retries + + raw = os.getenv("E2B_MAX_RETRIES") + if raw is None or raw == "": + return DEFAULT_MAX_RETRIES + + value = int(raw) + if value < 0: + raise ValueError( + f"Invalid E2B_MAX_RETRIES={raw}: expected a non-negative integer." + ) + return value + + +def parse_retry_after( + value: Optional[str], now: Optional[datetime] = None +) -> Optional[float]: + """Parse a ``Retry-After`` header value (delta-seconds or HTTP date) into a + delay in seconds. Returns ``None`` when missing or unparseable. + """ + if not value: + return None + + trimmed = value.strip() + if trimmed.isdigit(): + return float(int(trimmed)) + + try: + date = parsedate_to_datetime(trimmed) + except (TypeError, ValueError): + return None + if date is None: + return None + if date.tzinfo is None: + date = date.replace(tzinfo=timezone.utc) + + current = now or datetime.now(timezone.utc) + return max(0.0, (date - current).total_seconds()) + + +def compute_delay( + attempt: int, + retry_after: Optional[float] = None, + base: float = _BACKOFF_BASE_SEC, + cap: float = _BACKOFF_CAP_SEC, +) -> float: + """Compute the delay (seconds) before the next attempt. A server-provided + ``Retry-After`` takes precedence; otherwise exponential backoff with full + jitter is used. 
+ """ + if retry_after is not None: + return min(retry_after, cap * 4) + + exp = min(cap, base * (2**attempt)) + return random.uniform(0, exp) + + +def _status_kind(status: int) -> Optional[str]: + return _RETRYABLE_STATUS.get(status) + + +def classify_exception(exc: BaseException) -> Optional[str]: + """Classify a transport exception as a transient failure kind, or ``None`` + when it is not retryable. + """ + # Connection establishment failed — the request never reached the server. + if isinstance(exc, (httpx.ConnectError, httpx.ConnectTimeout)): + return _REJECTED + # The request may have been sent/processed before the failure. + if isinstance( + exc, + ( + httpx.ReadError, + httpx.ReadTimeout, + httpx.WriteError, + httpx.WriteTimeout, + httpx.PoolTimeout, + httpx.RemoteProtocolError, + ), + ): + return _AMBIGUOUS + return None + + +def _should_retry( + kind: Optional[str], attempt: int, retries: int, idempotent: bool +) -> bool: + """Single decision point for both the response and error paths: retry only + when attempts remain, the failure is transient, and the failure is safe to + replay for this request. ``rejected`` failures are always safe; ``ambiguous`` + failures are only safe for idempotent requests. + """ + if kind is None or attempt >= retries: + return False + return kind == _REJECTED or idempotent + + +def _is_replayable(request: httpx.Request) -> bool: + # A bodyless request (``_content == b""``) or one whose body has been + # buffered into bytes can be safely re-sent. A streaming (one-shot) body + # has ``_content is None`` and must not be retried — regardless of method, + # since DELETE/OPTIONS may also carry a streaming body. 
+ return getattr(request, "_content", None) is not None + + +def _is_idempotent(request: httpx.Request) -> bool: + return request.method.upper() in _IDEMPOTENT_METHODS + + +def _operation_deadline(request: httpx.Request) -> Optional[float]: + """Absolute :func:`time.monotonic` deadline bounding the whole operation + (all attempts + backoff), derived from the per-request timeout httpx + attached to the request. Returns ``None`` when no timeout applies. + + This mirrors the JS SDK, where a single ``AbortSignal`` bounds the entire + retried operation rather than each individual attempt. + """ + ext = request.extensions.get("timeout") if request.extensions else None + if not ext: + return None + values = [v for v in ext.values() if v is not None] + if not values: + return None + return time.monotonic() + max(values) + + +def _clamp_timeout(request: httpx.Request, deadline: Optional[float]) -> None: + """Shrink the request's per-attempt timeout to the time remaining before + ``deadline`` so a single attempt cannot overrun the whole-operation budget. 
+ """ + if deadline is None: + return + ext = request.extensions.get("timeout") + if not ext: + return + remaining = max(0.0, deadline - time.monotonic()) + request.extensions = { + **request.extensions, + "timeout": { + k: (min(v, remaining) if v is not None else v) for k, v in ext.items() + }, + } + + +def _can_retry_before_deadline(deadline: Optional[float], delay: float) -> bool: + """Whether a backoff of ``delay`` still leaves time for another attempt.""" + if deadline is None: + return True + return time.monotonic() + delay < deadline + + +def retry_request_sync( + request: httpx.Request, + send: Callable[[httpx.Request], httpx.Response], + retries: int, + sleep: Callable[[float], None] = time.sleep, +) -> httpx.Response: + """Drive a sync request with retries on transient failures.""" + if retries <= 0 or not _is_replayable(request): + return send(request) + + idempotent = _is_idempotent(request) + deadline = _operation_deadline(request) + attempt = 0 + while True: + try: + response = send(request) + except Exception as exc: + kind = classify_exception(exc) + delay = compute_delay(attempt) + if not _should_retry( + kind, attempt, retries, idempotent + ) or not _can_retry_before_deadline(deadline, delay): + raise + sleep(delay) + _clamp_timeout(request, deadline) + attempt += 1 + continue + + kind = _status_kind(response.status_code) + retry_after = parse_retry_after(response.headers.get("retry-after")) + delay = compute_delay(attempt, retry_after) + if not _should_retry( + kind, attempt, retries, idempotent + ) or not _can_retry_before_deadline(deadline, delay): + return response + + response.close() + sleep(delay) + _clamp_timeout(request, deadline) + attempt += 1 + + +async def retry_request_async( + request: httpx.Request, + send: Callable[[httpx.Request], Awaitable[httpx.Response]], + retries: int, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, +) -> httpx.Response: + """Drive an async request with retries on transient failures.""" + if 
retries <= 0 or not _is_replayable(request): + return await send(request) + + idempotent = _is_idempotent(request) + deadline = _operation_deadline(request) + attempt = 0 + while True: + try: + response = await send(request) + except Exception as exc: + kind = classify_exception(exc) + delay = compute_delay(attempt) + if not _should_retry( + kind, attempt, retries, idempotent + ) or not _can_retry_before_deadline(deadline, delay): + raise + await sleep(delay) + _clamp_timeout(request, deadline) + attempt += 1 + continue + + kind = _status_kind(response.status_code) + retry_after = parse_retry_after(response.headers.get("retry-after")) + delay = compute_delay(attempt, retry_after) + if not _should_retry( + kind, attempt, retries, idempotent + ) or not _can_retry_before_deadline(deadline, delay): + return response + + await response.aclose() + await sleep(delay) + _clamp_timeout(request, deadline) + attempt += 1 diff --git a/packages/python-sdk/e2b/api/client_async/__init__.py b/packages/python-sdk/e2b/api/client_async/__init__.py index 85300f5902..3382230278 100644 --- a/packages/python-sdk/e2b/api/client_async/__init__.py +++ b/packages/python-sdk/e2b/api/client_async/__init__.py @@ -6,6 +6,7 @@ from e2b.api import AsyncApiClient, limits from e2b.connection_config import ConnectionConfig +from e2b._retry import retry_request_async logger = logging.getLogger(__name__) @@ -19,9 +20,16 @@ def get_api_client(config: ConnectionConfig, **kwargs) -> AsyncApiClient: class AsyncTransportWithLogger(httpx.AsyncHTTPTransport): - _instances: Dict[Tuple[int, bool], "AsyncTransportWithLogger"] = {} + _instances: Dict[Tuple[int, bool, int], "AsyncTransportWithLogger"] = {} + + def __init__(self, *args, retries: int = 0, **kwargs): + self._retries = retries + super().__init__(*args, **kwargs) async def handle_async_request(self, request): + return await retry_request_async(request, self._send, self._retries) + + async def _send(self, request): url = 
f"{request.url.scheme}://{request.url.host}{request.url.path}" logger.info(f"Request: {request.method} {url}") response = await super().handle_async_request(request) @@ -39,7 +47,7 @@ def pool(self): def get_transport( config: ConnectionConfig, http2: bool = True ) -> AsyncTransportWithLogger: - loop_id = (id(asyncio.get_running_loop()), http2) + loop_id = (id(asyncio.get_running_loop()), http2, config.retries) if loop_id in AsyncTransportWithLogger._instances: return AsyncTransportWithLogger._instances[loop_id] @@ -48,6 +56,7 @@ def get_transport( limits=limits, proxy=config.proxy, http2=http2, + retries=config.retries, ) AsyncTransportWithLogger._instances[loop_id] = transport @@ -55,13 +64,13 @@ def get_transport( class AsyncEnvdTransportWithLogger(AsyncTransportWithLogger): - _instances: Dict[Tuple[int, bool], "AsyncEnvdTransportWithLogger"] = {} + _instances: Dict[Tuple[int, bool, int], "AsyncEnvdTransportWithLogger"] = {} def get_envd_transport( config: ConnectionConfig, http2: bool = True ) -> AsyncEnvdTransportWithLogger: - loop_id = (id(asyncio.get_running_loop()), http2) + loop_id = (id(asyncio.get_running_loop()), http2, config.retries) if loop_id in AsyncEnvdTransportWithLogger._instances: return AsyncEnvdTransportWithLogger._instances[loop_id] @@ -70,6 +79,7 @@ def get_envd_transport( limits=limits, proxy=config.proxy, http2=http2, + retries=config.retries, ) AsyncEnvdTransportWithLogger._instances[loop_id] = transport diff --git a/packages/python-sdk/e2b/api/client_sync/__init__.py b/packages/python-sdk/e2b/api/client_sync/__init__.py index 0e444df63d..420532252c 100644 --- a/packages/python-sdk/e2b/api/client_sync/__init__.py +++ b/packages/python-sdk/e2b/api/client_sync/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Dict, Tuple import httpx import logging @@ -6,6 +6,7 @@ from e2b.api import ApiClient, limits from e2b.connection_config import ConnectionConfig +from e2b._retry import retry_request_sync logger = 
logging.getLogger(__name__) @@ -19,9 +20,16 @@ def get_api_client(config: ConnectionConfig, **kwargs) -> ApiClient: class TransportWithLogger(httpx.HTTPTransport): - _instances: Dict[bool, "TransportWithLogger"] = {} + _instances: Dict[Tuple[bool, int], "TransportWithLogger"] = {} + + def __init__(self, *args, retries: int = 0, **kwargs): + self._retries = retries + super().__init__(*args, **kwargs) def handle_request(self, request): + return retry_request_sync(request, self._send, self._retries) + + def _send(self, request): url = f"{request.url.scheme}://{request.url.host}{request.url.path}" logger.info(f"Request: {request.method} {url}") response = super().handle_request(request) @@ -37,7 +45,8 @@ def pool(self): def get_transport(config: ConnectionConfig, http2: bool = True) -> TransportWithLogger: - cached = TransportWithLogger._instances.get(http2) + key = (http2, config.retries) + cached = TransportWithLogger._instances.get(key) if cached is not None: return cached @@ -45,8 +54,9 @@ def get_transport(config: ConnectionConfig, http2: bool = True) -> TransportWith limits=limits, proxy=config.proxy, http2=http2, + retries=config.retries, ) - TransportWithLogger._instances[http2] = transport + TransportWithLogger._instances[key] = transport return transport @@ -57,10 +67,11 @@ class EnvdTransportWithLogger(TransportWithLogger): def get_envd_transport( config: ConnectionConfig, http2: bool = True ) -> EnvdTransportWithLogger: - instances: Dict[bool, EnvdTransportWithLogger] = getattr( + instances: Dict[Tuple[bool, int], EnvdTransportWithLogger] = getattr( EnvdTransportWithLogger._thread_local, "instances", {} ) - cached = instances.get(http2) + key = (http2, config.retries) + cached = instances.get(key) if cached is not None: return cached @@ -68,7 +79,8 @@ def get_envd_transport( limits=limits, proxy=config.proxy, http2=http2, + retries=config.retries, ) - instances[http2] = transport + instances[key] = transport EnvdTransportWithLogger._thread_local.instances = 
instances return transport diff --git a/packages/python-sdk/e2b/connection_config.py b/packages/python-sdk/e2b/connection_config.py index 4189ccf313..66f423ee31 100644 --- a/packages/python-sdk/e2b/connection_config.py +++ b/packages/python-sdk/e2b/connection_config.py @@ -7,6 +7,7 @@ from e2b.api.metadata import package_version from e2b.sandbox_domains import is_supported_sandbox_domain +from e2b._retry import resolve_max_retries REQUEST_TIMEOUT: float = 60.0 # 60 seconds @@ -45,6 +46,9 @@ class ApiParams(TypedDict, total=False): sandbox_url: Optional[str] """URL to connect to sandbox, defaults to `E2B_SANDBOX_URL` environment variable.""" + retries: Optional[int] + """Number of times to retry a request after a transient failure (e.g. a network error, a `429` rate-limit, or a `502`/`503`/`504`). Retries use exponential backoff with jitter and honor a server-provided `Retry-After` header. Non-idempotent requests (e.g. creating a sandbox) are only retried when the server provably did not process the request (e.g. throttling, a refused connection, or a DNS failure), avoiding duplicate side effects. Set to `0` to disable retries. 
Defaults to `E2B_MAX_RETRIES` environment variable or `3`.""" + class ConnectionConfig: """ @@ -89,6 +93,7 @@ def __init__( headers: Optional[Dict[str, str]] = None, extra_sandbox_headers: Optional[Dict[str, str]] = None, proxy: Optional[ProxyTypes] = None, + retries: Optional[int] = None, ): self.domain = domain or ConnectionConfig._domain() self.debug = debug or ConnectionConfig._debug() @@ -100,6 +105,8 @@ def __init__( self.proxy = proxy + self.retries = resolve_max_retries(retries) + self.request_timeout = ConnectionConfig._get_request_timeout( REQUEST_TIMEOUT, request_timeout, @@ -197,6 +204,7 @@ def get_api_params( domain = opts.get("domain") debug = opts.get("debug") proxy = opts.get("proxy") + retries = opts.get("retries") req_headers = self.headers.copy() if headers is not None: @@ -211,6 +219,7 @@ def get_api_params( request_timeout=self.get_request_timeout(request_timeout), headers=req_headers, proxy=proxy if proxy is not None else self.proxy, + retries=retries if retries is not None else self.retries, ) ) diff --git a/packages/python-sdk/e2b/sandbox_async/commands/command.py b/packages/python-sdk/e2b/sandbox_async/commands/command.py index 32b75fd26b..8091d0577a 100644 --- a/packages/python-sdk/e2b/sandbox_async/commands/command.py +++ b/packages/python-sdk/e2b/sandbox_async/commands/command.py @@ -40,6 +40,7 @@ def __init__( async_pool=pool, json=True, headers=connection_config.sandbox_headers, + retries=connection_config.retries, ) async def list( diff --git a/packages/python-sdk/e2b/sandbox_async/commands/pty.py b/packages/python-sdk/e2b/sandbox_async/commands/pty.py index 3585b13246..cc4310dbde 100644 --- a/packages/python-sdk/e2b/sandbox_async/commands/pty.py +++ b/packages/python-sdk/e2b/sandbox_async/commands/pty.py @@ -42,6 +42,7 @@ def __init__( async_pool=pool, json=True, headers=connection_config.sandbox_headers, + retries=connection_config.retries, ) async def kill( diff --git a/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py 
b/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py index 309f4f6169..92976678f0 100644 --- a/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py +++ b/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py @@ -84,6 +84,7 @@ def __init__( async_pool=pool, json=True, headers=connection_config.sandbox_headers, + retries=connection_config.retries, ) @overload diff --git a/packages/python-sdk/e2b/sandbox_sync/commands/command.py b/packages/python-sdk/e2b/sandbox_sync/commands/command.py index 512b7d9923..bed8e64547 100644 --- a/packages/python-sdk/e2b/sandbox_sync/commands/command.py +++ b/packages/python-sdk/e2b/sandbox_sync/commands/command.py @@ -39,6 +39,7 @@ def __init__( pool=pool, json=True, headers=connection_config.sandbox_headers, + retries=connection_config.retries, ) def list( diff --git a/packages/python-sdk/e2b/sandbox_sync/commands/pty.py b/packages/python-sdk/e2b/sandbox_sync/commands/pty.py index fd936ef404..a98a9d22bd 100644 --- a/packages/python-sdk/e2b/sandbox_sync/commands/pty.py +++ b/packages/python-sdk/e2b/sandbox_sync/commands/pty.py @@ -38,6 +38,7 @@ def __init__( pool=pool, json=True, headers=connection_config.sandbox_headers, + retries=connection_config.retries, ) def kill( diff --git a/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py b/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py index b145200e41..5e2f36d35b 100644 --- a/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py +++ b/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py @@ -82,6 +82,7 @@ def __init__( pool=pool, json=True, headers=connection_config.sandbox_headers, + retries=connection_config.retries, ) @overload diff --git a/packages/python-sdk/e2b/volume/client_async/__init__.py b/packages/python-sdk/e2b/volume/client_async/__init__.py index f1e44a8a33..2304de563d 100644 --- a/packages/python-sdk/e2b/volume/client_async/__init__.py +++ b/packages/python-sdk/e2b/volume/client_async/__init__.py @@ 
-9,6 +9,7 @@ from e2b.exceptions import AuthenticationException from e2b.volume.client.client import AuthenticatedClient as AsyncVolumeApiClient from e2b.volume.connection_config import VolumeConnectionConfig +from e2b._retry import retry_request_async logger = logging.getLogger(__name__) @@ -45,7 +46,14 @@ def get_api_client(config: VolumeConnectionConfig, **kwargs) -> AsyncVolumeApiCl class AsyncTransportWithLogger(httpx.AsyncHTTPTransport): singleton: Optional["AsyncTransportWithLogger"] = None + def __init__(self, *args, retries: int = 0, **kwargs): + self._retries = retries + super().__init__(*args, **kwargs) + async def handle_async_request(self, request): + return await retry_request_async(request, self._send, self._retries) + + async def _send(self, request): url = f"{request.url.scheme}://{request.url.host}{request.url.path}" logger.info(f"Request: {request.method} {url}") response = await super().handle_async_request(request) @@ -58,12 +66,16 @@ def pool(self): def get_transport(config: VolumeConnectionConfig) -> AsyncTransportWithLogger: - if AsyncTransportWithLogger.singleton is not None: + if ( + AsyncTransportWithLogger.singleton is not None + and AsyncTransportWithLogger.singleton._retries == config.retries + ): return AsyncTransportWithLogger.singleton transport = AsyncTransportWithLogger( limits=limits, proxy=config.proxy, + retries=config.retries, ) AsyncTransportWithLogger.singleton = transport return transport diff --git a/packages/python-sdk/e2b/volume/client_sync/__init__.py b/packages/python-sdk/e2b/volume/client_sync/__init__.py index 5afdcfef2d..353cff957d 100644 --- a/packages/python-sdk/e2b/volume/client_sync/__init__.py +++ b/packages/python-sdk/e2b/volume/client_sync/__init__.py @@ -9,6 +9,7 @@ from e2b.exceptions import AuthenticationException from e2b.volume.client.client import AuthenticatedClient as VolumeApiClient from e2b.volume.connection_config import VolumeConnectionConfig +from e2b._retry import retry_request_sync logger = 
logging.getLogger(__name__) @@ -45,7 +46,14 @@ def get_api_client(config: VolumeConnectionConfig, **kwargs) -> VolumeApiClient: class TransportWithLogger(httpx.HTTPTransport): singleton: Optional["TransportWithLogger"] = None + def __init__(self, *args, retries: int = 0, **kwargs): + self._retries = retries + super().__init__(*args, **kwargs) + def handle_request(self, request): + return retry_request_sync(request, self._send, self._retries) + + def _send(self, request): url = f"{request.url.scheme}://{request.url.host}{request.url.path}" logger.info(f"Request: {request.method} {url}") response = super().handle_request(request) @@ -58,12 +66,16 @@ def pool(self): def get_transport(config: VolumeConnectionConfig) -> TransportWithLogger: - if TransportWithLogger.singleton is not None: + if ( + TransportWithLogger.singleton is not None + and TransportWithLogger.singleton._retries == config.retries + ): return TransportWithLogger.singleton transport = TransportWithLogger( limits=limits, proxy=config.proxy, + retries=config.retries, ) TransportWithLogger.singleton = transport return transport diff --git a/packages/python-sdk/e2b/volume/connection_config.py b/packages/python-sdk/e2b/volume/connection_config.py index f5a0b4aec3..011effad52 100644 --- a/packages/python-sdk/e2b/volume/connection_config.py +++ b/packages/python-sdk/e2b/volume/connection_config.py @@ -6,6 +6,7 @@ from typing_extensions import Unpack from e2b.api.metadata import package_version +from e2b._retry import resolve_max_retries REQUEST_TIMEOUT: float = 60.0 # 60 seconds FILE_TIMEOUT: float = 3600.0 # 1 hour @@ -37,6 +38,9 @@ class VolumeApiParams(TypedDict, total=False): proxy: Optional[ProxyTypes] """Proxy to use for the request.""" + retries: Optional[int] + """Number of times to retry a request after a transient failure (e.g. a network error, a `429` rate-limit, or a `502`/`503`/`504`). Retries use exponential backoff with jitter and honor a server-provided `Retry-After` header. 
Set to `0` to disable retries. Defaults to `E2B_MAX_RETRIES` environment variable or `3`.""" + class VolumeConnectionConfig: """ @@ -82,6 +86,7 @@ def __init__( request_timeout: Optional[float] = None, headers: Optional[Dict[str, str]] = None, proxy: Optional[ProxyTypes] = None, + retries: Optional[int] = None, ): self.domain = domain or self._domain() self.debug = debug if debug is not None else self._debug() @@ -94,6 +99,7 @@ def __init__( self.access_token = token or self._access_token() self.token = self.access_token self.proxy = proxy + self.retries = resolve_max_retries(retries) self.headers = headers or {} self.headers["User-Agent"] = f"e2b-python-sdk/{package_version}" @@ -119,6 +125,7 @@ def get_api_params( token = opts.get("token") api_url = opts.get("api_url") proxy = opts.get("proxy") + retries = opts.get("retries") req_headers = self.headers.copy() if headers is not None: @@ -133,5 +140,6 @@ def get_api_params( request_timeout=self.get_request_timeout(request_timeout), headers=req_headers, proxy=proxy if proxy is not None else self.proxy, + retries=retries if retries is not None else self.retries, ) ) diff --git a/packages/python-sdk/e2b/volume/volume_async.py b/packages/python-sdk/e2b/volume/volume_async.py index d7533d2916..10cfac6177 100644 --- a/packages/python-sdk/e2b/volume/volume_async.py +++ b/packages/python-sdk/e2b/volume/volume_async.py @@ -88,6 +88,7 @@ def _get_volume_config( request_timeout=opts.get("request_timeout"), headers=opts.get("headers"), proxy=opts.get("proxy"), + retries=opts.get("retries"), ) @classmethod diff --git a/packages/python-sdk/e2b/volume/volume_sync.py b/packages/python-sdk/e2b/volume/volume_sync.py index b56ff31ba5..cbdab1cabe 100644 --- a/packages/python-sdk/e2b/volume/volume_sync.py +++ b/packages/python-sdk/e2b/volume/volume_sync.py @@ -88,6 +88,7 @@ def _get_volume_config( request_timeout=opts.get("request_timeout"), headers=opts.get("headers"), proxy=opts.get("proxy"), + retries=opts.get("retries"), ) 
@classmethod diff --git a/packages/python-sdk/e2b_connect/client.py b/packages/python-sdk/e2b_connect/client.py index b41fedb03c..fe662568e8 100644 --- a/packages/python-sdk/e2b_connect/client.py +++ b/packages/python-sdk/e2b_connect/client.py @@ -1,12 +1,17 @@ +import asyncio import gzip import inspect import json +import random import struct +import time import typing from httpcore import ( ConnectionPool, AsyncConnectionPool, + ConnectError, + ConnectTimeout, RemoteProtocolError, Response, ) @@ -107,12 +112,67 @@ def make_error(error): return ConnectException(status, error.get("message", "")) +# Exponential backoff with full jitter, in seconds. +_BACKOFF_BASE_SEC = 0.1 +_BACKOFF_CAP_SEC = 8.0 +# Upper bound for a server-provided ``Retry-After``, matching +# ``e2b._retry.compute_delay`` (``cap * 4``), so a large header cannot block a +# call far beyond what callers expect. +_RETRY_AFTER_CAP_SEC = _BACKOFF_CAP_SEC * 4 + + +def _backoff_delay(attempt: int) -> float: + exp = min(_BACKOFF_CAP_SEC, _BACKOFF_BASE_SEC * (2**attempt)) + return random.uniform(0, exp) + + +def _retry_delay(retry_after: Optional[float], attempt: int) -> float: + """Delay before the next retry: a (capped) server ``Retry-After`` takes + precedence, otherwise exponential backoff with full jitter. + """ + if retry_after is not None: + return min(retry_after, _RETRY_AFTER_CAP_SEC) + return _backoff_delay(attempt) + + +# Connection-level failures the server provably did not process, so replaying a +# (non-idempotent) RPC is safe. Mirrors the ``rejected`` class in ``e2b._retry``. +# ``RemoteProtocolError`` is also retried for envd HTTP/2 idle-connection resets. +_REJECTED_EXCEPTIONS = (RemoteProtocolError, ConnectError, ConnectTimeout) + +# Transient HTTP status that is rejected before processing (safe to replay). +# ``502``/``503``/``504`` are ambiguous for a non-idempotent RPC and are not +# retried, matching the POST policy in ``e2b._retry``. 
+_REJECTED_STATUS = 429 + + +def _retry_after_seconds(headers) -> Optional[float]: + """Parse a ``Retry-After`` delta-seconds header (the form envd/edge emit) + from httpcore's ``(name, value)`` byte-tuple headers. Returns ``None`` when + absent or not a plain integer. + """ + for name, value in headers: + if name.lower() == b"retry-after": + text = value.decode("ascii", "ignore").strip() + return float(int(text)) if text.isdigit() else None + return None + + +def _resolve_retries(retries: Optional[int], args: tuple) -> int: + # When ``retries`` is None, read the count from ``self`` (the first arg). + if retries is not None: + return retries + return args[0]._connection_retries + + def _sync_retry(func, exc, retries): def retry(*args, **kwargs): - for _ in range(retries): + count = _resolve_retries(retries, args) + for attempt in range(count): try: return func(*args, **kwargs) except exc: + time.sleep(_backoff_delay(attempt)) continue return func(*args, **kwargs) @@ -122,10 +182,12 @@ def retry(*args, **kwargs): def _async_retry(func, exc, retries): async def retry(*args, **kwargs): - for _ in range(retries): + count = _resolve_retries(retries, args) + for attempt in range(count): try: return await func(*args, **kwargs) except exc: + await asyncio.sleep(_backoff_delay(attempt)) continue return await func(*args, **kwargs) @@ -133,7 +195,11 @@ async def retry(*args, **kwargs): return retry -def _retry(exc: typing.Type[Exception], retries: int): +def _retry(exc: typing.Type[Exception], retries: Optional[int] = None): + """Retry ``func`` on ``exc`` with exponential backoff. When ``retries`` is + None, the count is read from the instance's ``_connection_retries``. 
+ """ + def decorator(func): if inspect.iscoroutinefunction(func): return _async_retry(func, exc, retries) @@ -188,6 +254,7 @@ def __init__( compressor=None, json: Optional[bool] = False, headers: Optional[Dict[str, str]] = None, + retries: int = 3, ): if headers is None: headers = {} @@ -199,7 +266,7 @@ def __init__( self._response_type = response_type self._compressor = compressor self._headers = headers - self._connection_retries = 3 + self._connection_retries = retries def _prepare_unary_request( self, @@ -263,7 +330,6 @@ def _process_unary_response( msg_type=self._response_type, ) - @_retry(RemoteProtocolError, 3) async def acall_unary( self, req, @@ -281,10 +347,26 @@ async def acall_unary( **opts, ) - res = await self.async_pool.request(**req_data) - return self._process_unary_response(res) + retries = self._connection_retries + attempt = 0 + while True: + try: + res = await self.async_pool.request(**req_data) + except _REJECTED_EXCEPTIONS: + if attempt >= retries: + raise + await asyncio.sleep(_backoff_delay(attempt)) + attempt += 1 + continue + + if res.status == _REJECTED_STATUS and attempt < retries: + delay = _retry_delay(_retry_after_seconds(res.headers), attempt) + await asyncio.sleep(delay) + attempt += 1 + continue + + return self._process_unary_response(res) - @_retry(RemoteProtocolError, 3) def call_unary( self, req, @@ -302,8 +384,25 @@ def call_unary( **opts, ) - res = self.pool.request(**req_data) - return self._process_unary_response(res) + retries = self._connection_retries + attempt = 0 + while True: + try: + res = self.pool.request(**req_data) + except _REJECTED_EXCEPTIONS: + if attempt >= retries: + raise + time.sleep(_backoff_delay(attempt)) + attempt += 1 + continue + + if res.status == _REJECTED_STATUS and attempt < retries: + delay = _retry_delay(_retry_after_seconds(res.headers), attempt) + time.sleep(delay) + attempt += 1 + continue + + return self._process_unary_response(res) def _create_stream_timeout(self, timeout: 
Optional[float]): if timeout: @@ -361,7 +460,7 @@ def _prepare_server_stream_request( }, } - @_retry(RemoteProtocolError, 3) + @_retry(RemoteProtocolError) async def acall_server_stream( self, req, @@ -395,7 +494,7 @@ async def acall_server_stream( for parsed in parser.parse(chunk): yield parsed - @_retry(RemoteProtocolError, 3) + @_retry(RemoteProtocolError) def call_server_stream( self, req, diff --git a/packages/python-sdk/tests/e2b_connect/test_client.py b/packages/python-sdk/tests/e2b_connect/test_client.py index ea1f272603..84e90b1d4f 100644 --- a/packages/python-sdk/tests/e2b_connect/test_client.py +++ b/packages/python-sdk/tests/e2b_connect/test_client.py @@ -1,8 +1,11 @@ import asyncio +from typing import cast +import httpcore import pytest +from httpcore import ConnectError, ConnectionPool, RemoteProtocolError -from e2b_connect.client import _retry +from e2b_connect.client import Client, ConnectException, _retry class GoodError(Exception): @@ -132,3 +135,146 @@ async def f(): result = await f() assert result is True assert total == 2 + + +class _FakeMsg: + def SerializeToString(self): + return b"" + + +class _FakePool: + def __init__(self): + self.calls = 0 + + def request(self, **kwargs): + self.calls += 1 + raise RemoteProtocolError("boom") + + +def test_client_honors_configured_retries(monkeypatch): + monkeypatch.setattr("e2b_connect.client.time.sleep", lambda _: None) + + pool = _FakePool() + client = Client( + pool=cast(ConnectionPool, pool), + url="http://api.test", + response_type=object, + retries=2, + ) + + with pytest.raises(RemoteProtocolError): + client.call_unary(_FakeMsg()) + + assert pool.calls == 3 + + +def test_client_retries_zero_disables_retries(monkeypatch): + monkeypatch.setattr("e2b_connect.client.time.sleep", lambda _: None) + + pool = _FakePool() + client = Client( + pool=cast(ConnectionPool, pool), + url="http://api.test", + response_type=object, + retries=0, + ) + + with pytest.raises(RemoteProtocolError): + 
client.call_unary(_FakeMsg()) + + assert pool.calls == 1 + + +class _StatusPool: + def __init__(self, status, headers=None): + self.status = status + self.headers = headers or [] + self.calls = 0 + + def request(self, **kwargs): + self.calls += 1 + # `httpcore.ConnectionPool.request` reads and closes before returning. + res = httpcore.Response(self.status, headers=self.headers, content=b"") + res.read() + return res + + +class _ConnErrorPool: + def __init__(self): + self.calls = 0 + + def request(self, **kwargs): + self.calls += 1 + raise ConnectError("refused") + + +def test_client_retries_429(monkeypatch): + monkeypatch.setattr("e2b_connect.client.time.sleep", lambda _: None) + + pool = _StatusPool(429, headers=[(b"retry-after", b"0")]) + client = Client( + pool=cast(ConnectionPool, pool), + url="http://api.test", + response_type=object, + retries=2, + ) + + # 429 is "rejected" — safe to replay even for a non-idempotent RPC. + with pytest.raises(ConnectException): + client.call_unary(_FakeMsg()) + + assert pool.calls == 3 + + +def test_client_caps_retry_after(monkeypatch): + slept: list = [] + monkeypatch.setattr("e2b_connect.client.time.sleep", slept.append) + + pool = _StatusPool(429, headers=[(b"retry-after", b"3600")]) + client = Client( + pool=cast(ConnectionPool, pool), + url="http://api.test", + response_type=object, + retries=1, + ) + + with pytest.raises(ConnectException): + client.call_unary(_FakeMsg()) + + # A huge Retry-After is capped (matches e2b._retry.compute_delay: cap * 4). + assert slept == [32.0] + + +def test_client_does_not_retry_ambiguous_502(monkeypatch): + monkeypatch.setattr("e2b_connect.client.time.sleep", lambda _: None) + + pool = _StatusPool(502) + client = Client( + pool=cast(ConnectionPool, pool), + url="http://api.test", + response_type=object, + retries=3, + ) + + # 502 is ambiguous for a non-idempotent RPC — must not be replayed. 
+ with pytest.raises(ConnectException): + client.call_unary(_FakeMsg()) + + assert pool.calls == 1 + + +def test_client_retries_connect_error(monkeypatch): + monkeypatch.setattr("e2b_connect.client.time.sleep", lambda _: None) + + pool = _ConnErrorPool() + client = Client( + pool=cast(ConnectionPool, pool), + url="http://api.test", + response_type=object, + retries=2, + ) + + with pytest.raises(ConnectError): + client.call_unary(_FakeMsg()) + + assert pool.calls == 3 diff --git a/packages/python-sdk/tests/test_api_client_transport.py b/packages/python-sdk/tests/test_api_client_transport.py index 4a6b0dc488..604de7fdbe 100644 --- a/packages/python-sdk/tests/test_api_client_transport.py +++ b/packages/python-sdk/tests/test_api_client_transport.py @@ -26,7 +26,10 @@ def test_sync_api_client_proxy_uses_explicit_transport(test_api_key): try: assert "proxy" not in api_client._httpx_args - assert httpx_client._transport is TransportWithLogger._instances[True] + assert ( + httpx_client._transport + is TransportWithLogger._instances[(True, config.retries)] + ) assert httpx_client._mounts == {} finally: httpx_client.close() @@ -101,7 +104,7 @@ async def test_async_api_client_proxy_uses_explicit_transport(test_api_key): api_client = get_async_api_client(config) httpx_client = api_client.get_async_httpx_client() transport = AsyncTransportWithLogger._instances[ - (id(asyncio.get_running_loop()), True) + (id(asyncio.get_running_loop()), True, config.retries) ] try: diff --git a/packages/python-sdk/tests/test_connection_config.py b/packages/python-sdk/tests/test_connection_config.py index 85f108fe8b..eaff8d7e26 100644 --- a/packages/python-sdk/tests/test_connection_config.py +++ b/packages/python-sdk/tests/test_connection_config.py @@ -94,3 +94,21 @@ def test_sandbox_direct_url_uses_explicit_url_first(): config.get_sandbox_direct_url("sandbox-id", "e2b.app") == "https://sandbox.example.com" ) + + +def test_get_api_params_propagates_retries(monkeypatch): + 
monkeypatch.delenv("E2B_MAX_RETRIES", raising=False) + + config = ConnectionConfig(retries=0) + params = config.get_api_params() + + assert params["retries"] == 0 + + +def test_get_api_params_retries_override(monkeypatch): + monkeypatch.delenv("E2B_MAX_RETRIES", raising=False) + + config = ConnectionConfig(retries=3) + params = config.get_api_params(retries=1) + + assert params["retries"] == 1 diff --git a/packages/python-sdk/tests/test_retry.py b/packages/python-sdk/tests/test_retry.py new file mode 100644 index 0000000000..4fc63c81cf --- /dev/null +++ b/packages/python-sdk/tests/test_retry.py @@ -0,0 +1,387 @@ +import httpx +import pytest + +from e2b._retry import ( + _RETRYABLE_STATUS, + classify_exception, + compute_delay, + parse_retry_after, + resolve_max_retries, + retry_request_async, + retry_request_sync, +) + + +def _response(status, headers=None): + return httpx.Response(status, headers=headers or {}) + + +class _Sender: + """Returns/raises queued outcomes in order and records requests.""" + + def __init__(self, outcomes): + self.outcomes = outcomes + self.calls = [] + self.i = 0 + + def __call__(self, request): + self.calls.append(request) + outcome = self.outcomes[min(self.i, len(self.outcomes) - 1)] + self.i += 1 + if isinstance(outcome, Exception): + raise outcome + return outcome + + +# --------------------------------------------------------------------------- +# resolve_max_retries +# --------------------------------------------------------------------------- + + +def test_resolve_max_retries_explicit(): + assert resolve_max_retries(5) == 5 + assert resolve_max_retries(0) == 0 + + +def test_resolve_max_retries_env_then_default(monkeypatch): + monkeypatch.delenv("E2B_MAX_RETRIES", raising=False) + assert resolve_max_retries(None) == 3 + monkeypatch.setenv("E2B_MAX_RETRIES", "7") + assert resolve_max_retries(None) == 7 + + +def test_resolve_max_retries_negative_raises(): + with pytest.raises(ValueError): + resolve_max_retries(-1) + + +def 
test_resolve_max_retries_non_integer_raises(): + with pytest.raises(ValueError): + resolve_max_retries(2.5) # type: ignore[arg-type] + with pytest.raises(ValueError): + resolve_max_retries(True) + + +# --------------------------------------------------------------------------- +# parse_retry_after / compute_delay +# --------------------------------------------------------------------------- + + +def test_parse_retry_after_seconds(): + assert parse_retry_after("2") == 2.0 + assert parse_retry_after("0") == 0.0 + + +def test_parse_retry_after_http_date(): + from datetime import datetime, timezone + + now = datetime(2020, 1, 1, tzinfo=timezone.utc) + future = "Wed, 01 Jan 2020 00:00:05 GMT" + assert parse_retry_after(future, now) == 5.0 + + +def test_parse_retry_after_invalid(): + assert parse_retry_after(None) is None + assert parse_retry_after("soon") is None + + +def test_compute_delay_honors_retry_after(): + assert compute_delay(0, retry_after=2.0) == 2.0 + + +def test_compute_delay_jitter_bounds(): + for attempt in range(6): + d = compute_delay(attempt, base=0.5, cap=8.0) + exp = min(8.0, 0.5 * (2**attempt)) + assert 0 <= d <= exp + + +# --------------------------------------------------------------------------- +# classify_exception +# --------------------------------------------------------------------------- + + +def test_classify_exception(): + assert classify_exception(httpx.ConnectError("x")) == "rejected" + assert classify_exception(httpx.ConnectTimeout("x")) == "rejected" + assert classify_exception(httpx.ReadError("x")) == "ambiguous" + assert classify_exception(httpx.RemoteProtocolError("x")) == "ambiguous" + assert classify_exception(ValueError("x")) is None + + +# --------------------------------------------------------------------------- +# retry_request_sync +# --------------------------------------------------------------------------- + + +def _no_sleep(_): + pass + + +def test_sync_retries_503_then_succeeds(): + sender = _Sender([_response(503), 
_response(200)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 200 + assert len(sender.calls) == 2 + + +def test_sync_honors_retry_after(): + sleeps = [] + sender = _Sender([_response(429, {"retry-after": "2"}), _response(200)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = retry_request_sync(req, sender, retries=3, sleep=sleeps.append) + assert res.status_code == 200 + assert sleeps == [2.0] + + +def test_sync_does_not_retry_non_retryable_status(): + sender = _Sender([_response(400)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 400 + assert len(sender.calls) == 1 + + +def test_sync_exhausts_and_returns_last(): + sender = _Sender([_response(502)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = retry_request_sync(req, sender, retries=2, sleep=_no_sleep) + assert res.status_code == 502 + assert len(sender.calls) == 3 + + +def test_sync_post_does_not_retry_ambiguous(): + sender = _Sender([_response(502)]) + req = httpx.Request("POST", "http://api.test/rpc", content=b"x") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 502 + assert len(sender.calls) == 1 + + +def test_sync_post_retries_rejected(): + sender = _Sender([_response(429), _response(200)]) + req = httpx.Request("POST", "http://api.test/rpc", content=b"x") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 200 + assert len(sender.calls) == 2 + + +def test_sync_post_does_not_retry_503_ambiguous(): + sender = _Sender([_response(503)]) + req = httpx.Request("POST", "http://api.test/rpc", content=b"x") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 503 + assert len(sender.calls) == 1 + + +def 
test_sync_retries_on_connection_error(): + sender = _Sender([httpx.ConnectError("boom"), _response(200)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 200 + assert len(sender.calls) == 2 + + +def test_sync_does_not_retry_unknown_error(): + sender = _Sender([ValueError("nope")]) + req = httpx.Request("GET", "http://api.test/sandboxes") + with pytest.raises(ValueError): + retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert len(sender.calls) == 1 + + +def test_sync_streaming_body_not_retried(): + def gen(): + yield b"chunk" + + sender = _Sender([_response(503)]) + req = httpx.Request("POST", "http://api.test/rpc", content=gen()) + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 503 + assert len(sender.calls) == 1 + + +def test_sync_delete_with_streaming_body_not_retried(): + def gen(): + yield b"chunk" + + # DELETE may carry a one-shot streaming body; it must not be replayed. 
+ sender = _Sender([_response(503), _response(200)]) + req = httpx.Request("DELETE", "http://api.test/x", content=gen()) + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 503 + assert len(sender.calls) == 1 + + +def test_sync_delete_without_body_is_retried(): + sender = _Sender([_response(503), _response(200)]) + req = httpx.Request("DELETE", "http://api.test/x") + res = retry_request_sync(req, sender, retries=3, sleep=_no_sleep) + assert res.status_code == 200 + assert len(sender.calls) == 2 + + +def test_sync_first_attempt_exhausting_timeout_stops_retries(monkeypatch): + clock = {"t": 0.0} + monkeypatch.setattr("e2b._retry.time.monotonic", lambda: clock["t"]) + + class _TimingSender: + def __init__(self): + self.calls = [] + + def __call__(self, request): + self.calls.append(request) + clock["t"] += 0.3 # each attempt consumes the whole timeout budget + raise httpx.ReadTimeout("timeout", request=request) + + sender = _TimingSender() + req = httpx.Request( + "GET", + "http://api.test/x", + extensions={ + "timeout": {"connect": 0.3, "read": 0.3, "write": 0.3, "pool": 0.3} + }, + ) + with pytest.raises(httpx.ReadTimeout): + retry_request_sync( + req, + sender, + retries=5, + sleep=lambda d: clock.__setitem__("t", clock["t"] + d), + ) + # The first attempt exhausts the 0.3s budget, so no retries are attempted. + assert len(sender.calls) == 1 + + +def test_sync_retries_within_timeout_budget(monkeypatch): + clock = {"t": 0.0} + monkeypatch.setattr("e2b._retry.time.monotonic", lambda: clock["t"]) + + sender = _Sender([_response(503), _response(503), _response(200)]) + req = httpx.Request( + "GET", + "http://api.test/x", + extensions={ + "timeout": { + "connect": 100.0, + "read": 100.0, + "write": 100.0, + "pool": 100.0, + } + }, + ) + res = retry_request_sync( + req, + sender, + retries=5, + sleep=lambda d: clock.__setitem__("t", clock["t"] + d), + ) + # Plenty of budget: retries proceed normally. 
+ assert res.status_code == 200 + assert len(sender.calls) == 3 + + +def test_sync_retries_zero_single_attempt(): + sender = _Sender([_response(503)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = retry_request_sync(req, sender, retries=0, sleep=_no_sleep) + assert res.status_code == 503 + assert len(sender.calls) == 1 + + +# --------------------------------------------------------------------------- +# retry_request_async +# --------------------------------------------------------------------------- + + +class _AsyncSender: + def __init__(self, outcomes): + self.outcomes = outcomes + self.calls = [] + self.i = 0 + + async def __call__(self, request): + self.calls.append(request) + outcome = self.outcomes[min(self.i, len(self.outcomes) - 1)] + self.i += 1 + if isinstance(outcome, Exception): + raise outcome + return outcome + + +async def _async_no_sleep(_): + pass + + +async def test_async_retries_503_then_succeeds(): + sender = _AsyncSender([_response(503), _response(200)]) + req = httpx.Request("GET", "http://api.test/sandboxes") + res = await retry_request_async(req, sender, retries=3, sleep=_async_no_sleep) + assert res.status_code == 200 + assert len(sender.calls) == 2 + + +async def test_async_post_not_retried_ambiguous(): + sender = _AsyncSender([_response(502)]) + req = httpx.Request("POST", "http://api.test/rpc", content=b"x") + res = await retry_request_async(req, sender, retries=3, sleep=_async_no_sleep) + assert res.status_code == 502 + assert len(sender.calls) == 1 + + +async def test_async_delete_with_streaming_body_not_retried(): + def gen(): + yield b"chunk" + + sender = _AsyncSender([_response(503), _response(200)]) + req = httpx.Request("DELETE", "http://api.test/x", content=gen()) + res = await retry_request_async(req, sender, retries=3, sleep=_async_no_sleep) + assert res.status_code == 503 + assert len(sender.calls) == 1 + + +async def test_async_first_attempt_exhausting_timeout_stops_retries(monkeypatch): + clock = {"t": 
0.0} + monkeypatch.setattr("e2b._retry.time.monotonic", lambda: clock["t"]) + + class _AsyncTimingSender: + def __init__(self): + self.calls = [] + + async def __call__(self, request): + self.calls.append(request) + clock["t"] += 0.3 + raise httpx.ReadTimeout("timeout", request=request) + + async def _advance(d): + clock["t"] += d + + sender = _AsyncTimingSender() + req = httpx.Request( + "GET", + "http://api.test/x", + extensions={ + "timeout": {"connect": 0.3, "read": 0.3, "write": 0.3, "pool": 0.3} + }, + ) + with pytest.raises(httpx.ReadTimeout): + await retry_request_async(req, sender, retries=5, sleep=_advance) + assert len(sender.calls) == 1 + + +# --------------------------------------------------------------------------- +# classification table (parity with JS src/retry.ts) +# --------------------------------------------------------------------------- + + +def test_retryable_status_table_matches_agreed_policy(): + assert _RETRYABLE_STATUS == { + 408: "ambiguous", + 429: "rejected", + 502: "ambiguous", + 503: "ambiguous", + 504: "ambiguous", + } + # 500 is intentionally not retryable. 
+ assert 500 not in _RETRYABLE_STATUS diff --git a/packages/python-sdk/tests/test_volume_connection_config.py b/packages/python-sdk/tests/test_volume_connection_config.py index 421bea8138..26239dd96e 100644 --- a/packages/python-sdk/tests/test_volume_connection_config.py +++ b/packages/python-sdk/tests/test_volume_connection_config.py @@ -48,3 +48,21 @@ def test_volume_api_url_custom_domain(monkeypatch): def test_volume_api_url_custom_domain_in_args(): config = VolumeConnectionConfig(domain="custom.com") assert config.api_url == "https://api.custom.com" + + +def test_volume_get_api_params_propagates_retries(monkeypatch): + monkeypatch.delenv("E2B_MAX_RETRIES", raising=False) + + config = VolumeConnectionConfig(retries=0) + params = config.get_api_params() + + assert params["retries"] == 0 + + +def test_volume_get_api_params_retries_override(monkeypatch): + monkeypatch.delenv("E2B_MAX_RETRIES", raising=False) + + config = VolumeConnectionConfig(retries=3) + params = config.get_api_params(retries=1) + + assert params["retries"] == 1