From 99ab0fcbadcba4fa1b12edcd2240e5437f89d597 Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Tue, 24 Mar 2026 20:42:44 -0400 Subject: [PATCH 1/9] Add snapshot expiration parity Implement Task 1 from PY-166 by exposing snapshot creation timestamps, allowing omitted expiresAt values, and forwarding optional expiration values through sync and async sandbox snapshot creation. Add integration coverage for omitted request bodies, explicit expiration payloads including zero, public created_at access, and stopped sandbox state updates. --- src/vercel/_internal/sandbox/core.py | 9 +- src/vercel/_internal/sandbox/models.py | 2 +- src/vercel/sandbox/sandbox.py | 16 +- src/vercel/sandbox/snapshot.py | 14 +- tests/integration/test_sandbox_sync_async.py | 253 ++++++++++++++++++- 5 files changed, 283 insertions(+), 11 deletions(-) diff --git a/src/vercel/_internal/sandbox/core.py b/src/vercel/_internal/sandbox/core.py index bea5e66..9e306d5 100644 --- a/src/vercel/_internal/sandbox/core.py +++ b/src/vercel/_internal/sandbox/core.py @@ -410,9 +410,14 @@ async def extend_timeout(self, *, sandbox_id: str, duration: int) -> SandboxResp ) return SandboxResponse.model_validate(data) - async def create_snapshot(self, *, sandbox_id: str) -> CreateSnapshotResponse: + async def create_snapshot( + self, *, sandbox_id: str, expiration: int | None = None + ) -> CreateSnapshotResponse: + body = None if expiration is None else JSONBody({"expiration": expiration}) data = await self._request_client.request_json( - "POST", f"/v1/sandboxes/{sandbox_id}/snapshot" + "POST", + f"/v1/sandboxes/{sandbox_id}/snapshot", + body=body, ) return CreateSnapshotResponse.model_validate(data) diff --git a/src/vercel/_internal/sandbox/models.py b/src/vercel/_internal/sandbox/models.py index ef3e325..56dbf07 100644 --- a/src/vercel/_internal/sandbox/models.py +++ b/src/vercel/_internal/sandbox/models.py @@ -193,7 +193,7 @@ class Snapshot(BaseModel): region: str status: Literal["created", "deleted", "failed"] size_bytes: int = Field(alias="sizeBytes") - expires_at: int = Field(alias="expiresAt") + expires_at: int | None = Field(default=None, alias="expiresAt") created_at: int = Field(alias="createdAt") updated_at: int = Field(alias="updatedAt") diff --git a/src/vercel/sandbox/sandbox.py b/src/vercel/sandbox/sandbox.py index 9fb5dea..800744e 100644 --- a/src/vercel/sandbox/sandbox.py +++ b/src/vercel/sandbox/sandbox.py @@ -416,14 +416,17 @@ async def extend_timeout(self, duration: int) -> None: response = await self.client.extend_timeout(sandbox_id=self.sandbox.id, duration=duration) self.sandbox = response.sandbox - async def snapshot(self) -> AsyncSnapshot: + async def snapshot(self, *, expiration: int | None = None) -> AsyncSnapshot: """ Create a snapshot from this currently running sandbox. New sandboxes can then be created from this snapshot. Note: this sandbox will be stopped as part of the snapshot creation process. """ - response = await self.client.create_snapshot(sandbox_id=self.sandbox.id) + response = await self.client.create_snapshot( + sandbox_id=self.sandbox.id, + expiration=expiration, + ) self.sandbox = response.sandbox return AsyncSnapshot(client=self.client, snapshot=response.snapshot) @@ -776,14 +779,19 @@ def extend_timeout(self, duration: int) -> None: ) self.sandbox = response.sandbox - def snapshot(self) -> SnapshotClass: + def snapshot(self, *, expiration: int | None = None) -> SnapshotClass: """ Create a snapshot from this currently running sandbox. New sandboxes can then be created from this snapshot. Note: this sandbox will be stopped as part of the snapshot creation process. """ - response = iter_coroutine(self.client.create_snapshot(sandbox_id=self.sandbox.id)) + response = iter_coroutine( + self.client.create_snapshot( + sandbox_id=self.sandbox.id, + expiration=expiration, + ) + ) self.sandbox = response.sandbox return SnapshotClass(client=self.client, snapshot=response.snapshot) diff --git a/src/vercel/sandbox/snapshot.py b/src/vercel/sandbox/snapshot.py index bdc1350..b1430cb 100644 --- a/src/vercel/sandbox/snapshot.py +++ b/src/vercel/sandbox/snapshot.py @@ -38,7 +38,12 @@ def size_bytes(self) -> int: return self.snapshot.size_bytes @property - def expires_at(self) -> int: + def created_at(self) -> int: + """Timestamp when the snapshot was created.""" + return self.snapshot.created_at + + @property + def expires_at(self) -> int | None: """Timestamp when the snapshot expires.""" return self.snapshot.expires_at @@ -90,7 +95,12 @@ def size_bytes(self) -> int: return self.snapshot.size_bytes @property - def expires_at(self) -> int: + def created_at(self) -> int: + """Timestamp when the snapshot was created.""" + return self.snapshot.created_at + + @property + def expires_at(self) -> int | None: """Timestamp when the snapshot expires.""" return self.snapshot.expires_at diff --git a/tests/integration/test_sandbox_sync_async.py b/tests/integration/test_sandbox_sync_async.py index 3f2e146..44a8b1e 100644 --- a/tests/integration/test_sandbox_sync_async.py +++ b/tests/integration/test_sandbox_sync_async.py @@ -2335,10 +2335,10 @@ class TestSandboxSnapshot: """Test sandbox snapshot operations.""" @respx.mock - def test_create_snapshot_sync( + def test_create_snapshot_sync_without_expiration( self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response ): - """Test synchronous snapshot creation.""" + """Test sync snapshot creation omits the body when expiration is unset.""" from vercel.sandbox import Sandbox sandbox_id = "sbx_test123456" @@ -2377,12 +2377,261 @@ def test_create_snapshot_sync( snapshot = sandbox.snapshot() assert route.called + assert route.calls.last.request.content == b"" assert snapshot.snapshot_id == mock_sandbox_snapshot_response["id"] + assert snapshot.created_at == mock_sandbox_snapshot_response["createdAt"] + assert snapshot.expires_at == mock_sandbox_snapshot_response["expiresAt"] # Sandbox should be stopped after snapshot assert sandbox.status == "stopped" sandbox.client.close() + @respx.mock + def test_create_snapshot_sync_with_expiration( + self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response + ): + """Test sync snapshot creation forwards a non-zero expiration.""" + from vercel.sandbox import Sandbox + + sandbox_id = "sbx_test123456" + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + + stopped_response = dict(mock_sandbox_get_response) + stopped_response["status"] = "stopped" + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": stopped_response, + "snapshot": mock_sandbox_snapshot_response, + }, + ) + ) + + sandbox = Sandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + snapshot = sandbox.snapshot(expiration=3600) + + assert route.called + body = json.loads(route.calls.last.request.content) + assert body == {"expiration": 3600} + assert snapshot.created_at == mock_sandbox_snapshot_response["createdAt"] + assert sandbox.status == "stopped" + + sandbox.client.close() + + @respx.mock + def test_create_snapshot_sync_with_zero_expiration( + self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response + ): + """Test sync snapshot creation preserves explicit zero expiration.""" + from vercel.sandbox import Sandbox + + sandbox_id = "sbx_test123456" + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + + stopped_response = dict(mock_sandbox_get_response) + stopped_response["status"] = "stopped" + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": stopped_response, + "snapshot": mock_sandbox_snapshot_response, + }, + ) + ) + + sandbox = Sandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + sandbox.snapshot(expiration=0) + + assert route.called + body = json.loads(route.calls.last.request.content) + assert body == {"expiration": 0} + assert sandbox.status == "stopped" + + sandbox.client.close() + + @respx.mock + @pytest.mark.asyncio + async def test_create_snapshot_async_without_expiration( + self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response + ): + """Test async snapshot creation omits the body when expiration is unset.""" + from vercel.sandbox import AsyncSandbox + + sandbox_id = "sbx_test123456" + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + + stopped_response = dict(mock_sandbox_get_response) + stopped_response["status"] = "stopped" + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": stopped_response, + "snapshot": mock_sandbox_snapshot_response, + }, + ) + ) + + sandbox = await AsyncSandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + snapshot = await sandbox.snapshot() + + assert route.called + assert route.calls.last.request.content == b"" + assert snapshot.snapshot_id == mock_sandbox_snapshot_response["id"] + assert snapshot.created_at == mock_sandbox_snapshot_response["createdAt"] + assert snapshot.expires_at == mock_sandbox_snapshot_response["expiresAt"] + assert sandbox.status == "stopped" + + await sandbox.client.aclose() + + @respx.mock + @pytest.mark.asyncio + async def test_create_snapshot_async_with_expiration( + self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response + ): + """Test async snapshot creation forwards a non-zero expiration.""" + from vercel.sandbox import AsyncSandbox + + sandbox_id = "sbx_test123456" + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + + stopped_response = dict(mock_sandbox_get_response) + stopped_response["status"] = "stopped" + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": stopped_response, + "snapshot": mock_sandbox_snapshot_response, + }, + ) + ) + + sandbox = await AsyncSandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + snapshot = await sandbox.snapshot(expiration=3600) + + assert route.called + body = json.loads(route.calls.last.request.content) + assert body == {"expiration": 3600} + assert snapshot.created_at == mock_sandbox_snapshot_response["createdAt"] + assert sandbox.status == "stopped" + + await sandbox.client.aclose() + + @respx.mock + @pytest.mark.asyncio + async def test_create_snapshot_async_with_zero_expiration_and_optional_expires_at( + self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response + ): + """Test async snapshot creation keeps zero expiration and optional expiry metadata.""" + from vercel.sandbox import AsyncSandbox + + sandbox_id = "sbx_test123456" + snapshot_response = dict(mock_sandbox_snapshot_response) + snapshot_response.pop("expiresAt") + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + + stopped_response = dict(mock_sandbox_get_response) + stopped_response["status"] = "stopped" + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": stopped_response, + "snapshot": snapshot_response, + }, + ) + ) + + sandbox = await AsyncSandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + snapshot = await sandbox.snapshot(expiration=0) + + assert route.called + body = json.loads(route.calls.last.request.content) + assert body == {"expiration": 0} + assert snapshot.created_at == snapshot_response["createdAt"] + assert snapshot.expires_at is None + assert sandbox.status == "stopped" + + await sandbox.client.aclose() + class TestSandboxExtendTimeout: """Test sandbox timeout extension.""" From ee4dd06aca15c216e7a620f2b30216f6286281d2 Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Tue, 24 Mar 2026 20:55:11 -0400 Subject: [PATCH 2/9] Add snapshot list pagination Add low-level snapshot listing support and typed sync/async page objects for snapshot pagination. Wire Snapshot.list() and AsyncSnapshot.list() through the shared pagination helpers and pin the behavior with integration coverage for filters, page traversal, and terminal-page handling. --- src/vercel/_internal/sandbox/core.py | 21 + src/vercel/_internal/sandbox/models.py | 7 + src/vercel/_internal/sandbox/pagination.py | 50 ++- src/vercel/sandbox/__init__.py | 4 +- src/vercel/sandbox/models.py | 2 + src/vercel/sandbox/page.py | 130 ++++++- src/vercel/sandbox/sandbox.py | 24 +- src/vercel/sandbox/snapshot.py | 103 +++++ tests/integration/test_sandbox_sync_async.py | 381 +++++++++++++++++++ 9 files changed, 700 insertions(+), 22 deletions(-) diff --git a/src/vercel/_internal/sandbox/core.py b/src/vercel/_internal/sandbox/core.py index 9e306d5..ff6b049 100644 --- a/src/vercel/_internal/sandbox/core.py +++ b/src/vercel/_internal/sandbox/core.py @@ -44,6 +44,7 @@ SandboxesResponse, SandboxResponse, SnapshotResponse, + SnapshotsResponse, WriteFile, ) from vercel._internal.sandbox.network_policy import ( @@ -427,6 +428,26 @@ async def get_snapshot(self, *, snapshot_id: str) -> SnapshotResponse: ) return SnapshotResponse.model_validate(data) + async def list_snapshots( + self, + *, + project_id: str | None = None, + limit: int | None = None, + since: int | None = None, + until: int | None = None, + ) -> SnapshotsResponse: + data = await self._request_client.request_json( + "GET", + "/v1/sandboxes/snapshots", + query={ + "project": project_id, + "limit": limit, + "since": since, + "until": until, + }, + ) + return SnapshotsResponse.model_validate(data) + async def delete_snapshot(self, *, snapshot_id: str) -> SnapshotResponse: data = await self._request_client.request_json( "DELETE", f"/v1/sandboxes/snapshots/{snapshot_id}" diff --git a/src/vercel/_internal/sandbox/models.py b/src/vercel/_internal/sandbox/models.py index 56dbf07..94ff510 100644 --- a/src/vercel/_internal/sandbox/models.py +++ b/src/vercel/_internal/sandbox/models.py @@ -198,6 +198,13 @@ class Snapshot(BaseModel): updated_at: int = Field(alias="updatedAt") +class SnapshotsResponse(BaseModel): + """API response containing a list of snapshots.""" + + snapshots: list[Snapshot] + pagination: Pagination + + class SnapshotResponse(BaseModel): """API response containing a snapshot.""" diff --git a/src/vercel/_internal/sandbox/pagination.py b/src/vercel/_internal/sandbox/pagination.py index d9bb0cd..4205b68 100644 --- a/src/vercel/_internal/sandbox/pagination.py +++ b/src/vercel/_internal/sandbox/pagination.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass +from datetime import datetime, timezone from vercel._internal.sandbox.models import Pagination @@ -32,4 +33,51 @@ def next_sandbox_page_info(pagination: Pagination) -> SandboxPageInfo | None: return SandboxPageInfo(until=pagination.next) -__all__ = ["SandboxListParams", "SandboxPageInfo", "next_sandbox_page_info"] +@dataclass(frozen=True, slots=True) +class SnapshotPageInfo: + until: int + + +@dataclass(frozen=True, slots=True) +class SnapshotListParams: + project_id: str | None = None + limit: int | None = None + since: int | None = None + until: int | None = None + + def with_until(self, until: int) -> SnapshotListParams: + return SnapshotListParams( + project_id=self.project_id, + limit=self.limit, + since=self.since, + until=until, + ) + + +def next_snapshot_page_info(pagination: Pagination) -> SnapshotPageInfo | None: + if pagination.next is None: + return None + return SnapshotPageInfo(until=pagination.next) + + +def normalize_list_timestamp(value: datetime | int | None) -> int | None: + if value is None: + return None + if isinstance(value, int): + return value + if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=timezone.utc) + return int(value.timestamp() * 1000) + raise TypeError("List timestamps must be datetime or integer milliseconds") + + +__all__ = [ + "SandboxListParams", + "SandboxPageInfo", + "SnapshotListParams", + "SnapshotPageInfo", + "next_sandbox_page_info", + "next_snapshot_page_info", + "normalize_list_timestamp", +] diff --git a/src/vercel/sandbox/__init__.py b/src/vercel/sandbox/__init__.py index 366d50f..76cad35 100644 --- a/src/vercel/sandbox/__init__.py +++ b/src/vercel/sandbox/__init__.py @@ -16,7 +16,7 @@ from .command import AsyncCommand, AsyncCommandFinished, Command, CommandFinished from .models import GitSource, SnapshotSource, Source, TarballSource -from .page import AsyncSandboxPage, SandboxPage +from .page import AsyncSandboxPage, AsyncSnapshotPage, SandboxPage, SnapshotPage from .sandbox import AsyncSandbox, Sandbox from .snapshot import AsyncSnapshot, Snapshot @@ -29,9 +29,11 @@ "SandboxServerError", "AsyncSandbox", "AsyncSandboxPage", + "AsyncSnapshotPage", "AsyncSnapshot", "Sandbox", "SandboxPage", + "SnapshotPage", "Snapshot", "AsyncCommand", "AsyncCommandFinished", diff --git a/src/vercel/sandbox/models.py b/src/vercel/sandbox/models.py index d9496c8..a539b0c 100644 --- a/src/vercel/sandbox/models.py +++ b/src/vercel/sandbox/models.py @@ -16,6 +16,7 @@ Snapshot, SnapshotResponse, SnapshotSource, + SnapshotsResponse, Source, TarballSource, WriteFile, @@ -37,6 +38,7 @@ "SandboxRoute", "SandboxesResponse", "Snapshot", + "SnapshotsResponse", "SnapshotResponse", "SnapshotSource", "Source", diff --git a/src/vercel/sandbox/page.py b/src/vercel/sandbox/page.py index 5dd61cf..37d673e 100644 --- a/src/vercel/sandbox/page.py +++ b/src/vercel/sandbox/page.py @@ -5,8 +5,17 @@ from typing import Any from vercel._internal.pagination import PageController -from vercel._internal.sandbox.models import Pagination, Sandbox as SandboxModel -from vercel._internal.sandbox.pagination import SandboxPageInfo, next_sandbox_page_info +from vercel._internal.sandbox.models import ( + Pagination, + Sandbox as SandboxModel, + Snapshot as SnapshotModel, +) +from vercel._internal.sandbox.pagination import ( + SandboxPageInfo, + SnapshotPageInfo, + next_sandbox_page_info, + next_snapshot_page_info, +) @dataclass(slots=True) @@ -118,9 +127,126 @@ async def iter_items(self) -> AsyncIterator[SandboxModel]: yield item +@dataclass(slots=True) +class SnapshotPage: + snapshots: list[SnapshotModel] + pagination: Pagination + _controller: PageController[SnapshotPage, SnapshotModel, SnapshotPageInfo] = field( + init=False, + repr=False, + ) + + @classmethod + def create( + cls, + *, + snapshots: list[SnapshotModel], + pagination: Pagination, + fetch_next_page: Callable[[SnapshotPageInfo], Awaitable[SnapshotPage]], + ) -> SnapshotPage: + page = cls(snapshots=list(snapshots), pagination=pagination) + page._controller = PageController( + get_items=lambda current_page: current_page.snapshots, + get_next_page_info=lambda current_page: next_snapshot_page_info( + current_page.pagination + ), + fetch_next_page=fetch_next_page, + ) + return page + + def has_next_page(self) -> bool: + return self._controller.has_next_page(self) + + def next_page_info(self) -> SnapshotPageInfo | None: + return self._controller.next_page_info(self) + + def get_next_page(self) -> SnapshotPage | None: + return self._controller.get_next_page_sync(self) + + def iter_pages(self) -> Iterator[SnapshotPage]: + return self._controller.iter_pages_sync(self) + + def iter_items(self) -> Iterator[SnapshotModel]: + return self._controller.iter_items_sync(self) + + +@dataclass(slots=True) +class AsyncSnapshotPage: + snapshots: list[SnapshotModel] + pagination: Pagination + _controller: PageController[AsyncSnapshotPage, SnapshotModel, SnapshotPageInfo] = field( + init=False, + repr=False, + ) + + @classmethod + def create( + cls, + *, + snapshots: list[SnapshotModel], + pagination: Pagination, + fetch_next_page: Callable[[SnapshotPageInfo], Awaitable[AsyncSnapshotPage]], + ) -> AsyncSnapshotPage: + page = cls(snapshots=list(snapshots), pagination=pagination) + page._controller = PageController( + get_items=lambda current_page: current_page.snapshots, + get_next_page_info=lambda current_page: next_snapshot_page_info( + current_page.pagination + ), + fetch_next_page=fetch_next_page, + ) + return page + + def has_next_page(self) -> bool: + return self._controller.has_next_page(self) + + def next_page_info(self) -> SnapshotPageInfo | None: + return self._controller.next_page_info(self) + + async def get_next_page(self) -> AsyncSnapshotPage | None: + return await self._controller.get_next_page(self) + + def iter_pages(self) -> AsyncIterator[AsyncSnapshotPage]: + return self._controller.iter_pages(self) + + def iter_items(self) -> AsyncIterator[SnapshotModel]: + return self._controller.iter_items(self) + + +@dataclass(slots=True) +class AsyncSnapshotPager: + _fetch_first_page: Callable[[], Awaitable[AsyncSnapshotPage]] + _first_page: AsyncSnapshotPage | None = field(init=False, default=None, repr=False) + + async def _get_first_page(self) -> AsyncSnapshotPage: + if self._first_page is None: + self._first_page = await self._fetch_first_page() + return self._first_page + + def __await__(self) -> Generator[Any, None, AsyncSnapshotPage]: + return self._get_first_page().__await__() + + def __aiter__(self) -> AsyncIterator[SnapshotModel]: + return self.iter_items() + + async def iter_pages(self) -> AsyncIterator[AsyncSnapshotPage]: + first_page = await self._get_first_page() + async for page in first_page.iter_pages(): + yield page + + async def iter_items(self) -> AsyncIterator[SnapshotModel]: + first_page = await self._get_first_page() + async for item in first_page.iter_items(): + yield item + + __all__ = [ + "AsyncSnapshotPager", + "AsyncSnapshotPage", "AsyncSandboxPager", "AsyncSandboxPage", "SandboxPage", + "SnapshotPage", "SandboxPageInfo", + "SnapshotPageInfo", ] diff --git a/src/vercel/sandbox/sandbox.py b/src/vercel/sandbox/sandbox.py index 800744e..aa1aebb 100644 --- a/src/vercel/sandbox/sandbox.py +++ b/src/vercel/sandbox/sandbox.py @@ -3,7 +3,7 @@ import builtins import time from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import datetime from typing import Any from vercel._internal.iter_coroutine import iter_coroutine @@ -19,7 +19,7 @@ ApiNetworkPolicy, NetworkPolicy, ) -from vercel._internal.sandbox.pagination import SandboxListParams +from vercel._internal.sandbox.pagination import SandboxListParams, normalize_list_timestamp from ..oidc import Credentials, get_credentials from .command import ( @@ -104,18 +104,6 @@ async def fetch_next_page(page_info) -> SandboxPage: ) -def _normalize_list_timestamp(value: datetime | int | None) -> int | None: - if value is None: - return None - if isinstance(value, int): - return value - if isinstance(value, datetime): - if value.tzinfo is None: - value = value.replace(tzinfo=timezone.utc) - return int(value.timestamp() * 1000) - raise TypeError("Sandbox list timestamps must be datetime or integer milliseconds") - - @dataclass class AsyncSandbox: client: AsyncSandboxOpsClient @@ -256,8 +244,8 @@ def list( params = SandboxListParams( project_id=creds.project_id, limit=limit, - since=_normalize_list_timestamp(since), - until=_normalize_list_timestamp(until), + since=normalize_list_timestamp(since), + until=normalize_list_timestamp(until), ) return AsyncSandboxPager( _fetch_first_page=lambda: _build_async_sandbox_page(creds=creds, params=params) @@ -617,8 +605,8 @@ def list( params = SandboxListParams( project_id=creds.project_id, limit=limit, - since=_normalize_list_timestamp(since), - until=_normalize_list_timestamp(until), + since=normalize_list_timestamp(since), + until=normalize_list_timestamp(until), ) return iter_coroutine(_build_sync_sandbox_page(creds=creds, params=params)) diff --git a/src/vercel/sandbox/snapshot.py b/src/vercel/sandbox/snapshot.py index b1430cb..8b82d09 100644 --- a/src/vercel/sandbox/snapshot.py +++ b/src/vercel/sandbox/snapshot.py @@ -1,13 +1,74 @@ from __future__ import annotations from dataclasses import dataclass +from datetime import datetime from typing import Literal from vercel._internal.iter_coroutine import iter_coroutine from vercel._internal.sandbox import AsyncSandboxOpsClient, SyncSandboxOpsClient from vercel._internal.sandbox.models import Snapshot as SnapshotModel +from vercel._internal.sandbox.pagination import SnapshotListParams, normalize_list_timestamp from ..oidc import Credentials, get_credentials +from .page import AsyncSnapshotPage, AsyncSnapshotPager, SnapshotPage + + +async def _build_async_snapshot_page( + *, + creds: Credentials, + params: SnapshotListParams, +) -> AsyncSnapshotPage: + client = AsyncSandboxOpsClient(team_id=creds.team_id, token=creds.token) + try: + response = await client.list_snapshots( + project_id=params.project_id, + limit=params.limit, + since=params.since, + until=params.until, + ) + finally: + await client.aclose() + + async def fetch_next_page(page_info) -> AsyncSnapshotPage: + return await _build_async_snapshot_page( + creds=creds, + params=params.with_until(page_info.until), + ) + + return AsyncSnapshotPage.create( + snapshots=response.snapshots, + pagination=response.pagination, + fetch_next_page=fetch_next_page, + ) + + +async def _build_sync_snapshot_page( + *, + creds: Credentials, + params: SnapshotListParams, +) -> SnapshotPage: + client = SyncSandboxOpsClient(team_id=creds.team_id, token=creds.token) + try: + response = await client.list_snapshots( + project_id=params.project_id, + limit=params.limit, + since=params.since, + until=params.until, + ) + finally: + client.close() + + async def fetch_next_page(page_info) -> SnapshotPage: + return await _build_sync_snapshot_page( + creds=creds, + params=params.with_until(page_info.until), + ) + + return SnapshotPage.create( + snapshots=response.snapshots, + pagination=response.pagination, + fetch_next_page=fetch_next_page, + ) @dataclass @@ -61,6 +122,28 @@ async def get( resp = await client.get_snapshot(snapshot_id=snapshot_id) return AsyncSnapshot(client=client, snapshot=resp.snapshot) + @staticmethod + def list( + *, + limit: int | None = None, + since: datetime | int | None = None, + until: datetime | int | None = None, + token: str | None = None, + project_id: str | None = None, + team_id: str | None = None, + ) -> AsyncSnapshotPager: + """List snapshots and return the first page.""" + creds: Credentials = get_credentials(token=token, project_id=project_id, team_id=team_id) + params = SnapshotListParams( + project_id=creds.project_id, + limit=limit, + since=normalize_list_timestamp(since), + until=normalize_list_timestamp(until), + ) + return AsyncSnapshotPager( + _fetch_first_page=lambda: _build_async_snapshot_page(creds=creds, params=params) + ) + async def delete(self) -> None: """Delete this snapshot.""" resp = await self.client.delete_snapshot(snapshot_id=self.snapshot.id) @@ -118,6 +201,26 @@ def get( resp = iter_coroutine(client.get_snapshot(snapshot_id=snapshot_id)) return Snapshot(client=client, snapshot=resp.snapshot) + @staticmethod + def list( + *, + limit: int | None = None, + since: datetime | int | None = None, + until: datetime | int | None = None, + token: str | None = None, + project_id: str | None = None, + team_id: str | None = None, + ) -> SnapshotPage: + """List snapshots and return the first page.""" + creds: Credentials = get_credentials(token=token, project_id=project_id, team_id=team_id) + params = SnapshotListParams( + project_id=creds.project_id, + limit=limit, + since=normalize_list_timestamp(since), + until=normalize_list_timestamp(until), + ) + return iter_coroutine(_build_sync_snapshot_page(creds=creds, params=params)) + def delete(self) -> None: """Delete this snapshot.""" resp = iter_coroutine(self.client.delete_snapshot(snapshot_id=self.snapshot.id)) diff --git a/tests/integration/test_sandbox_sync_async.py b/tests/integration/test_sandbox_sync_async.py index 44a8b1e..06565b5 100644 --- a/tests/integration/test_sandbox_sync_async.py +++ b/tests/integration/test_sandbox_sync_async.py @@ -38,6 +38,19 @@ def _sandbox_with_id( return sandbox +def _snapshot_with_id( + base_snapshot: dict, + snapshot_id: str, + *, + created_at: int, +) -> dict: + snapshot = dict(base_snapshot) + snapshot["id"] = snapshot_id + snapshot["createdAt"] = created_at + snapshot["updatedAt"] = created_at + return snapshot + + async def _collect_async_pages(page) -> list: return [current_page async for current_page in page.iter_pages()] @@ -1070,6 +1083,374 @@ def handler(request: httpx.Request) -> httpx.Response: assert [sandbox.id for sandbox in items] == ["sbx_async_terminal"] assert requests == [{"teamId": "team_test123", "project": "prj_test123"}] + +class TestSnapshotList: + """Test snapshot listing operations.""" + + @respx.mock + def test_list_snapshot_sync_serializes_filters_and_iterates_pages( + self, mock_env_clear, mock_sandbox_snapshot_response + ): + from vercel._internal.sandbox.models import Snapshot as SnapshotModel + from vercel.sandbox import Snapshot + + project = "snapshot-project" + limit = 2 + since = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc) + until = datetime(2024, 1, 15, 12, 30, tzinfo=timezone.utc) + expected_since = str(int(since.timestamp() * 1000)) + expected_until = str(int(until.timestamp() * 1000)) + next_until = "1705320000000" + + first_page = { + "snapshots": [ + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_list_1", + created_at=1705320600000, + ), + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_list_2", + created_at=1705320300000, + ), + ], + "pagination": { + "count": 3, + "next": int(next_until), + "prev": None, + }, + } + second_page = { + "snapshots": [ + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_list_3", + created_at=1705320000000, + ), + ], + "pagination": { + "count": 3, + "next": None, + "prev": 1705320600000, + }, + } + requests: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + requests.append(params) + if params.get("until") == next_until: + return httpx.Response(200, json=second_page) + return httpx.Response(200, json=first_page) + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/snapshots").mock(side_effect=handler) + + page = Snapshot.list( + token="test_token", + team_id="team_test123", + project_id=project, + limit=limit, + since=since, + until=until, + ) + + assert requests == [ + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": expected_since, + "until": expected_until, + } + ] + assert isinstance(page.snapshots[0], SnapshotModel) + assert page.snapshots[0].id == "snap_list_1" + assert page.snapshots[0].created_at == 1705320600000 + assert page.snapshots[0].expires_at == mock_sandbox_snapshot_response["expiresAt"] + assert page.pagination.count == 3 + assert page.next_page_info() is not None + assert page.next_page_info().until == int(next_until) + assert [ + [snapshot.id for snapshot in current_page.snapshots] + for current_page in page.iter_pages() + ] == [ + ["snap_list_1", "snap_list_2"], + ["snap_list_3"], + ] + assert requests == [ + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": expected_since, + "until": expected_until, + }, + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": expected_since, + "until": next_until, + }, + ] + + requests.clear() + items_page = Snapshot.list( + token="test_token", + team_id="team_test123", + project_id=project, + limit=limit, + since=since, + until=until, + ) + assert [snapshot.id for snapshot in items_page.iter_items()] == [ + "snap_list_1", + "snap_list_2", + "snap_list_3", + ] + assert requests == [ + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": expected_since, + "until": expected_until, + }, + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": expected_since, + "until": next_until, + }, + ] + + @respx.mock + @pytest.mark.asyncio + async def test_list_snapshot_async_serializes_integer_filters_and_iterates_pages( + self, mock_env_clear, mock_sandbox_snapshot_response + ): + from vercel._internal.sandbox.models import Snapshot as SnapshotModel + from vercel.sandbox import AsyncSnapshot + + project = "snapshot-project" + limit = 2 + since = 1705321200000 + until = 1705323000000 + next_until = "1705319400000" + + first_page = { + "snapshots": [ + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_async_1", + created_at=1705320600000, + ), + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_async_2", + created_at=1705320000000, + ), + ], + "pagination": { + "count": 3, + "next": int(next_until), + "prev": None, + }, + } + second_page = { + "snapshots": [ + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_async_3", + created_at=1705319400000, + ), + ], + "pagination": { + "count": 3, + "next": None, + "prev": 1705320600000, + }, + } + requests: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + requests.append(params) + if params.get("until") == next_until: + return httpx.Response(200, json=second_page) + return httpx.Response(200, json=first_page) + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/snapshots").mock(side_effect=handler) + + page = await AsyncSnapshot.list( + token="test_token", + team_id="team_test123", + project_id=project, + limit=limit, + since=since, + until=until, + ) + + assert requests == [ + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": str(since), + "until": str(until), + } + ] + assert isinstance(page.snapshots[0], SnapshotModel) + assert page.snapshots[0].id == "snap_async_1" + assert page.snapshots[0].created_at == 1705320600000 + assert page.snapshots[0].expires_at == mock_sandbox_snapshot_response["expiresAt"] + assert page.pagination.count == 3 + assert page.next_page_info() is not None + assert page.next_page_info().until == int(next_until) + + pages = await _collect_async_pages(page) + assert [[snapshot.id for snapshot in current_page.snapshots] for current_page in pages] == [ + ["snap_async_1", "snap_async_2"], + ["snap_async_3"], + ] + + items_page = await AsyncSnapshot.list( + token="test_token", + team_id="team_test123", + project_id=project, + limit=limit, + since=since, + until=until, + ) + items = await _collect_async_items(items_page) + assert [snapshot.id for snapshot in items] == [ + "snap_async_1", + "snap_async_2", + "snap_async_3", + ] + assert requests == [ + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": str(since), + "until": str(until), + }, + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": str(since), + "until": next_until, + }, + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": str(since), + "until": str(until), + }, + { + "teamId": "team_test123", + "project": project, + "limit": str(limit), + "since": str(since), + "until": next_until, + }, + ] + + @respx.mock + def test_list_snapshot_sync_single_page_does_not_fetch_more( + self, mock_env_clear, mock_sandbox_snapshot_response + ): + from vercel.sandbox import Snapshot + + response = { + "snapshots": [ + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_single_page", + created_at=1705320600000, + ), + ], + "pagination": { + "count": 1, + "next": None, + "prev": None, + }, + } + requests: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + requests.append(dict(request.url.params)) + return httpx.Response(200, json=response) + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/snapshots").mock(side_effect=handler) + + page = Snapshot.list( + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + assert page.has_next_page() is False + assert page.next_page_info() is None + assert page.get_next_page() is None + assert [ + [snapshot.id for snapshot in current_page.snapshots] + for current_page in page.iter_pages() + ] == [["snap_single_page"]] + assert [snapshot.id for snapshot in page.iter_items()] == ["snap_single_page"] + assert requests == [{"teamId": "team_test123", "project": "prj_test123"}] + + @respx.mock + @pytest.mark.asyncio + async def test_list_snapshot_async_single_page_does_not_fetch_more( + self, mock_env_clear, mock_sandbox_snapshot_response + ): + from vercel.sandbox import AsyncSnapshot + + response = { + "snapshots": [ + _snapshot_with_id( + mock_sandbox_snapshot_response, + "snap_async_single", + created_at=1705320600000, + ), + ], + "pagination": { + "count": 1, + "next": None, + "prev": None, + }, + } + requests: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + requests.append(dict(request.url.params)) + return httpx.Response(200, json=response) + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/snapshots").mock(side_effect=handler) + + page = await AsyncSnapshot.list( + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + assert page.has_next_page() is False + assert page.next_page_info() is None + assert await page.get_next_page() is None + pages = await _collect_async_pages(page) + assert [[snapshot.id for snapshot in current_page.snapshots] for current_page in pages] == [ + ["snap_async_single"], + ] + items = await _collect_async_items(page) + assert [snapshot.id for snapshot in items] == ["snap_async_single"] + assert requests == [{"teamId": "team_test123", "project": "prj_test123"}] + @respx.mock def test_get_sandbox_sync_exposes_mode_network_policy( self, mock_env_clear, mock_sandbox_get_response_with_mode_network_policy From 02140eb38e7b47a1837f807692264dce599ba06b Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Wed, 25 Mar 2026 06:24:39 -0400 Subject: [PATCH 3/9] Expand snapshot example coverage Extend the snapshot example to cover expiration controls, listing, page and item iteration, and cleanup in both sync and async flows. Add example-backed tests for the new snapshot behavior, simplify list integration assertions to public fields, and tighten typing in the sandbox request and error helpers. --- examples/sandbox_11_snapshots.py | 140 +++++-- src/vercel/_internal/sandbox/core.py | 35 +- src/vercel/_internal/sandbox/errors.py | 8 +- tests/integration/test_sandbox_sync_async.py | 38 +- tests/test_examples.py | 382 +++++++++++++++++++ 5 files changed, 552 insertions(+), 51 deletions(-) diff --git a/examples/sandbox_11_snapshots.py b/examples/sandbox_11_snapshots.py index 952a070..da2b22d 100644 --- a/examples/sandbox_11_snapshots.py +++ b/examples/sandbox_11_snapshots.py @@ -28,6 +28,8 @@ async def async_demo() -> None: print("ASYNC SNAPSHOT EXAMPLE") print("=" * 60) + async_snapshot_ids: list[str] = [] + # Step 1: Create a sandbox and set it up print("\n[1] Creating initial sandbox...") sandbox1 = await AsyncSandbox.create(timeout=120_000) @@ -46,11 +48,14 @@ async def async_demo() -> None: config = await sandbox1.read_file("config.json") print(f" Created config.json: {config.decode()}") - # Step 2: Create a snapshot (this STOPS the sandbox) - print("\n[3] Creating snapshot...") - snapshot = await sandbox1.snapshot() + # Step 2: Create a snapshot with an explicit expiration (this STOPS the sandbox) + print("\n[3] Creating snapshot with expiration=3600...") + snapshot = await sandbox1.snapshot(expiration=3600) + async_snapshot_ids.append(snapshot.snapshot_id) print(f" Snapshot ID: {snapshot.snapshot_id}") print(f" Status: {snapshot.status}") + print(f" Created At: {snapshot.created_at}") + print(f" Expires At: {snapshot.expires_at}") print(f" Sandbox status after snapshot: {sandbox1.status}") finally: @@ -76,20 +81,61 @@ async def async_demo() -> None: assert config2 == b'{"version": "1.0", "env": "async"}', "config.json mismatch!" assert users2 == b"alice\nbob\ncharlie", "users.txt mismatch!" - print("\n✓ Async assertions passed!") + print("\n✓ Async restore assertions passed!") + + # Create a second snapshot with expiration=0 to preserve it indefinitely. + print("\n[6] Creating second snapshot with expiration=0...") + persistent_snapshot = await sandbox2.snapshot(expiration=0) + async_snapshot_ids.append(persistent_snapshot.snapshot_id) + print(f" Snapshot ID: {persistent_snapshot.snapshot_id}") + print(f" Created At: {persistent_snapshot.created_at}") + print(f" Expires At: {persistent_snapshot.expires_at}") finally: - await sandbox2.stop() await sandbox2.client.aclose() - # Step 5: Retrieve and delete the snapshot - print("\n[6] Retrieving and deleting snapshot...") - fetched = await AsyncSnapshot.get(snapshot_id=snapshot.snapshot_id) - try: - await fetched.delete() - print(f" Deleted snapshot, status: {fetched.status}") - finally: - await fetched.client.aclose() + # Step 5: List snapshots and confirm the new snapshots are discoverable. + print("\n[7] Listing recent snapshots...") + since = snapshot.created_at - 1 + pager = AsyncSnapshot.list(limit=10, since=since) + first_page = await pager + first_page_ids = [listed.id for listed in first_page.snapshots] + print(f" First page snapshot IDs: {first_page_ids}") + + paged_ids: list[list[str]] = [] + found_ids: set[str] = set() + async for page in pager.iter_pages(): + page_ids = [listed.id for listed in page.snapshots] + paged_ids.append(page_ids) + found_ids.update(page_ids) + if all(snapshot_id in found_ids for snapshot_id in async_snapshot_ids): + break + + print(f" Visited pages: {paged_ids}") + assert all(snapshot_id in found_ids for snapshot_id in async_snapshot_ids), ( + "Did not find all async snapshots in AsyncSnapshot.list() results" + ) + + item_ids: list[str] = [] + async for listed in AsyncSnapshot.list(limit=10, since=since): + item_ids.append(listed.id) + if all(snapshot_id in item_ids for snapshot_id in async_snapshot_ids): + break + + print(f" Iterated snapshot IDs: {item_ids}") + assert all(snapshot_id in item_ids for snapshot_id in async_snapshot_ids), ( + "Did not find all async snapshots while iterating AsyncSnapshot.list() items" + ) + + # Step 6: Retrieve and delete the snapshots + print("\n[8] Retrieving and deleting snapshots...") + for snapshot_id in async_snapshot_ids: + fetched = await AsyncSnapshot.get(snapshot_id=snapshot_id) + try: + await fetched.delete() + print(f" Deleted snapshot {snapshot_id}, status: {fetched.status}") + finally: + await fetched.client.aclose() def sync_demo() -> None: @@ -97,6 +143,8 @@ def sync_demo() -> None: print("SYNC SNAPSHOT EXAMPLE") print("=" * 60) + sync_snapshot_ids: list[str] = [] + # Step 1: Create a sandbox and set it up print("\n[1] Creating initial sandbox...") sandbox1 = Sandbox.create(timeout=120_000) @@ -115,11 +163,14 @@ def sync_demo() -> None: config = sandbox1.read_file("config.json") print(f" Created config.json: {config.decode()}") - # Step 2: Create a snapshot (this STOPS the sandbox) - print("\n[3] Creating snapshot...") - snapshot = sandbox1.snapshot() + # Step 2: Create a snapshot with an explicit expiration (this STOPS the sandbox) + print("\n[3] Creating snapshot with expiration=3600...") + snapshot = sandbox1.snapshot(expiration=3600) + sync_snapshot_ids.append(snapshot.snapshot_id) print(f" Snapshot ID: {snapshot.snapshot_id}") print(f" Status: {snapshot.status}") + print(f" Created At: {snapshot.created_at}") + print(f" Expires At: {snapshot.expires_at}") print(f" Sandbox status after snapshot: {sandbox1.status}") finally: @@ -145,20 +196,57 @@ def sync_demo() -> None: assert config2 == b'{"version": "1.0", "env": "sync"}', "config.json mismatch!" assert users2 == b"alice\nbob\ncharlie", "users.txt mismatch!" - print("\n✓ Sync assertions passed!") + print("\n✓ Sync restore assertions passed!") + + print("\n[6] Creating second snapshot with expiration=0...") + persistent_snapshot = sandbox2.snapshot(expiration=0) + sync_snapshot_ids.append(persistent_snapshot.snapshot_id) + print(f" Snapshot ID: {persistent_snapshot.snapshot_id}") + print(f" Created At: {persistent_snapshot.created_at}") + print(f" Expires At: {persistent_snapshot.expires_at}") finally: - sandbox2.stop() sandbox2.client.close() - # Step 5: Retrieve and delete the snapshot - print("\n[6] Retrieving and deleting snapshot...") - fetched = Snapshot.get(snapshot_id=snapshot.snapshot_id) - try: - fetched.delete() - print(f" Deleted snapshot, status: {fetched.status}") - finally: - fetched.client.close() + print("\n[7] Listing recent snapshots...") + since = snapshot.created_at - 1 + first_page = Snapshot.list(limit=10, since=since) + first_page_ids = [listed.id for listed in first_page.snapshots] + print(f" First page snapshot IDs: {first_page_ids}") + + paged_ids: list[list[str]] = [] + found_ids: set[str] = set() + for page in first_page.iter_pages(): + page_ids = [listed.id for listed in page.snapshots] + paged_ids.append(page_ids) + found_ids.update(page_ids) + if all(snapshot_id in found_ids for snapshot_id in sync_snapshot_ids): + break + + print(f" Visited pages: {paged_ids}") + assert all(snapshot_id in found_ids for snapshot_id in sync_snapshot_ids), ( + "Did not find all sync snapshots in Snapshot.list() pages" + ) + + item_ids: list[str] = [] + for listed in Snapshot.list(limit=10, since=since).iter_items(): + item_ids.append(listed.id) + if all(snapshot_id in item_ids for snapshot_id in sync_snapshot_ids): + break + + print(f" Iterated snapshot IDs: {item_ids}") + assert all(snapshot_id in item_ids for snapshot_id in sync_snapshot_ids), ( + "Did not find all sync snapshots while iterating Snapshot.list() items" + ) + + print("\n[8] Retrieving and deleting snapshots...") + for snapshot_id in sync_snapshot_ids: + fetched = Snapshot.get(snapshot_id=snapshot_id) + try: + fetched.delete() + print(f" Deleted snapshot {snapshot_id}, status: {fetched.status}") + finally: + fetched.client.close() if __name__ == "__main__": diff --git a/src/vercel/_internal/sandbox/core.py b/src/vercel/_internal/sandbox/core.py index ff6b049..83c8a9e 100644 --- a/src/vercel/_internal/sandbox/core.py +++ b/src/vercel/_internal/sandbox/core.py @@ -16,7 +16,7 @@ import tarfile from collections.abc import AsyncGenerator, Generator from importlib.metadata import version as _pkg_version -from typing import Any +from typing import Any, TypeAlias, cast import httpx @@ -62,6 +62,10 @@ f"vercel/sandbox/{VERSION} (Python/{sys.version}; {PLATFORM.system}/{PLATFORM.machine})" ) +JSONScalar: TypeAlias = str | int | float | bool | None +JSONValue: TypeAlias = JSONScalar | dict[str, "JSONValue"] | list["JSONValue"] +RequestQuery: TypeAlias = dict[str, str | int | float | bool | None] + # --------------------------------------------------------------------------- # Request client — error handling + request_json convenience @@ -84,11 +88,11 @@ async def request( path: str, *, headers: dict[str, str] | None = None, - query: dict[str, Any] | None = None, + query: RequestQuery | None = None, body: JSONBody | BytesBody | None = None, stream: bool = False, ) -> httpx.Response: - params: dict[str, Any] | None = None + params: RequestQuery | None = None if query: params = {k: v for k, v in query.items() if v is not None} @@ -114,7 +118,7 @@ async def request( error_body = None # Parse a helpful error message - parsed: Any | None = None + parsed: JSONValue | None = None message = f"HTTP {response.status_code}" if error_body: try: @@ -149,19 +153,30 @@ async def request_json( self, method: str, path: str, - **kwargs: Any, - ) -> Any: - headers = kwargs.pop("headers", None) or {} + *, + headers: dict[str, str] | None = None, + query: RequestQuery | None = None, + body: JSONBody | BytesBody | None = None, + stream: bool = False, + ) -> JSONValue: + headers = dict(headers or {}) headers.setdefault("content-type", "application/json") - r = await self.request(method, path, headers=headers, **kwargs) - return r.json() + r = await self.request( + method, + path, + headers=headers, + query=query, + body=body, + stream=stream, + ) + return cast(JSONValue, r.json()) def _build_sandbox_error( response: httpx.Response, message: str, *, - data: Any | None = None, + data: JSONValue | None = None, ) -> APIError: status_code = response.status_code if status_code == 401: diff --git a/src/vercel/_internal/sandbox/errors.py b/src/vercel/_internal/sandbox/errors.py index 3f4e108..62bb96d 100644 --- a/src/vercel/_internal/sandbox/errors.py +++ b/src/vercel/_internal/sandbox/errors.py @@ -2,8 +2,6 @@ from __future__ import annotations -from typing import Any - import httpx @@ -23,11 +21,11 @@ class SandboxError(Exception): class APIError(SandboxError): - def __init__(self, response: httpx.Response, message: str, *, data: Any | None = None): + def __init__(self, response: httpx.Response, message: str, *, data: object | None = None): super().__init__(message) self.response = response self.status_code = response.status_code - self.data = data + self.data: object | None = data class SandboxAuthError(APIError): @@ -44,7 +42,7 @@ def __init__( response: httpx.Response, message: str, *, - data: Any | None = None, + data: object | None = None, retry_after: str | int | None = None, ) -> None: super().__init__(response, message, data=data) diff --git a/tests/integration/test_sandbox_sync_async.py b/tests/integration/test_sandbox_sync_async.py index 06565b5..fee6b8e 100644 --- a/tests/integration/test_sandbox_sync_async.py +++ b/tests/integration/test_sandbox_sync_async.py @@ -1091,7 +1091,6 @@ class TestSnapshotList: def test_list_snapshot_sync_serializes_filters_and_iterates_pages( self, mock_env_clear, mock_sandbox_snapshot_response ): - from vercel._internal.sandbox.models import Snapshot as SnapshotModel from vercel.sandbox import Snapshot project = "snapshot-project" @@ -1164,10 +1163,20 @@ def handler(request: httpx.Request) -> httpx.Response: "until": expected_until, } ] - assert isinstance(page.snapshots[0], SnapshotModel) - assert page.snapshots[0].id == "snap_list_1" - assert page.snapshots[0].created_at == 1705320600000 - assert page.snapshots[0].expires_at == mock_sandbox_snapshot_response["expiresAt"] + assert [ + (snapshot.id, snapshot.created_at, snapshot.expires_at) for snapshot in page.snapshots + ] == [ + ( + "snap_list_1", + 1705320600000, + mock_sandbox_snapshot_response["expiresAt"], + ), + ( + "snap_list_2", + 1705320300000, + mock_sandbox_snapshot_response["expiresAt"], + ), + ] assert page.pagination.count == 3 assert page.next_page_info() is not None assert page.next_page_info().until == int(next_until) @@ -1231,7 +1240,6 @@ def handler(request: httpx.Request) -> httpx.Response: async def test_list_snapshot_async_serializes_integer_filters_and_iterates_pages( self, mock_env_clear, mock_sandbox_snapshot_response ): - from vercel._internal.sandbox.models import Snapshot as SnapshotModel from vercel.sandbox import AsyncSnapshot project = "snapshot-project" @@ -1302,10 +1310,20 @@ def handler(request: httpx.Request) -> httpx.Response: "until": str(until), } ] - assert isinstance(page.snapshots[0], SnapshotModel) - assert page.snapshots[0].id == "snap_async_1" - assert page.snapshots[0].created_at == 1705320600000 - assert page.snapshots[0].expires_at == mock_sandbox_snapshot_response["expiresAt"] + assert [ + (snapshot.id, snapshot.created_at, snapshot.expires_at) for snapshot in page.snapshots + ] == [ + ( + "snap_async_1", + 1705320600000, + mock_sandbox_snapshot_response["expiresAt"], + ), + ( + "snap_async_2", + 1705320000000, + mock_sandbox_snapshot_response["expiresAt"], + ), + ] assert page.pagination.count == 3 assert page.next_page_info() is not None assert page.next_page_info().until == int(next_until) diff --git a/tests/test_examples.py b/tests/test_examples.py index 4f3549e..d195dc3 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -1,9 +1,13 @@ from __future__ import annotations +import asyncio +import importlib.util import os import subprocess import sys +from dataclasses import dataclass from pathlib import Path +from typing import TypedDict import pytest @@ -24,6 +28,384 @@ ) +@dataclass +class _ListedSnapshot: + id: str + created_at: int + expires_at: int | None + + +@dataclass +class _CreatedSnapshot: + snapshot_id: str + status: str + created_at: int + expires_at: int | None + + +class _WriteFile(TypedDict): + path: str + content: bytes + + +class _NoopAsyncClient: + async def aclose(self) -> None: + return None + + +class _NoopSyncClient: + def close(self) -> None: + return None + + +def _load_example_module(module_name: str, script_path: Path): + spec = importlib.util.spec_from_file_location(module_name, script_path) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def _build_snapshot_example_fakes(): + state = { + "async_records": [], + "sync_records": [], + "async_snapshot_calls": [], + "sync_snapshot_calls": [], + "async_list_calls": [], + "sync_list_calls": [], + "async_pager_awaits": 0, + "async_page_iterations": 0, + "async_item_iterations": 0, + "sync_page_iterations": 0, + "sync_item_iterations": 0, + "async_page_ids": [], + "async_item_ids": [], + "sync_page_ids": [], + "sync_item_ids": [], + "async_deleted_ids": [], + "sync_deleted_ids": [], + "async_sandbox_index": 0, + "sync_sandbox_index": 0, + "async_snapshot_index": 0, + "sync_snapshot_index": 0, + } + + def _created_at(index: int) -> int: + return 1_705_320_600_000 + index + + def _expires_at(created_at: int, expiration: int | None) -> int | None: + if expiration in (None, 0): + return None + assert expiration is not None + return created_at + expiration * 1000 + + def _make_async_listed_snapshots() -> list[_ListedSnapshot]: + return [ + _ListedSnapshot( + id=record["id"], + created_at=record["created_at"], + expires_at=record["expires_at"], + ) + for record in state["async_records"] + ] + + def _make_sync_listed_snapshots() -> list[_ListedSnapshot]: + return [ + _ListedSnapshot( + id=record["id"], + created_at=record["created_at"], + expires_at=record["expires_at"], + ) + for record in state["sync_records"] + ] + + class FakeAsyncSnapshotPage: + def __init__(self, snapshots: list[_ListedSnapshot]) -> None: + self.snapshots = snapshots + + async def iter_pages(self): + state["async_page_iterations"] += 1 + state["async_page_ids"].append([snapshot.id for snapshot in self.snapshots]) + yield self + + async def iter_items(self): + state["async_item_iterations"] += 1 + for snapshot in self.snapshots: + state["async_item_ids"].append(snapshot.id) + yield snapshot + + class FakeAsyncSnapshotPager: + def __init__(self, snapshots: list[_ListedSnapshot]) -> None: + self._page = FakeAsyncSnapshotPage(snapshots) + + async def _get_first_page(self) -> FakeAsyncSnapshotPage: + state["async_pager_awaits"] += 1 + return self._page + + def __await__(self): + return self._get_first_page().__await__() + + def __aiter__(self): + return self.iter_items() + + async def iter_pages(self): + state["async_page_iterations"] += 1 + state["async_page_ids"].append([snapshot.id for snapshot in self._page.snapshots]) + yield self._page + + async def iter_items(self): + state["async_item_iterations"] += 1 + for snapshot in self._page.snapshots: + state["async_item_ids"].append(snapshot.id) + yield snapshot + + class FakeSyncSnapshotPage: + def __init__(self, snapshots: list[_ListedSnapshot]) -> None: + self.snapshots = snapshots + + def iter_pages(self): + state["sync_page_iterations"] += 1 + state["sync_page_ids"].append([snapshot.id for snapshot in self.snapshots]) + yield self + + def iter_items(self): + state["sync_item_iterations"] += 1 + for snapshot in self.snapshots: + state["sync_item_ids"].append(snapshot.id) + yield snapshot + + @dataclass + class FakeAsyncSnapshotHandle(_CreatedSnapshot): + client: _NoopAsyncClient + + async def delete(self) -> None: + state["async_deleted_ids"].append(self.snapshot_id) + self.status = "deleted" + + @dataclass + class FakeSyncSnapshotHandle(_CreatedSnapshot): + client: _NoopSyncClient + + def delete(self) -> None: + state["sync_deleted_ids"].append(self.snapshot_id) + self.status = "deleted" + + class FakeAsyncSandbox: + def __init__( + self, + sandbox_id: str, + *, + files: dict[str, bytes] | None = None, + source_snapshot_id: str | None = None, + ) -> None: + self.sandbox_id = sandbox_id + self.source_snapshot_id = source_snapshot_id + self.status = "running" + self.files = dict(files or {}) + self.client = _NoopAsyncClient() + + @classmethod + async def create( + cls, + *, + timeout: int, + source: dict[str, str] | None = None, + ) -> FakeAsyncSandbox: + del timeout + state["async_sandbox_index"] += 1 + sandbox_id = f"async-sandbox-{state['async_sandbox_index']}" + files: dict[str, bytes] | None = None + source_snapshot_id = None + if source is not None: + source_snapshot_id = source["snapshot_id"] + source_record = next( + record + for record in state["async_records"] + if record["id"] == source_snapshot_id + ) + files = dict(source_record["files"]) + return cls(sandbox_id, files=files, source_snapshot_id=source_snapshot_id) + + async def write_files(self, files: list[_WriteFile]) -> None: + for file in files: + self.files[file["path"]] = file["content"] + + async def read_file(self, path: str) -> bytes: + return self.files[path] + + async def snapshot(self, *, expiration: int | None = None) -> _CreatedSnapshot: + state["async_snapshot_calls"].append(expiration) + state["async_snapshot_index"] += 1 + created_at = _created_at(state["async_snapshot_index"]) + snapshot_id = f"async-snapshot-{state['async_snapshot_index']}" + expires_at = _expires_at(created_at, expiration) + state["async_records"].append( + { + "id": snapshot_id, + "created_at": created_at, + "expires_at": expires_at, + "files": dict(self.files), + } + ) + self.status = "stopped" + return _CreatedSnapshot( + snapshot_id=snapshot_id, + status="created", + created_at=created_at, + expires_at=expires_at, + ) + + class FakeSyncSandbox: + def __init__( + self, + sandbox_id: str, + *, + files: dict[str, bytes] | None = None, + source_snapshot_id: str | None = None, + ) -> None: + self.sandbox_id = sandbox_id + self.source_snapshot_id = source_snapshot_id + self.status = "running" + self.files = dict(files or {}) + self.client = _NoopSyncClient() + + @classmethod + def create( + cls, + *, + timeout: int, + source: dict[str, str] | None = None, + ) -> FakeSyncSandbox: + del timeout + state["sync_sandbox_index"] += 1 + sandbox_id = f"sync-sandbox-{state['sync_sandbox_index']}" + files: dict[str, bytes] | None = None + source_snapshot_id = None + if source is not None: + source_snapshot_id = source["snapshot_id"] + source_record = next( + record for record in state["sync_records"] if record["id"] == source_snapshot_id + ) + files = dict(source_record["files"]) + return cls(sandbox_id, files=files, source_snapshot_id=source_snapshot_id) + + def write_files(self, files: list[_WriteFile]) -> None: + for file in files: + self.files[file["path"]] = file["content"] + + def read_file(self, path: str) -> bytes: + return self.files[path] + + def snapshot(self, *, expiration: int | None = None) -> _CreatedSnapshot: + state["sync_snapshot_calls"].append(expiration) + state["sync_snapshot_index"] += 1 + created_at = _created_at(state["sync_snapshot_index"] + 10) + snapshot_id = f"sync-snapshot-{state['sync_snapshot_index']}" + expires_at = _expires_at(created_at, expiration) + state["sync_records"].append( + { + "id": snapshot_id, + "created_at": created_at, + "expires_at": expires_at, + "files": dict(self.files), + } + ) + self.status = "stopped" + return _CreatedSnapshot( + snapshot_id=snapshot_id, + status="created", + created_at=created_at, + expires_at=expires_at, + ) + + class FakeAsyncSnapshot: + @staticmethod + async def get(*, snapshot_id: str) -> FakeAsyncSnapshotHandle: + record = next( + record for record in state["async_records"] if record["id"] == snapshot_id + ) + return FakeAsyncSnapshotHandle( + snapshot_id=snapshot_id, + status="created", + created_at=record["created_at"], + expires_at=record["expires_at"], + client=_NoopAsyncClient(), + ) + + @staticmethod + def list(*, limit: int | None = None, since: int | None = None): + state["async_list_calls"].append({"limit": limit, "since": since}) + return FakeAsyncSnapshotPager(_make_async_listed_snapshots()) + + class FakeSnapshot: + @staticmethod + def get(*, snapshot_id: str) -> FakeSyncSnapshotHandle: + record = next(record for record in state["sync_records"] if record["id"] == snapshot_id) + return FakeSyncSnapshotHandle( + snapshot_id=snapshot_id, + status="created", + created_at=record["created_at"], + expires_at=record["expires_at"], + client=_NoopSyncClient(), + ) + + @staticmethod + def list(*, limit: int | None = None, since: int | None = None): + state["sync_list_calls"].append({"limit": limit, "since": since}) + return FakeSyncSnapshotPage(_make_sync_listed_snapshots()) + + return state, FakeAsyncSandbox, FakeAsyncSnapshot, FakeSyncSandbox, FakeSnapshot + + +def test_snapshot_example_uses_snapshot_listing_and_expiration(monkeypatch: pytest.MonkeyPatch): + script_path = _examples_dir / "sandbox_11_snapshots.py" + monkeypatch.setenv("VERCEL_PROJECT_ID", "prj_example_test") + module = _load_example_module("sandbox_11_snapshots_test_module", script_path) + state, fake_async_sandbox, fake_async_snapshot, fake_sync_sandbox, fake_snapshot = ( + _build_snapshot_example_fakes() + ) + + monkeypatch.setattr(module, "AsyncSandbox", fake_async_sandbox) + monkeypatch.setattr(module, "AsyncSnapshot", fake_async_snapshot) + monkeypatch.setattr(module, "Sandbox", fake_sync_sandbox) + monkeypatch.setattr(module, "Snapshot", fake_snapshot) + + asyncio.run(module.async_demo()) + module.sync_demo() + + assert state["async_snapshot_calls"] == [3600, 0] + assert state["sync_snapshot_calls"] == [3600, 0] + assert state["async_records"][0]["expires_at"] == state["async_records"][0]["created_at"] + ( + 3600 * 1000 + ) + assert state["async_records"][1]["expires_at"] is None + assert state["sync_records"][0]["expires_at"] == state["sync_records"][0]["created_at"] + ( + 3600 * 1000 + ) + assert state["sync_records"][1]["expires_at"] is None + assert state["async_list_calls"] == [ + {"limit": 10, "since": state["async_records"][0]["created_at"] - 1}, + {"limit": 10, "since": state["async_records"][0]["created_at"] - 1}, + ] + assert state["sync_list_calls"] == [ + {"limit": 10, "since": state["sync_records"][0]["created_at"] - 1}, + {"limit": 10, "since": state["sync_records"][0]["created_at"] - 1}, + ] + assert state["async_pager_awaits"] == 1 + assert state["async_page_iterations"] == 1 + assert state["async_item_iterations"] == 1 + assert state["sync_page_iterations"] == 1 + assert state["sync_item_iterations"] == 1 + assert state["async_page_ids"] == [["async-snapshot-1", "async-snapshot-2"]] + assert state["async_item_ids"] == ["async-snapshot-1", "async-snapshot-2"] + assert state["sync_page_ids"] == [["sync-snapshot-1", "sync-snapshot-2"]] + assert state["sync_item_ids"] == ["sync-snapshot-1", "sync-snapshot-2"] + assert state["async_deleted_ids"] == ["async-snapshot-1", "async-snapshot-2"] + assert state["sync_deleted_ids"] == ["sync-snapshot-1", "sync-snapshot-2"] + + @pytest.mark.skipif( not _is_ci and not _has_credentials, reason="Requires BLOB_READ_WRITE_TOKEN, VERCEL_TOKEN, VERCEL_PROJECT_ID, and VERCEL_TEAM_ID", From 1678ba25d9f301ea913c6d8a293343cafb8acc0d Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Wed, 25 Mar 2026 06:47:36 -0400 Subject: [PATCH 4/9] Enforce minimum snapshot expiration This is forced by the server, so mirror that as a client invariant as well. --- examples/sandbox_11_snapshots.py | 8 +- src/vercel/sandbox/__init__.py | 9 +- src/vercel/sandbox/sandbox.py | 19 +++- src/vercel/sandbox/snapshot.py | 30 +++++- tests/integration/test_sandbox_sync_async.py | 108 ++++++++++++++++++- tests/test_examples.py | 10 +- 6 files changed, 164 insertions(+), 20 deletions(-) diff --git a/examples/sandbox_11_snapshots.py b/examples/sandbox_11_snapshots.py index da2b22d..5fef492 100644 --- a/examples/sandbox_11_snapshots.py +++ b/examples/sandbox_11_snapshots.py @@ -49,8 +49,8 @@ async def async_demo() -> None: print(f" Created config.json: {config.decode()}") # Step 2: Create a snapshot with an explicit expiration (this STOPS the sandbox) - print("\n[3] Creating snapshot with expiration=3600...") - snapshot = await sandbox1.snapshot(expiration=3600) + print("\n[3] Creating snapshot with expiration=86400000...") + snapshot = await sandbox1.snapshot(expiration=86400000) async_snapshot_ids.append(snapshot.snapshot_id) print(f" Snapshot ID: {snapshot.snapshot_id}") print(f" Status: {snapshot.status}") @@ -164,8 +164,8 @@ def sync_demo() -> None: print(f" Created config.json: {config.decode()}") # Step 2: Create a snapshot with an explicit expiration (this STOPS the sandbox) - print("\n[3] Creating snapshot with expiration=3600...") - snapshot = sandbox1.snapshot(expiration=3600) + print("\n[3] Creating snapshot with expiration=86400000...") + snapshot = sandbox1.snapshot(expiration=86400000) sync_snapshot_ids.append(snapshot.snapshot_id) print(f" Snapshot ID: {snapshot.snapshot_id}") print(f" Status: {snapshot.status}") diff --git a/src/vercel/sandbox/__init__.py b/src/vercel/sandbox/__init__.py index 76cad35..fe6bf58 100644 --- a/src/vercel/sandbox/__init__.py +++ b/src/vercel/sandbox/__init__.py @@ -18,7 +18,12 @@ from .models import GitSource, SnapshotSource, Source, TarballSource from .page import AsyncSandboxPage, AsyncSnapshotPage, SandboxPage, SnapshotPage from .sandbox import AsyncSandbox, Sandbox -from .snapshot import AsyncSnapshot, Snapshot +from .snapshot import ( + MIN_SNAPSHOT_EXPIRATION_MS, + AsyncSnapshot, + Snapshot, + SnapshotExpiration, +) __all__ = [ "SandboxError", @@ -35,6 +40,8 @@ "SandboxPage", "SnapshotPage", "Snapshot", + "SnapshotExpiration", + "MIN_SNAPSHOT_EXPIRATION_MS", "AsyncCommand", "AsyncCommandFinished", "Command", diff --git a/src/vercel/sandbox/sandbox.py b/src/vercel/sandbox/sandbox.py index aa1aebb..b50b99e 100644 --- a/src/vercel/sandbox/sandbox.py +++ b/src/vercel/sandbox/sandbox.py @@ -30,7 +30,12 @@ ) from .page import AsyncSandboxPage, AsyncSandboxPager, SandboxPage from .pty.shell import start_interactive_shell -from .snapshot import AsyncSnapshot, Snapshot as SnapshotClass +from .snapshot import ( + AsyncSnapshot, + Snapshot as SnapshotClass, + SnapshotExpiration, + normalize_snapshot_expiration, +) def _normalize_source(source: Source | None) -> dict[str, Any] | None: @@ -404,16 +409,19 @@ async def extend_timeout(self, duration: int) -> None: response = await self.client.extend_timeout(sandbox_id=self.sandbox.id, duration=duration) self.sandbox = response.sandbox - async def snapshot(self, *, expiration: int | None = None) -> AsyncSnapshot: + async def snapshot( + self, *, expiration: int | SnapshotExpiration | None = None + ) -> AsyncSnapshot: """ Create a snapshot from this currently running sandbox. New sandboxes can then be created from this snapshot. Note: this sandbox will be stopped as part of the snapshot creation process. """ + normalized_expiration = normalize_snapshot_expiration(expiration) response = await self.client.create_snapshot( sandbox_id=self.sandbox.id, - expiration=expiration, + expiration=normalized_expiration, ) self.sandbox = response.sandbox return AsyncSnapshot(client=self.client, snapshot=response.snapshot) @@ -767,17 +775,18 @@ def extend_timeout(self, duration: int) -> None: ) self.sandbox = response.sandbox - def snapshot(self, *, expiration: int | None = None) -> SnapshotClass: + def snapshot(self, *, expiration: int | SnapshotExpiration | None = None) -> SnapshotClass: """ Create a snapshot from this currently running sandbox. New sandboxes can then be created from this snapshot. Note: this sandbox will be stopped as part of the snapshot creation process. """ + normalized_expiration = normalize_snapshot_expiration(expiration) response = iter_coroutine( self.client.create_snapshot( sandbox_id=self.sandbox.id, - expiration=expiration, + expiration=normalized_expiration, ) ) self.sandbox = response.sandbox diff --git a/src/vercel/sandbox/snapshot.py b/src/vercel/sandbox/snapshot.py index 8b82d09..e89fa41 100644 --- a/src/vercel/sandbox/snapshot.py +++ b/src/vercel/sandbox/snapshot.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from datetime import datetime -from typing import Literal +from typing import Final, Literal from vercel._internal.iter_coroutine import iter_coroutine from vercel._internal.sandbox import AsyncSandboxOpsClient, SyncSandboxOpsClient @@ -12,6 +12,34 @@ from ..oidc import Credentials, get_credentials from .page import AsyncSnapshotPage, AsyncSnapshotPager, SnapshotPage +MIN_SNAPSHOT_EXPIRATION_MS: Final[int] = 86_400_000 + + +class SnapshotExpiration(int): + """Snapshot expiration in milliseconds. + + Valid values are ``0`` for no expiration or any value greater than or equal + to ``86_400_000`` (24 hours). + """ + + def __new__(cls, value: int) -> SnapshotExpiration: + value = int(value) + if value != 0 and value < MIN_SNAPSHOT_EXPIRATION_MS: + raise ValueError( + "Snapshot expiration must be 0 for no expiration or >= 86400000 milliseconds" + ) + return int.__new__(cls, value) + + +def normalize_snapshot_expiration( + expiration: int | SnapshotExpiration | None, +) -> SnapshotExpiration | None: + if expiration is None: + return None + if isinstance(expiration, SnapshotExpiration): + return expiration + return SnapshotExpiration(expiration) + async def _build_async_snapshot_page( *, diff --git a/tests/integration/test_sandbox_sync_async.py b/tests/integration/test_sandbox_sync_async.py index fee6b8e..5b17258 100644 --- a/tests/integration/test_sandbox_sync_async.py +++ b/tests/integration/test_sandbox_sync_async.py @@ -18,6 +18,7 @@ NetworkPolicySubnets, NetworkTransformer, ) +from vercel.sandbox.snapshot import normalize_snapshot_expiration # Base URL for Vercel Sandbox API SANDBOX_API_BASE = "https://api.vercel.com" @@ -1087,6 +1088,34 @@ def handler(request: httpx.Request) -> httpx.Response: class TestSnapshotList: """Test snapshot listing operations.""" + def test_snapshot_expiration_allows_zero(self): + from vercel.sandbox import SnapshotExpiration + + expiration = SnapshotExpiration(0) + + assert expiration == 0 + + def test_snapshot_expiration_allows_minimum(self): + from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration + + expiration = SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS) + + assert expiration == MIN_SNAPSHOT_EXPIRATION_MS + + def test_snapshot_expiration_rejects_values_below_minimum(self): + from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration + + with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): + SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS - 1) + + def test_normalize_snapshot_expiration_coerces_int(self): + from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration + + expiration = normalize_snapshot_expiration(MIN_SNAPSHOT_EXPIRATION_MS) + + assert expiration == SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS) + assert isinstance(expiration, SnapshotExpiration) + @respx.mock def test_list_snapshot_sync_serializes_filters_and_iterates_pages( self, mock_env_clear, mock_sandbox_snapshot_response @@ -2823,11 +2852,11 @@ def test_create_snapshot_sync_with_expiration( project_id="prj_test123", ) - snapshot = sandbox.snapshot(expiration=3600) + snapshot = sandbox.snapshot(expiration=86_400_000) assert route.called body = json.loads(route.calls.last.request.content) - assert body == {"expiration": 3600} + assert body == {"expiration": 86_400_000} assert snapshot.created_at == mock_sandbox_snapshot_response["createdAt"] assert sandbox.status == "stopped" @@ -2880,6 +2909,41 @@ def test_create_snapshot_sync_with_zero_expiration( sandbox.client.close() + @respx.mock + def test_create_snapshot_sync_rejects_invalid_expiration( + self, mock_env_clear, mock_sandbox_get_response + ): + """Test sync snapshot creation validates the minimum expiration.""" + from vercel.sandbox import Sandbox + + sandbox_id = "sbx_test123456" + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response(200, json={}) + ) + + sandbox = Sandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): + sandbox.snapshot(expiration=3_600) + + assert not route.called + sandbox.client.close() + @respx.mock @pytest.mark.asyncio async def test_create_snapshot_async_without_expiration( @@ -2969,11 +3033,11 @@ async def test_create_snapshot_async_with_expiration( project_id="prj_test123", ) - snapshot = await sandbox.snapshot(expiration=3600) + snapshot = await sandbox.snapshot(expiration=86_400_000) assert route.called body = json.loads(route.calls.last.request.content) - assert body == {"expiration": 3600} + assert body == {"expiration": 86_400_000} assert snapshot.created_at == mock_sandbox_snapshot_response["createdAt"] assert sandbox.status == "stopped" @@ -3031,6 +3095,42 @@ async def test_create_snapshot_async_with_zero_expiration_and_optional_expires_a await sandbox.client.aclose() + @respx.mock + @pytest.mark.asyncio + async def test_create_snapshot_async_rejects_invalid_expiration( + self, mock_env_clear, mock_sandbox_get_response + ): + """Test async snapshot creation validates the minimum expiration.""" + from vercel.sandbox import AsyncSandbox + + sandbox_id = "sbx_test123456" + + respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( + return_value=httpx.Response( + 200, + json={ + "sandbox": mock_sandbox_get_response, + "routes": [], + }, + ) + ) + route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( + return_value=httpx.Response(200, json={}) + ) + + sandbox = await AsyncSandbox.get( + sandbox_id=sandbox_id, + token="test_token", + team_id="team_test123", + project_id="prj_test123", + ) + + with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): + await sandbox.snapshot(expiration=3_600) + + assert not route.called + await sandbox.client.aclose() + class TestSandboxExtendTimeout: """Test sandbox timeout extension.""" diff --git a/tests/test_examples.py b/tests/test_examples.py index d195dc3..7334735 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -99,7 +99,7 @@ def _expires_at(created_at: int, expiration: int | None) -> int | None: if expiration in (None, 0): return None assert expiration is not None - return created_at + expiration * 1000 + return created_at + expiration def _make_async_listed_snapshots() -> list[_ListedSnapshot]: return [ @@ -375,14 +375,14 @@ def test_snapshot_example_uses_snapshot_listing_and_expiration(monkeypatch: pyte asyncio.run(module.async_demo()) module.sync_demo() - assert state["async_snapshot_calls"] == [3600, 0] - assert state["sync_snapshot_calls"] == [3600, 0] + assert state["async_snapshot_calls"] == [86_400_000, 0] + assert state["sync_snapshot_calls"] == [86_400_000, 0] assert state["async_records"][0]["expires_at"] == state["async_records"][0]["created_at"] + ( - 3600 * 1000 + 86_400_000 ) assert state["async_records"][1]["expires_at"] is None assert state["sync_records"][0]["expires_at"] == state["sync_records"][0]["created_at"] + ( - 3600 * 1000 + 86_400_000 ) assert state["sync_records"][1]["expires_at"] is None assert state["async_list_calls"] == [ From 6df7d6c2651c36ffd3776964fc9684b66cac757b Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Wed, 25 Mar 2026 06:49:06 -0400 Subject: [PATCH 5/9] Add docstring note about None for expiration --- src/vercel/sandbox/snapshot.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vercel/sandbox/snapshot.py b/src/vercel/sandbox/snapshot.py index e89fa41..22f3914 100644 --- a/src/vercel/sandbox/snapshot.py +++ b/src/vercel/sandbox/snapshot.py @@ -133,7 +133,7 @@ def created_at(self) -> int: @property def expires_at(self) -> int | None: - """Timestamp when the snapshot expires.""" + """Timestamp when the snapshot expires, or None for no expiration.""" return self.snapshot.expires_at @staticmethod @@ -212,7 +212,7 @@ def created_at(self) -> int: @property def expires_at(self) -> int | None: - """Timestamp when the snapshot expires.""" + """Timestamp when the snapshot expires, or None for no expiration.""" return self.snapshot.expires_at @staticmethod From 421c724b47e0241a034760754ce7c0f519a5bea5 Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Fri, 27 Mar 2026 12:38:46 -0700 Subject: [PATCH 6/9] Refactor sandbox list timestamp normalization Move list timestamp normalization into shared list param construction so sandbox and snapshot callers stop duplicating since/until coercion. The shared base keeps normalized millisecond values on the params object, preserving request behavior while centralizing the conversion logic. --- src/vercel/_internal/sandbox/pagination.py | 38 ++++++++++++++-------- src/vercel/sandbox/sandbox.py | 10 +++--- src/vercel/sandbox/snapshot.py | 10 +++--- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/vercel/_internal/sandbox/pagination.py b/src/vercel/_internal/sandbox/pagination.py index 4205b68..1f48a73 100644 --- a/src/vercel/_internal/sandbox/pagination.py +++ b/src/vercel/_internal/sandbox/pagination.py @@ -11,13 +11,28 @@ class SandboxPageInfo: until: int -@dataclass(frozen=True, slots=True) -class SandboxListParams: - project_id: str | None = None - limit: int | None = None - since: int | None = None - until: int | None = None - +@dataclass(frozen=True, slots=True, init=False) +class _BaseListParams: + project_id: str | None + limit: int | None + since: int | None + until: int | None + + def __init__( + self, + project_id: str | None = None, + limit: int | None = None, + since: datetime | int | None = None, + until: datetime | int | None = None, + ) -> None: + object.__setattr__(self, "project_id", project_id) + object.__setattr__(self, "limit", limit) + object.__setattr__(self, "since", normalize_list_timestamp(since)) + object.__setattr__(self, "until", normalize_list_timestamp(until)) + + +@dataclass(frozen=True, slots=True, init=False) +class SandboxListParams(_BaseListParams): def with_until(self, until: int) -> SandboxListParams: return SandboxListParams( project_id=self.project_id, @@ -38,13 +53,8 @@ class SnapshotPageInfo: until: int -@dataclass(frozen=True, slots=True) -class SnapshotListParams: - project_id: str | None = None - limit: int | None = None - since: int | None = None - until: int | None = None - +@dataclass(frozen=True, slots=True, init=False) +class SnapshotListParams(_BaseListParams): def with_until(self, until: int) -> SnapshotListParams: return SnapshotListParams( project_id=self.project_id, diff --git a/src/vercel/sandbox/sandbox.py b/src/vercel/sandbox/sandbox.py index b50b99e..3285662 100644 --- a/src/vercel/sandbox/sandbox.py +++ b/src/vercel/sandbox/sandbox.py @@ -19,7 +19,7 @@ ApiNetworkPolicy, NetworkPolicy, ) -from vercel._internal.sandbox.pagination import SandboxListParams, normalize_list_timestamp +from vercel._internal.sandbox.pagination import SandboxListParams from ..oidc import Credentials, get_credentials from .command import ( @@ -249,8 +249,8 @@ def list( params = SandboxListParams( project_id=creds.project_id, limit=limit, - since=normalize_list_timestamp(since), - until=normalize_list_timestamp(until), + since=since, + until=until, ) return AsyncSandboxPager( _fetch_first_page=lambda: _build_async_sandbox_page(creds=creds, params=params) @@ -613,8 +613,8 @@ def list( params = SandboxListParams( project_id=creds.project_id, limit=limit, - since=normalize_list_timestamp(since), - until=normalize_list_timestamp(until), + since=since, + until=until, ) return iter_coroutine(_build_sync_sandbox_page(creds=creds, params=params)) diff --git a/src/vercel/sandbox/snapshot.py b/src/vercel/sandbox/snapshot.py index 22f3914..a36125e 100644 --- a/src/vercel/sandbox/snapshot.py +++ b/src/vercel/sandbox/snapshot.py @@ -7,7 +7,7 @@ from vercel._internal.iter_coroutine import iter_coroutine from vercel._internal.sandbox import AsyncSandboxOpsClient, SyncSandboxOpsClient from vercel._internal.sandbox.models import Snapshot as SnapshotModel -from vercel._internal.sandbox.pagination import SnapshotListParams, normalize_list_timestamp +from vercel._internal.sandbox.pagination import SnapshotListParams from ..oidc import Credentials, get_credentials from .page import AsyncSnapshotPage, AsyncSnapshotPager, SnapshotPage @@ -165,8 +165,8 @@ def list( params = SnapshotListParams( project_id=creds.project_id, limit=limit, - since=normalize_list_timestamp(since), - until=normalize_list_timestamp(until), + since=since, + until=until, ) return AsyncSnapshotPager( _fetch_first_page=lambda: _build_async_snapshot_page(creds=creds, params=params) @@ -244,8 +244,8 @@ def list( params = SnapshotListParams( project_id=creds.project_id, limit=limit, - since=normalize_list_timestamp(since), - until=normalize_list_timestamp(until), + since=since, + until=until, ) return iter_coroutine(_build_sync_snapshot_page(creds=creds, params=params)) From 2dbebdeeb78e0d37e3908ceb66b7ceca19c6e98a Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Fri, 27 Mar 2026 12:49:50 -0700 Subject: [PATCH 7/9] Move example tests into examples SLOPPPPPP Instead of adding actual examples, the agent just added tests to the examples runner test suite. Fixed it. --- examples/sandbox_11_snapshots.py | 16 +- tests/test_examples.py | 384 ------------------------------- 2 files changed, 15 insertions(+), 385 deletions(-) diff --git a/examples/sandbox_11_snapshots.py b/examples/sandbox_11_snapshots.py index 5fef492..f87bd95 100644 --- a/examples/sandbox_11_snapshots.py +++ b/examples/sandbox_11_snapshots.py @@ -57,6 +57,9 @@ async def async_demo() -> None: print(f" Created At: {snapshot.created_at}") print(f" Expires At: {snapshot.expires_at}") print(f" Sandbox status after snapshot: {sandbox1.status}") + assert snapshot.expires_at == snapshot.created_at + 86_400_000, ( + "expiring async snapshot should report an expires_at timestamp" + ) finally: await sandbox1.client.aclose() @@ -90,6 +93,9 @@ async def async_demo() -> None: print(f" Snapshot ID: {persistent_snapshot.snapshot_id}") print(f" Created At: {persistent_snapshot.created_at}") print(f" Expires At: {persistent_snapshot.expires_at}") + assert persistent_snapshot.expires_at is None, ( + "persistent async snapshot should not have an expires_at timestamp" + ) finally: await sandbox2.client.aclose() @@ -117,7 +123,7 @@ async def async_demo() -> None: ) item_ids: list[str] = [] - async for listed in AsyncSnapshot.list(limit=10, since=since): + async for listed in AsyncSnapshot.list(limit=10, since=since).iter_items(): item_ids.append(listed.id) if all(snapshot_id in item_ids for snapshot_id in async_snapshot_ids): break @@ -134,6 +140,7 @@ async def async_demo() -> None: try: await fetched.delete() print(f" Deleted snapshot {snapshot_id}, status: {fetched.status}") + assert fetched.status == "deleted", "async snapshot should be deleted" finally: await fetched.client.aclose() @@ -172,6 +179,9 @@ def sync_demo() -> None: print(f" Created At: {snapshot.created_at}") print(f" Expires At: {snapshot.expires_at}") print(f" Sandbox status after snapshot: {sandbox1.status}") + assert snapshot.expires_at == snapshot.created_at + 86_400_000, ( + "expiring sync snapshot should report an expires_at timestamp" + ) finally: sandbox1.client.close() @@ -204,6 +214,9 @@ def sync_demo() -> None: print(f" Snapshot ID: {persistent_snapshot.snapshot_id}") print(f" Created At: {persistent_snapshot.created_at}") print(f" Expires At: {persistent_snapshot.expires_at}") + assert persistent_snapshot.expires_at is None, ( + "persistent sync snapshot should not have an expires_at timestamp" + ) finally: sandbox2.client.close() @@ -245,6 +258,7 @@ def sync_demo() -> None: try: fetched.delete() print(f" Deleted snapshot {snapshot_id}, status: {fetched.status}") + assert fetched.status == "deleted", "sync snapshot should be deleted" finally: fetched.client.close() diff --git a/tests/test_examples.py b/tests/test_examples.py index 7334735..2b6aecc 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -1,13 +1,7 @@ -from __future__ import annotations - -import asyncio -import importlib.util import os import subprocess import sys -from dataclasses import dataclass from pathlib import Path -from typing import TypedDict import pytest @@ -28,384 +22,6 @@ ) -@dataclass -class _ListedSnapshot: - id: str - created_at: int - expires_at: int | None - - -@dataclass -class _CreatedSnapshot: - snapshot_id: str - status: str - created_at: int - expires_at: int | None - - -class _WriteFile(TypedDict): - path: str - content: bytes - - -class _NoopAsyncClient: - async def aclose(self) -> None: - return None - - -class _NoopSyncClient: - def close(self) -> None: - return None - - -def _load_example_module(module_name: str, script_path: Path): - spec = importlib.util.spec_from_file_location(module_name, script_path) - assert spec is not None - assert spec.loader is not None - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - return module - - -def _build_snapshot_example_fakes(): - state = { - "async_records": [], - "sync_records": [], - "async_snapshot_calls": [], - "sync_snapshot_calls": [], - "async_list_calls": [], - "sync_list_calls": [], - "async_pager_awaits": 0, - "async_page_iterations": 0, - "async_item_iterations": 0, - "sync_page_iterations": 0, - "sync_item_iterations": 0, - "async_page_ids": [], - "async_item_ids": [], - "sync_page_ids": [], - "sync_item_ids": [], - "async_deleted_ids": [], - "sync_deleted_ids": [], - "async_sandbox_index": 0, - "sync_sandbox_index": 0, - "async_snapshot_index": 0, - "sync_snapshot_index": 0, - } - - def _created_at(index: int) -> int: - return 1_705_320_600_000 + index - - def _expires_at(created_at: int, expiration: int | None) -> int | None: - if expiration in (None, 0): - return None - assert expiration is not None - return created_at + expiration - - def _make_async_listed_snapshots() -> list[_ListedSnapshot]: - return [ - _ListedSnapshot( - id=record["id"], - created_at=record["created_at"], - expires_at=record["expires_at"], - ) - for record in state["async_records"] - ] - - def _make_sync_listed_snapshots() -> list[_ListedSnapshot]: - return [ - _ListedSnapshot( - id=record["id"], - created_at=record["created_at"], - expires_at=record["expires_at"], - ) - for record in state["sync_records"] - ] - - class FakeAsyncSnapshotPage: - def __init__(self, snapshots: list[_ListedSnapshot]) -> None: - self.snapshots = snapshots - - async def iter_pages(self): - state["async_page_iterations"] += 1 - state["async_page_ids"].append([snapshot.id for snapshot in self.snapshots]) - yield self - - async def iter_items(self): - state["async_item_iterations"] += 1 - for snapshot in self.snapshots: - state["async_item_ids"].append(snapshot.id) - yield snapshot - - class FakeAsyncSnapshotPager: - def __init__(self, snapshots: list[_ListedSnapshot]) -> None: - self._page = FakeAsyncSnapshotPage(snapshots) - - async def _get_first_page(self) -> FakeAsyncSnapshotPage: - state["async_pager_awaits"] += 1 - return self._page - - def __await__(self): - return self._get_first_page().__await__() - - def __aiter__(self): - return self.iter_items() - - async def iter_pages(self): - state["async_page_iterations"] += 1 - state["async_page_ids"].append([snapshot.id for snapshot in self._page.snapshots]) - yield self._page - - async def iter_items(self): - state["async_item_iterations"] += 1 - for snapshot in self._page.snapshots: - state["async_item_ids"].append(snapshot.id) - yield snapshot - - class FakeSyncSnapshotPage: - def __init__(self, snapshots: list[_ListedSnapshot]) -> None: - self.snapshots = snapshots - - def iter_pages(self): - state["sync_page_iterations"] += 1 - state["sync_page_ids"].append([snapshot.id for snapshot in self.snapshots]) - yield self - - def iter_items(self): - state["sync_item_iterations"] += 1 - for snapshot in self.snapshots: - state["sync_item_ids"].append(snapshot.id) - yield snapshot - - @dataclass - class FakeAsyncSnapshotHandle(_CreatedSnapshot): - client: _NoopAsyncClient - - async def delete(self) -> None: - state["async_deleted_ids"].append(self.snapshot_id) - self.status = "deleted" - - @dataclass - class FakeSyncSnapshotHandle(_CreatedSnapshot): - client: _NoopSyncClient - - def delete(self) -> None: - state["sync_deleted_ids"].append(self.snapshot_id) - self.status = "deleted" - - class FakeAsyncSandbox: - def __init__( - self, - sandbox_id: str, - *, - files: dict[str, bytes] | None = None, - source_snapshot_id: str | None = None, - ) -> None: - self.sandbox_id = sandbox_id - self.source_snapshot_id = source_snapshot_id - self.status = "running" - self.files = dict(files or {}) - self.client = _NoopAsyncClient() - - @classmethod - async def create( - cls, - *, - timeout: int, - source: dict[str, str] | None = None, - ) -> FakeAsyncSandbox: - del timeout - state["async_sandbox_index"] += 1 - sandbox_id = f"async-sandbox-{state['async_sandbox_index']}" - files: dict[str, bytes] | None = None - source_snapshot_id = None - if source is not None: - source_snapshot_id = source["snapshot_id"] - source_record = next( - record - for record in state["async_records"] - if record["id"] == source_snapshot_id - ) - files = dict(source_record["files"]) - return cls(sandbox_id, files=files, source_snapshot_id=source_snapshot_id) - - async def write_files(self, files: list[_WriteFile]) -> None: - for file in files: - self.files[file["path"]] = file["content"] - - async def read_file(self, path: str) -> bytes: - return self.files[path] - - async def snapshot(self, *, expiration: int | None = None) -> _CreatedSnapshot: - state["async_snapshot_calls"].append(expiration) - state["async_snapshot_index"] += 1 - created_at = _created_at(state["async_snapshot_index"]) - snapshot_id = f"async-snapshot-{state['async_snapshot_index']}" - expires_at = _expires_at(created_at, expiration) - state["async_records"].append( - { - "id": snapshot_id, - "created_at": created_at, - "expires_at": expires_at, - "files": dict(self.files), - } - ) - self.status = "stopped" - return _CreatedSnapshot( - snapshot_id=snapshot_id, - status="created", - created_at=created_at, - expires_at=expires_at, - ) - - class FakeSyncSandbox: - def __init__( - self, - sandbox_id: str, - *, - files: dict[str, bytes] | None = None, - source_snapshot_id: str | None = None, - ) -> None: - self.sandbox_id = sandbox_id - self.source_snapshot_id = source_snapshot_id - self.status = "running" - self.files = dict(files or {}) - self.client = _NoopSyncClient() - - @classmethod - def create( - cls, - *, - timeout: int, - source: dict[str, str] | None = None, - ) -> FakeSyncSandbox: - del timeout - state["sync_sandbox_index"] += 1 - sandbox_id = f"sync-sandbox-{state['sync_sandbox_index']}" - files: dict[str, bytes] | None = None - source_snapshot_id = None - if source is not None: - source_snapshot_id = source["snapshot_id"] - source_record = next( - record for record in state["sync_records"] if record["id"] == source_snapshot_id - ) - files = dict(source_record["files"]) - return cls(sandbox_id, files=files, source_snapshot_id=source_snapshot_id) - - def write_files(self, files: list[_WriteFile]) -> None: - for file in files: - self.files[file["path"]] = file["content"] - - def read_file(self, path: str) -> bytes: - return self.files[path] - - def snapshot(self, *, expiration: int | None = None) -> _CreatedSnapshot: - state["sync_snapshot_calls"].append(expiration) - state["sync_snapshot_index"] += 1 - created_at = _created_at(state["sync_snapshot_index"] + 10) - snapshot_id = f"sync-snapshot-{state['sync_snapshot_index']}" - expires_at = _expires_at(created_at, expiration) - state["sync_records"].append( - { - "id": snapshot_id, - "created_at": created_at, - "expires_at": expires_at, - "files": dict(self.files), - } - ) - self.status = "stopped" - return _CreatedSnapshot( - snapshot_id=snapshot_id, - status="created", - created_at=created_at, - expires_at=expires_at, - ) - - class FakeAsyncSnapshot: - @staticmethod - async def get(*, snapshot_id: str) -> FakeAsyncSnapshotHandle: - record = next( - record for record in state["async_records"] if record["id"] == snapshot_id - ) - return FakeAsyncSnapshotHandle( - snapshot_id=snapshot_id, - status="created", - created_at=record["created_at"], - expires_at=record["expires_at"], - client=_NoopAsyncClient(), - ) - - @staticmethod - def list(*, limit: int | None = None, since: int | None = None): - state["async_list_calls"].append({"limit": limit, "since": since}) - return FakeAsyncSnapshotPager(_make_async_listed_snapshots()) - - class FakeSnapshot: - @staticmethod - def get(*, snapshot_id: str) -> FakeSyncSnapshotHandle: - record = next(record for record in state["sync_records"] if record["id"] == snapshot_id) - return FakeSyncSnapshotHandle( - snapshot_id=snapshot_id, - status="created", - created_at=record["created_at"], - expires_at=record["expires_at"], - client=_NoopSyncClient(), - ) - - @staticmethod - def list(*, limit: int | None = None, since: int | None = None): - state["sync_list_calls"].append({"limit": limit, "since": since}) - return FakeSyncSnapshotPage(_make_sync_listed_snapshots()) - - return state, FakeAsyncSandbox, FakeAsyncSnapshot, FakeSyncSandbox, FakeSnapshot - - -def test_snapshot_example_uses_snapshot_listing_and_expiration(monkeypatch: pytest.MonkeyPatch): - script_path = _examples_dir / "sandbox_11_snapshots.py" - monkeypatch.setenv("VERCEL_PROJECT_ID", "prj_example_test") - module = _load_example_module("sandbox_11_snapshots_test_module", script_path) - state, fake_async_sandbox, fake_async_snapshot, fake_sync_sandbox, fake_snapshot = ( - _build_snapshot_example_fakes() - ) - - monkeypatch.setattr(module, "AsyncSandbox", fake_async_sandbox) - monkeypatch.setattr(module, "AsyncSnapshot", fake_async_snapshot) - monkeypatch.setattr(module, "Sandbox", fake_sync_sandbox) - monkeypatch.setattr(module, "Snapshot", fake_snapshot) - - asyncio.run(module.async_demo()) - module.sync_demo() - - assert state["async_snapshot_calls"] == [86_400_000, 0] - assert state["sync_snapshot_calls"] == [86_400_000, 0] - assert state["async_records"][0]["expires_at"] == state["async_records"][0]["created_at"] + ( - 86_400_000 - ) - assert state["async_records"][1]["expires_at"] is None - assert state["sync_records"][0]["expires_at"] == state["sync_records"][0]["created_at"] + ( - 86_400_000 - ) - assert state["sync_records"][1]["expires_at"] is None - assert state["async_list_calls"] == [ - {"limit": 10, "since": state["async_records"][0]["created_at"] - 1}, - {"limit": 10, "since": state["async_records"][0]["created_at"] - 1}, - ] - assert state["sync_list_calls"] == [ - {"limit": 10, "since": state["sync_records"][0]["created_at"] - 1}, - {"limit": 10, "since": state["sync_records"][0]["created_at"] - 1}, - ] - assert state["async_pager_awaits"] == 1 - assert state["async_page_iterations"] == 1 - assert state["async_item_iterations"] == 1 - assert state["sync_page_iterations"] == 1 - assert state["sync_item_iterations"] == 1 - assert state["async_page_ids"] == [["async-snapshot-1", "async-snapshot-2"]] - assert state["async_item_ids"] == ["async-snapshot-1", "async-snapshot-2"] - assert state["sync_page_ids"] == [["sync-snapshot-1", "sync-snapshot-2"]] - assert state["sync_item_ids"] == ["sync-snapshot-1", "sync-snapshot-2"] - assert state["async_deleted_ids"] == ["async-snapshot-1", "async-snapshot-2"] - assert state["sync_deleted_ids"] == ["sync-snapshot-1", "sync-snapshot-2"] - - @pytest.mark.skipif( not _is_ci and not _has_credentials, reason="Requires BLOB_READ_WRITE_TOKEN, VERCEL_TOKEN, VERCEL_PROJECT_ID, and VERCEL_TEAM_ID", From b9b9132f0a62d08c4a41af6a5d3f3e53e72c82ab Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Fri, 27 Mar 2026 13:15:17 -0700 Subject: [PATCH 8/9] Split the tests up We were mixing a lot of pagination tests into the main sandbox integration tests, so split the more "unit" like tests out. --- tests/integration/test_sandbox_sync_async.py | 296 ------------------- tests/unit/test_sandbox_page.py | 135 +++++++++ tests/unit/test_sandbox_snapshot.py | 30 ++ 3 files changed, 165 insertions(+), 296 deletions(-) create mode 100644 tests/unit/test_sandbox_page.py create mode 100644 tests/unit/test_sandbox_snapshot.py diff --git a/tests/integration/test_sandbox_sync_async.py b/tests/integration/test_sandbox_sync_async.py index 5b17258..494c24c 100644 --- a/tests/integration/test_sandbox_sync_async.py +++ b/tests/integration/test_sandbox_sync_async.py @@ -18,7 +18,6 @@ NetworkPolicySubnets, NetworkTransformer, ) -from vercel.sandbox.snapshot import normalize_snapshot_expiration # Base URL for Vercel Sandbox API SANDBOX_API_BASE = "https://api.vercel.com" @@ -1088,34 +1087,6 @@ def handler(request: httpx.Request) -> httpx.Response: class TestSnapshotList: """Test snapshot listing operations.""" - def test_snapshot_expiration_allows_zero(self): - from vercel.sandbox import SnapshotExpiration - - expiration = SnapshotExpiration(0) - - assert expiration == 0 - - def test_snapshot_expiration_allows_minimum(self): - from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration - - expiration = SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS) - - assert expiration == MIN_SNAPSHOT_EXPIRATION_MS - - def test_snapshot_expiration_rejects_values_below_minimum(self): - from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration - - with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): - SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS - 1) - - def test_normalize_snapshot_expiration_coerces_int(self): - from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration - - expiration = normalize_snapshot_expiration(MIN_SNAPSHOT_EXPIRATION_MS) - - assert expiration == SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS) - assert isinstance(expiration, SnapshotExpiration) - @respx.mock def test_list_snapshot_sync_serializes_filters_and_iterates_pages( self, mock_env_clear, mock_sandbox_snapshot_response @@ -1209,60 +1180,6 @@ def handler(request: httpx.Request) -> httpx.Response: assert page.pagination.count == 3 assert page.next_page_info() is not None assert page.next_page_info().until == int(next_until) - assert [ - [snapshot.id for snapshot in current_page.snapshots] - for current_page in page.iter_pages() - ] == [ - ["snap_list_1", "snap_list_2"], - ["snap_list_3"], - ] - assert requests == [ - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": expected_since, - "until": expected_until, - }, - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": expected_since, - "until": next_until, - }, - ] - - requests.clear() - items_page = Snapshot.list( - token="test_token", - team_id="team_test123", - project_id=project, - limit=limit, - since=since, - until=until, - ) - assert [snapshot.id for snapshot in items_page.iter_items()] == [ - "snap_list_1", - "snap_list_2", - "snap_list_3", - ] - assert requests == [ - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": expected_since, - "until": expected_until, - }, - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": expected_since, - "until": next_until, - }, - ] @respx.mock @pytest.mark.asyncio @@ -1357,147 +1274,6 @@ def handler(request: httpx.Request) -> httpx.Response: assert page.next_page_info() is not None assert page.next_page_info().until == int(next_until) - pages = await _collect_async_pages(page) - assert [[snapshot.id for snapshot in current_page.snapshots] for current_page in pages] == [ - ["snap_async_1", "snap_async_2"], - ["snap_async_3"], - ] - - items_page = await AsyncSnapshot.list( - token="test_token", - team_id="team_test123", - project_id=project, - limit=limit, - since=since, - until=until, - ) - items = await _collect_async_items(items_page) - assert [snapshot.id for snapshot in items] == [ - "snap_async_1", - "snap_async_2", - "snap_async_3", - ] - assert requests == [ - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": str(since), - "until": str(until), - }, - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": str(since), - "until": next_until, - }, - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": str(since), - "until": str(until), - }, - { - "teamId": "team_test123", - "project": project, - "limit": str(limit), - "since": str(since), - "until": next_until, - }, - ] - - @respx.mock - def test_list_snapshot_sync_single_page_does_not_fetch_more( - self, mock_env_clear, mock_sandbox_snapshot_response - ): - from vercel.sandbox import Snapshot - - response = { - "snapshots": [ - _snapshot_with_id( - mock_sandbox_snapshot_response, - "snap_single_page", - created_at=1705320600000, - ), - ], - "pagination": { - "count": 1, - "next": None, - "prev": None, - }, - } - requests: list[dict[str, str]] = [] - - def handler(request: httpx.Request) -> httpx.Response: - requests.append(dict(request.url.params)) - return httpx.Response(200, json=response) - - respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/snapshots").mock(side_effect=handler) - - page = Snapshot.list( - token="test_token", - team_id="team_test123", - project_id="prj_test123", - ) - - assert page.has_next_page() is False - assert page.next_page_info() is None - assert page.get_next_page() is None - assert [ - [snapshot.id for snapshot in current_page.snapshots] - for current_page in page.iter_pages() - ] == [["snap_single_page"]] - assert [snapshot.id for snapshot in page.iter_items()] == ["snap_single_page"] - assert requests == [{"teamId": "team_test123", "project": "prj_test123"}] - - @respx.mock - @pytest.mark.asyncio - async def test_list_snapshot_async_single_page_does_not_fetch_more( - self, mock_env_clear, mock_sandbox_snapshot_response - ): - from vercel.sandbox import AsyncSnapshot - - response = { - "snapshots": [ - _snapshot_with_id( - mock_sandbox_snapshot_response, - "snap_async_single", - created_at=1705320600000, - ), - ], - "pagination": { - "count": 1, - "next": None, - "prev": None, - }, - } - requests: list[dict[str, str]] = [] - - def handler(request: httpx.Request) -> httpx.Response: - requests.append(dict(request.url.params)) - return httpx.Response(200, json=response) - - respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/snapshots").mock(side_effect=handler) - - page = await AsyncSnapshot.list( - token="test_token", - team_id="team_test123", - project_id="prj_test123", - ) - - assert page.has_next_page() is False - assert page.next_page_info() is None - assert await page.get_next_page() is None - pages = await _collect_async_pages(page) - assert [[snapshot.id for snapshot in current_page.snapshots] for current_page in pages] == [ - ["snap_async_single"], - ] - items = await _collect_async_items(page) - assert [snapshot.id for snapshot in items] == ["snap_async_single"] - assert requests == [{"teamId": "team_test123", "project": "prj_test123"}] - @respx.mock def test_get_sandbox_sync_exposes_mode_network_policy( self, mock_env_clear, mock_sandbox_get_response_with_mode_network_policy @@ -2910,42 +2686,6 @@ def test_create_snapshot_sync_with_zero_expiration( sandbox.client.close() @respx.mock - def test_create_snapshot_sync_rejects_invalid_expiration( - self, mock_env_clear, mock_sandbox_get_response - ): - """Test sync snapshot creation validates the minimum expiration.""" - from vercel.sandbox import Sandbox - - sandbox_id = "sbx_test123456" - - respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( - return_value=httpx.Response( - 200, - json={ - "sandbox": mock_sandbox_get_response, - "routes": [], - }, - ) - ) - route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( - return_value=httpx.Response(200, json={}) - ) - - sandbox = Sandbox.get( - sandbox_id=sandbox_id, - token="test_token", - team_id="team_test123", - project_id="prj_test123", - ) - - with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): - sandbox.snapshot(expiration=3_600) - - assert not route.called - sandbox.client.close() - - @respx.mock - @pytest.mark.asyncio async def test_create_snapshot_async_without_expiration( self, mock_env_clear, mock_sandbox_get_response, mock_sandbox_snapshot_response ): @@ -3095,42 +2835,6 @@ async def test_create_snapshot_async_with_zero_expiration_and_optional_expires_a await sandbox.client.aclose() - @respx.mock - @pytest.mark.asyncio - async def test_create_snapshot_async_rejects_invalid_expiration( - self, mock_env_clear, mock_sandbox_get_response - ): - """Test async snapshot creation validates the minimum expiration.""" - from vercel.sandbox import AsyncSandbox - - sandbox_id = "sbx_test123456" - - respx.get(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}").mock( - return_value=httpx.Response( - 200, - json={ - "sandbox": mock_sandbox_get_response, - "routes": [], - }, - ) - ) - route = respx.post(f"{SANDBOX_API_BASE}/v1/sandboxes/{sandbox_id}/snapshot").mock( - return_value=httpx.Response(200, json={}) - ) - - sandbox = await AsyncSandbox.get( - sandbox_id=sandbox_id, - token="test_token", - team_id="team_test123", - project_id="prj_test123", - ) - - with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): - await sandbox.snapshot(expiration=3_600) - - assert not route.called - await sandbox.client.aclose() - class TestSandboxExtendTimeout: """Test sandbox timeout extension.""" diff --git a/tests/unit/test_sandbox_page.py b/tests/unit/test_sandbox_page.py new file mode 100644 index 0000000..cc159c8 --- /dev/null +++ b/tests/unit/test_sandbox_page.py @@ -0,0 +1,135 @@ +"""Tests for sandbox page helpers.""" + +from __future__ import annotations + +import pytest + +from vercel._internal.sandbox.models import Pagination, Snapshot as SnapshotModel +from vercel.sandbox.page import AsyncSnapshotPage, SnapshotPage + + +def _snapshot_model(snapshot_id: str, *, created_at: int) -> SnapshotModel: + return SnapshotModel.model_validate( + { + "id": snapshot_id, + "sourceSandboxId": "sbx_test123456", + "region": "iad1", + "status": "created", + "sizeBytes": 1024, + "expiresAt": created_at + 86_400_000, + "createdAt": created_at, + "updatedAt": created_at, + } + ) + + +async def _collect_async_pages(page: AsyncSnapshotPage) -> list[AsyncSnapshotPage]: + return [current_page async for current_page in page.iter_pages()] + + +async def _collect_async_items(page: AsyncSnapshotPage) -> list: + return [snapshot async for snapshot in page.iter_items()] + + +class TestSnapshotPage: + def test_iterates_pages_and_items(self) -> None: + second_page = SnapshotPage.create( + snapshots=[_snapshot_model("snap_2", created_at=1705320000000)], + pagination=Pagination(count=2, next=None, prev=1705320600000), + fetch_next_page=self._fetch_unreachable_page, + ) + + async def fetch_next_page(_page_info): + return second_page + + first_page = SnapshotPage.create( + snapshots=[_snapshot_model("snap_1", created_at=1705320600000)], + pagination=Pagination(count=2, next=1705320000000, prev=None), + fetch_next_page=fetch_next_page, + ) + + assert first_page.has_next_page() is True + next_page_info = first_page.next_page_info() + assert next_page_info is not None + assert next_page_info.until == 1705320000000 + assert [ + [snapshot.id for snapshot in page.snapshots] for page in first_page.iter_pages() + ] == [ + ["snap_1"], + ["snap_2"], + ] + assert [snapshot.id for snapshot in first_page.iter_items()] == ["snap_1", "snap_2"] + + def test_terminal_page_does_not_fetch_more(self) -> None: + page = SnapshotPage.create( + snapshots=[_snapshot_model("snap_terminal", created_at=1705320600000)], + pagination=Pagination(count=1, next=None, prev=None), + fetch_next_page=self._fetch_unreachable_page, + ) + + assert page.has_next_page() is False + assert page.next_page_info() is None + assert page.get_next_page() is None + assert [ + [snapshot.id for snapshot in current.snapshots] for current in page.iter_pages() + ] == [ + ["snap_terminal"], + ] + assert [snapshot.id for snapshot in page.iter_items()] == ["snap_terminal"] + + @staticmethod + async def _fetch_unreachable_page(_page_info) -> SnapshotPage: + raise AssertionError("fetch_next_page should not be called") + + +class TestAsyncSnapshotPage: + @pytest.mark.asyncio + async def test_iterates_pages_and_items(self) -> None: + second_page = AsyncSnapshotPage.create( + snapshots=[_snapshot_model("snap_async_2", created_at=1705320000000)], + pagination=Pagination(count=2, next=None, prev=1705320600000), + fetch_next_page=self._fetch_unreachable_page, + ) + + async def fetch_next_page(_page_info): + return second_page + + first_page = AsyncSnapshotPage.create( + snapshots=[_snapshot_model("snap_async_1", created_at=1705320600000)], + pagination=Pagination(count=2, next=1705320000000, prev=None), + fetch_next_page=fetch_next_page, + ) + + assert first_page.has_next_page() is True + next_page_info = first_page.next_page_info() + assert next_page_info is not None + assert next_page_info.until == 1705320000000 + pages = await _collect_async_pages(first_page) + assert [[snapshot.id for snapshot in page.snapshots] for page in pages] == [ + ["snap_async_1"], + ["snap_async_2"], + ] + items = await _collect_async_items(first_page) + assert [snapshot.id for snapshot in items] == ["snap_async_1", "snap_async_2"] + + @pytest.mark.asyncio + async def test_terminal_page_does_not_fetch_more(self) -> None: + page = AsyncSnapshotPage.create( + snapshots=[_snapshot_model("snap_async_terminal", created_at=1705320600000)], + pagination=Pagination(count=1, next=None, prev=None), + fetch_next_page=self._fetch_unreachable_page, + ) + + assert page.has_next_page() is False + assert page.next_page_info() is None + assert await page.get_next_page() is None + pages = await _collect_async_pages(page) + assert [[snapshot.id for snapshot in current.snapshots] for current in pages] == [ + ["snap_async_terminal"], + ] + items = await _collect_async_items(page) + assert [snapshot.id for snapshot in items] == ["snap_async_terminal"] + + @staticmethod + async def _fetch_unreachable_page(_page_info) -> AsyncSnapshotPage: + raise AssertionError("fetch_next_page should not be called") diff --git a/tests/unit/test_sandbox_snapshot.py b/tests/unit/test_sandbox_snapshot.py new file mode 100644 index 0000000..3854646 --- /dev/null +++ b/tests/unit/test_sandbox_snapshot.py @@ -0,0 +1,30 @@ +"""Tests for snapshot helpers.""" + +from __future__ import annotations + +import pytest + +from vercel.sandbox import MIN_SNAPSHOT_EXPIRATION_MS, SnapshotExpiration +from vercel.sandbox.snapshot import normalize_snapshot_expiration + + +class TestSnapshotExpiration: + def test_allows_zero(self) -> None: + expiration = SnapshotExpiration(0) + + assert expiration == 0 + + def test_allows_minimum(self) -> None: + expiration = SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS) + + assert expiration == MIN_SNAPSHOT_EXPIRATION_MS + + def test_rejects_values_below_minimum(self) -> None: + with pytest.raises(ValueError, match="0 for no expiration or >= 86400000"): + SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS - 1) + + def test_normalize_coerces_int(self) -> None: + expiration = normalize_snapshot_expiration(MIN_SNAPSHOT_EXPIRATION_MS) + + assert expiration == SnapshotExpiration(MIN_SNAPSHOT_EXPIRATION_MS) + assert isinstance(expiration, SnapshotExpiration) From 7204f8445319109dc22d89108fb3af28b67b57a4 Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Fri, 27 Mar 2026 13:33:29 -0700 Subject: [PATCH 9/9] Check expiration time with some drift --- examples/sandbox_11_snapshots.py | 35 ++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/examples/sandbox_11_snapshots.py b/examples/sandbox_11_snapshots.py index f87bd95..ef8b616 100644 --- a/examples/sandbox_11_snapshots.py +++ b/examples/sandbox_11_snapshots.py @@ -22,6 +22,25 @@ load_dotenv() +SNAPSHOT_EXPIRATION_MS = 86_400_000 +SNAPSHOT_EXPIRATION_DRIFT_MS = 1_000 + + +def assert_time_near(*, actual: int | None, expected: int, drift: int, label: str) -> None: + assert actual is not None, f"{label} should report an expires_at timestamp" + assert abs(actual - expected) <= drift, f"{label} should be within {drift}ms of {expected}" + + +def assert_snapshot_expires_at_expected_time( + *, created_at: int, expires_at: int | None, label: str +) -> None: + assert_time_near( + actual=expires_at, + expected=created_at + SNAPSHOT_EXPIRATION_MS, + drift=SNAPSHOT_EXPIRATION_DRIFT_MS, + label=label, + ) + async def async_demo() -> None: print("=" * 60) @@ -50,15 +69,17 @@ async def async_demo() -> None: # Step 2: Create a snapshot with an explicit expiration (this STOPS the sandbox) print("\n[3] Creating snapshot with expiration=86400000...") - snapshot = await sandbox1.snapshot(expiration=86400000) + snapshot = await sandbox1.snapshot(expiration=SNAPSHOT_EXPIRATION_MS) async_snapshot_ids.append(snapshot.snapshot_id) print(f" Snapshot ID: {snapshot.snapshot_id}") print(f" Status: {snapshot.status}") print(f" Created At: {snapshot.created_at}") print(f" Expires At: {snapshot.expires_at}") print(f" Sandbox status after snapshot: {sandbox1.status}") - assert snapshot.expires_at == snapshot.created_at + 86_400_000, ( - "expiring async snapshot should report an expires_at timestamp" + assert_snapshot_expires_at_expected_time( + created_at=snapshot.created_at, + expires_at=snapshot.expires_at, + label="expiring async snapshot", ) finally: @@ -172,15 +193,17 @@ def sync_demo() -> None: # Step 2: Create a snapshot with an explicit expiration (this STOPS the sandbox) print("\n[3] Creating snapshot with expiration=86400000...") - snapshot = sandbox1.snapshot(expiration=86400000) + snapshot = sandbox1.snapshot(expiration=SNAPSHOT_EXPIRATION_MS) sync_snapshot_ids.append(snapshot.snapshot_id) print(f" Snapshot ID: {snapshot.snapshot_id}") print(f" Status: {snapshot.status}") print(f" Created At: {snapshot.created_at}") print(f" Expires At: {snapshot.expires_at}") print(f" Sandbox status after snapshot: {sandbox1.status}") - assert snapshot.expires_at == snapshot.created_at + 86_400_000, ( - "expiring sync snapshot should report an expires_at timestamp" + assert_snapshot_expires_at_expected_time( + created_at=snapshot.created_at, + expires_at=snapshot.expires_at, + label="expiring sync snapshot", ) finally: