Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ It provides:
- `Utopia\Client`, a PSR-18 client wrapper with immutable headers, auth, base URI, and timeout helpers.
- `Utopia\Client\Adapter\Curl\Client`, a cURL transport for regular PHP runtimes.
- `Utopia\Client\Adapter\SwooleCoroutine\Client`, a Swoole coroutine transport.
- `streamRequest()`, which delivers the response body to a sink callback chunk-by-chunk so large downloads and event streams are consumed with bounded memory (see [Streaming Responses](#streaming-responses)).
- `Utopia\Psr7\*` PSR-7 messages and PSR-17 factories.
- Request factories for JSON, forms, query strings, raw bodies, and multipart uploads.
- Response decoders for JSON, form-encoded, and multipart payloads.
Expand Down Expand Up @@ -159,6 +160,54 @@ foreach ($parts as $part) {
}
```

## Streaming Responses

`streamRequest()` delivers the response body to a sink callback chunk-by-chunk as
it arrives, so large downloads, Server-Sent Events, and LLM token streams are
consumed with bounded memory — the whole body is never held at once. It returns a
response carrying the status and headers; the body is empty because the body was
handed to the sink. Both adapters support it.

```php
<?php

$response = $client->streamRequest($request, function (string $chunk): void {
echo $chunk;
});

echo $response->getStatusCode();
```

The sink runs as each chunk arrives, which means it also applies backpressure: the
transfer does not read ahead while the sink is still working. To stop early, throw
from the sink.

```php
<?php

// Parse a line-delimited (NDJSON / SSE) stream as it streams in.
$buffer = '';

$client->streamRequest($request, function (string $chunk) use (&$buffer): void {
$buffer .= $chunk;

while (($newline = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $newline);
$buffer = substr($buffer, $newline + 1);
// handle $line
}
});
```

Notes:

- Use `sendRequest()` for normal requests — it buffers the body and returns a
fully decodable response (`->json()`, `->form()`, `->multipart()`).
- `streamRequest()` returns only once the stream ends. For an unbounded stream
(e.g. SSE), set the transport timeout to no-limit (`CURLOPT_TIMEOUT_MS => 0` on
cURL, `timeout => -1` on Swoole) and stop by throwing from the sink.
- The Swoole adapter must run inside a coroutine, like `sendRequest()`.

## Timeouts

Timeout values are seconds. The helpers are immutable and delegate to the selected adapter.
Expand Down
18 changes: 18 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ public function sendRequest(RequestInterface $request): ResponseInterface
);
}

/**
* Send a request and pass each response body chunk to $sink as it arrives.
* The returned response carries the status and headers; its body is empty.
*
* @param callable(string): void $sink
*
* @throws ClientExceptionInterface
*/
public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface
{
return $this->adapter->streamRequest(
$this->applyHeaders(
$this->applyBaseUri($request),
),
$sink,
);
}

private function applyBaseUri(RequestInterface $request): RequestInterface
{
if (!$this->baseUri instanceof \Psr\Http\Message\UriInterface) {
Expand Down
15 changes: 15 additions & 0 deletions src/Client/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,26 @@

namespace Utopia\Client;

use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;

interface Adapter extends ClientInterface
{
public function withTimeout(float $seconds): static;

public function withConnectTimeout(float $seconds): static;

/**
* Send a request and pass each response body chunk to $sink as it arrives,
* keeping memory bounded regardless of body size. The returned response
* carries the status and headers; its body is empty because the body was
* delivered to $sink.
*
* @param callable(string): void $sink
*
* @throws ClientExceptionInterface
*/
public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface;
}
68 changes: 54 additions & 14 deletions src/Client/Adapter/Curl/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,53 @@ public function withConnectTimeout(float $seconds): static
* @throws ClientExceptionInterface
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
$body = '';

$parsed = $this->transfer($request, static function (string $chunk) use (&$body): void {
$body .= $chunk;
});

return $this->responseBuilder->build(
$parsed['status'],
$parsed['reason'],
$parsed['headers'],
$body,
$parsed['protocol'],
);
}

/**
* @param callable(string): void $sink
*
* @throws ClientExceptionInterface
*/
public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface
{
$parsed = $this->transfer($request, $sink);

return $this->responseBuilder->build(
$parsed['status'],
$parsed['reason'],
$parsed['headers'],
'',
$parsed['protocol'],
);
}

/**
* Run the transfer, forwarding each body chunk to $sink, and return the
* parsed status line and headers. cURL invokes the write callback as data
* arrives, so a streaming $sink sees the body chunk-by-chunk and the body
* is never fully held in memory.
*
* @param callable(string): void $sink
*
* @return array{protocol: string, status: int, reason: string, headers: array<string, array<int, string>>}
*
* @throws ClientExceptionInterface
*/
private function transfer(RequestInterface $request, callable $sink): array
{
if (!\extension_loaded('curl')) {
throw new AdapterPreconditionException($request, 'The curl extension is required.');
Expand All @@ -87,14 +134,13 @@ public function sendRequest(RequestInterface $request): ResponseInterface
}

$headers = '';
$body = '';
$handle = curl_init($url);

if (!$handle instanceof CurlHandle) {
throw new AdapterInitializationException($request, 'Unable to initialize curl.');
}

$options = $this->options($request, $headers, $body);
$options = $this->options($request, $headers, $sink);

try {
if (curl_setopt_array($handle, $options) === false) {
Expand Down Expand Up @@ -123,22 +169,16 @@ public function sendRequest(RequestInterface $request): ResponseInterface
throw new InvalidResponseException($request, 'Received an invalid HTTP response.');
}

return $this->responseBuilder->build(
$parsed['status'],
$parsed['reason'],
$parsed['headers'],
$body,
$parsed['protocol'],
);
return $parsed;
}

/**
* @param-out string $headers
* @param-out string $body
* @param-out string $headers
* @param callable(string): void $sink
*
* @return array<int, mixed>
*/
private function options(RequestInterface $request, string &$headers, string &$body): array
private function options(RequestInterface $request, string &$headers, callable $sink): array
{
$options = [
\CURLOPT_CUSTOMREQUEST => $request->getMethod(),
Expand All @@ -153,9 +193,9 @@ private function options(RequestInterface $request, string &$headers, string &$b
return \strlen($line);
},
\CURLOPT_RETURNTRANSFER => false,
\CURLOPT_WRITEFUNCTION => static function (CurlHandle $handle, string $chunk) use (&$body): int {
\CURLOPT_WRITEFUNCTION => static function (CurlHandle $handle, string $chunk) use ($sink): int {
unset($handle);
$body .= $chunk;
$sink($chunk);

return \strlen($chunk);
},
Expand Down
64 changes: 42 additions & 22 deletions src/Client/Adapter/SwooleCoroutine/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ public function withConnectTimeout(float $seconds): static
* @throws ClientExceptionInterface
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
return $this->perform($request, null);
}

/**
* @param callable(string): void $sink
*
* @throws ClientExceptionInterface
*/
public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface
{
return $this->perform($request, $sink);
}

/**
* Execute the request. When $sink is given, Swoole's write callback forwards
* each body chunk to it as data arrives, so the body is never fully held in
* memory and the returned response has an empty body; otherwise the body is
* buffered onto the response.
*
* @param (callable(string): void)|null $sink
*
* @throws ClientExceptionInterface
*/
private function perform(RequestInterface $request, ?callable $sink): ResponseInterface
{
if (!\extension_loaded('swoole')) {
throw new AdapterPreconditionException($request, 'The swoole extension is required.');
Expand Down Expand Up @@ -107,10 +132,17 @@ public function sendRequest(RequestInterface $request): ResponseInterface
throw new AdapterInitializationException($request, $throwable->getMessage(), (int) $throwable->getCode(), $throwable);
}

$settings = $this->settings + [self::SETTING_HTTP2 => false];

if ($sink !== null) {
$settings['write_func'] = static function (SwooleClient $cli, string $chunk) use ($sink): void {
unset($cli);
$sink($chunk);
};
}

try {
if ($client->set($this->settings + [
self::SETTING_HTTP2 => false,
]) === false) {
if ($client->set($settings) === false) {
throw new InvalidArgumentException('Unable to configure Swoole client settings.');
}

Expand All @@ -121,6 +153,12 @@ public function sendRequest(RequestInterface $request): ResponseInterface
if ($client->setHeaders($this->requestHeaders($request)) === false) {
throw new InvalidArgumentException('Unable to configure Swoole request headers.');
}

$body = (string) $request->getBody();

if ($body !== '' && $client->setData($body) === false) {
throw new InvalidArgumentException('Unable to configure Swoole request body.');
}
} catch (InvalidArgumentException $invalidArgumentException) {
$client->close();

Expand All @@ -131,24 +169,6 @@ public function sendRequest(RequestInterface $request): ResponseInterface
throw new InvalidArgumentException($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
}

$body = (string) $request->getBody();

if ($body !== '') {
try {
if ($client->setData($body) === false) {
throw new InvalidArgumentException('Unable to configure Swoole request body.');
}
} catch (InvalidArgumentException $invalidArgumentException) {
$client->close();

throw $invalidArgumentException;
} catch (Throwable $throwable) {
$client->close();

throw new InvalidArgumentException($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
}
}

try {
$result = $client->execute($this->path($request));
} catch (Throwable $throwable) {
Expand Down Expand Up @@ -184,7 +204,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface
$headers = [];
}

$responseBody = $client->body;
$responseBody = $sink === null ? $client->body : '';

if (!\is_string($responseBody)) {
$responseBody = '';
Expand Down
Loading