Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openapi/ga/individual/platform.openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions openapi/ga/openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions openapi/openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

188 changes: 106 additions & 82 deletions packages/filesets/src/filesets/filesystem/filesystem.py

Large diffs are not rendered by default.

239 changes: 172 additions & 67 deletions packages/filesets/src/filesets/resources.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,26 @@ def send(
params = self._resolve_query_params(request)

if self._is_binary(request):
stream_ctx = self._http.stream(
request.method, url, content=request.content, headers=req_headers, params=params
)
return NemoBinaryResponse(stream_ctx, request)
kwargs = {
"method": request.method,
"url": url,
"content": request.content,
"headers": req_headers,
"params": params,
}
return NemoBinaryResponse(self._http, kwargs, request)

if self._is_stream(request):
assert request.response_type is not None
stream_ctx = self._http.stream(
request.method, url, content=request.content, headers=req_headers, params=params
)
kwargs = {
"method": request.method,
"url": url,
"content": request.content,
"headers": req_headers,
"params": params,
}
model_type = _get_stream_model_type(request.response_type)
return NemoStreamResponse(stream_ctx, model_type, request)
return NemoStreamResponse(self._http, kwargs, model_type, request)

raw = self._http.request(request.method, url, content=request.content, headers=req_headers, params=params)
body = None
Expand Down Expand Up @@ -226,18 +234,26 @@ async def send(
params = self._resolve_query_params(request)

if self._is_binary(request):
stream_ctx = self._http.stream(
request.method, url, content=request.content, headers=req_headers, params=params
)
return AsyncNemoBinaryResponse(stream_ctx, request)
kwargs = {
"method": request.method,
"url": url,
"content": request.content,
"headers": req_headers,
"params": params,
}
return AsyncNemoBinaryResponse(self._http, kwargs, request)

if self._is_stream(request):
assert request.response_type is not None
stream_ctx = self._http.stream(
request.method, url, content=request.content, headers=req_headers, params=params
)
kwargs = {
"method": request.method,
"url": url,
"content": request.content,
"headers": req_headers,
"params": params,
}
model_type = _get_stream_model_type(request.response_type)
return AsyncNemoStreamResponse(stream_ctx, model_type, request)
return AsyncNemoStreamResponse(self._http, kwargs, model_type, request)

raw = await self._http.request(request.method, url, content=request.content, headers=req_headers, params=params)
body = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
from __future__ import annotations

from collections.abc import AsyncIterator, Iterator
from contextlib import AbstractAsyncContextManager, AbstractContextManager
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass
from types import TracebackType
from typing import Generic, TypeVar
from typing import Any, Generic, TypeVar

import httpx
from nemo_platform_plugin.client.types import PreparedRequest
Expand Down Expand Up @@ -51,82 +50,71 @@ def data(self) -> ResponseT:
class NemoBinaryResponse:
"""Sync response for binary download endpoints.

Use as a context manager::
``read()`` performs a regular (non-streaming) HTTP request::

with client.send(endpoints.download(...)) as resp:
data = resp.read() # all bytes at once
# or: for chunk in resp # iterate chunks
resp = client.send(endpoints.download(...))
data = resp.read()

For streaming chunks, use ``stream()`` which returns a context manager
yielding the raw ``httpx.Response``::

resp = client.send(endpoints.download(...))
with resp.stream() as http_response:
for chunk in http_response.iter_bytes():
f.write(chunk)
"""

def __init__(self, stream_ctx: AbstractContextManager[httpx.Response], request: PreparedRequest) -> None:
self._stream_ctx = stream_ctx
self._response: httpx.Response | None = None
def __init__(self, http: httpx.Client, request_kwargs: dict[str, Any], request: PreparedRequest) -> None:
self._http = http
self._request_kwargs = request_kwargs
self.request = request

@property
def http_response(self) -> httpx.Response:
assert self._response is not None, "Must enter context manager before accessing response"
return self._response

def read(self) -> bytes:
"""Read and return the entire response body as bytes."""
return self.http_response.read()

def __iter__(self) -> Iterator[bytes]:
return self.http_response.iter_bytes()

def __enter__(self) -> NemoBinaryResponse:
self._response = self._stream_ctx.__enter__()
self._response.raise_for_status()
return self
resp = self._http.request(**self._request_kwargs)
resp.raise_for_status()
return resp.content

def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
self._stream_ctx.__exit__(exc_type, exc_val, exc_tb)
@contextmanager
def stream(self) -> Iterator[httpx.Response]:
"""Open a streaming connection for chunk-by-chunk iteration."""
with self._http.stream(**self._request_kwargs) as resp:
resp.raise_for_status()
yield resp


class NemoStreamResponse(Generic[ModelT]):
"""Sync response for SSE/NDJSON streaming endpoints.

Use as a context manager::
Use ``stream()`` to iterate over parsed model objects::

with client.send(ChatEndpoint(...)) as resp:
for chunk in resp:
resp = client.send(ChatEndpoint(...))
with resp.stream() as chunks:
for chunk in chunks:
print(chunk.text)
"""

def __init__(
self,
stream_ctx: AbstractContextManager[httpx.Response],
model_type: type[ModelT],
request: PreparedRequest,
self, http: httpx.Client, request_kwargs: dict[str, Any], model_type: type[ModelT], request: PreparedRequest
) -> None:
self._stream_ctx = stream_ctx
self._http = http
self._request_kwargs = request_kwargs
self._model_type = model_type
self._response: httpx.Response | None = None
self.request = request

@property
def http_response(self) -> httpx.Response:
assert self._response is not None, "Must enter context manager before accessing response"
return self._response
@contextmanager
def stream(self) -> Iterator[Iterator[ModelT]]:
"""Open a streaming connection and yield an iterator of parsed models."""
with self._http.stream(**self._request_kwargs) as resp:
resp.raise_for_status()

def __iter__(self) -> Iterator[ModelT]:
for line in self.http_response.iter_lines():
line = line.strip()
if line:
yield self._model_type.model_validate_json(line)
def _iter_models() -> Iterator[ModelT]:
for line in resp.iter_lines():
line = line.strip()
if line:
yield self._model_type.model_validate_json(line)

def __enter__(self) -> NemoStreamResponse[ModelT]:
self._response = self._stream_ctx.__enter__()
self._response.raise_for_status()
return self

def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
self._stream_ctx.__exit__(exc_type, exc_val, exc_tb)
yield _iter_models()


# ---------------------------------------------------------------------------
Expand All @@ -137,83 +125,75 @@ def __exit__(
class AsyncNemoBinaryResponse:
"""Async response for binary download endpoints.

Use as an async context manager::
``read()`` performs a regular (non-streaming) HTTP request::

resp = await client.send(endpoints.download(...))
data = await resp.read()

For streaming chunks, use ``stream()`` which returns an async context
manager yielding the raw ``httpx.Response``::

async with client.send(endpoints.download(...)) as resp:
data = await resp.read() # all bytes at once
# or: async for chunk in resp # iterate chunks
resp = await client.send(endpoints.download(...))
async with resp.stream() as http_response:
async for chunk in http_response.aiter_bytes():
f.write(chunk)
"""

def __init__(self, stream_ctx: AbstractAsyncContextManager[httpx.Response], request: PreparedRequest) -> None:
self._stream_ctx = stream_ctx
self._response: httpx.Response | None = None
def __init__(self, http: httpx.AsyncClient, request_kwargs: dict[str, Any], request: PreparedRequest) -> None:
self._http = http
self._request_kwargs = request_kwargs
self.request = request

@property
def http_response(self) -> httpx.Response:
assert self._response is not None, "Must enter async context manager before accessing response"
return self._response

async def read(self) -> bytes:
"""Read and return the entire response body as bytes."""
return await self.http_response.aread()
resp = await self._http.request(**self._request_kwargs)
resp.raise_for_status()
return resp.content

async def __aiter__(self) -> AsyncIterator[bytes]:
async for chunk in self.http_response.aiter_bytes():
yield chunk

async def __aenter__(self) -> AsyncNemoBinaryResponse:
self._response = await self._stream_ctx.__aenter__()
self._response.raise_for_status()
return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
await self._stream_ctx.__aexit__(exc_type, exc_val, exc_tb)
@asynccontextmanager
async def stream(self) -> AsyncIterator[httpx.Response]:
"""Open a streaming connection for chunk-by-chunk iteration."""
async with self._http.stream(**self._request_kwargs) as resp:
resp.raise_for_status()
yield resp


class AsyncNemoStreamResponse(Generic[ModelT]):
"""Async response for SSE/NDJSON streaming endpoints.

Use as an async context manager::
Use ``stream()`` to iterate over parsed model objects::

async with client.send(ChatEndpoint(...)) as resp:
async for chunk in resp:
resp = await client.send(ChatEndpoint(...))
async with resp.stream() as chunks:
async for chunk in chunks:
print(chunk.text)
"""

def __init__(
self,
stream_ctx: AbstractAsyncContextManager[httpx.Response],
http: httpx.AsyncClient,
request_kwargs: dict[str, Any],
model_type: type[ModelT],
request: PreparedRequest,
) -> None:
self._stream_ctx = stream_ctx
self._http = http
self._request_kwargs = request_kwargs
self._model_type = model_type
self._response: httpx.Response | None = None
self.request = request

@property
def http_response(self) -> httpx.Response:
assert self._response is not None, "Must enter async context manager before accessing response"
return self._response
@asynccontextmanager
async def stream(self) -> AsyncIterator[AsyncIterator[ModelT]]:
"""Open a streaming connection and yield an async iterator of parsed models."""
async with self._http.stream(**self._request_kwargs) as resp:
resp.raise_for_status()

async def __aiter__(self) -> AsyncIterator[ModelT]:
async for line in self.http_response.aiter_lines():
line = line.strip()
if line:
yield self._model_type.model_validate_json(line)
async def _iter_models() -> AsyncIterator[ModelT]:
async for line in resp.aiter_lines():
line = line.strip()
if line:
yield self._model_type.model_validate_json(line)

async def __aenter__(self) -> AsyncNemoStreamResponse[ModelT]:
self._response = await self._stream_ctx.__aenter__()
self._response.raise_for_status()
return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
await self._stream_ctx.__aexit__(exc_type, exc_val, exc_tb)
yield _iter_models()


# ---------------------------------------------------------------------------
Expand Down
Loading
Loading