Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/Illuminate/Http/Client/Batch.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public function as(string $key)

$this->incrementPendingRequests();

return $this->requests[$key] = $this->asyncRequest();
return $this->factory->setHandler($this->handler)->async()->setDeferred($this->requests, $key);
}

/**
Expand Down Expand Up @@ -252,18 +252,19 @@ public function send(): array
}

$results = [];
$promises = [];

foreach ($this->requests as $key => $item) {
$promise = match (true) {
$item instanceof PendingRequest => $item->getPromise(),
default => $item,
if (! empty($this->requests)) {
// Create a generator that yields promises on-demand for proper concurrency control
$promiseGenerator = function () {
foreach ($this->requests as $key => $item) {
yield $key => match (true) {
$item instanceof Closure => $item(),
$item instanceof PendingRequest => $item->getPromise(),
default => $item,
};
}
};

$promises[$key] = $promise;
}

if (! empty($promises)) {
$eachPromiseOptions = [
'fulfilled' => function ($result, $key) use (&$results) {
$results[$key] = $result;
Expand Down Expand Up @@ -311,7 +312,7 @@ public function send(): array
$eachPromiseOptions['concurrency'] = $this->concurrencyLimit;
}

(new EachPromise($promises, $eachPromiseOptions))->promise()->wait();
(new EachPromise($promiseGenerator(), $eachPromiseOptions))->promise()->wait();
}

// Before returning the results, we must ensure that the results are sorted
Expand Down Expand Up @@ -422,7 +423,7 @@ public function getRequests(): array
*
* @param string $method
* @param array $parameters
* @return \Illuminate\Http\Client\PendingRequest|\GuzzleHttp\Promise\Promise
* @return \Illuminate\Http\Client\PendingRequest
*/
public function __call(string $method, array $parameters)
{
Expand All @@ -432,6 +433,10 @@ public function __call(string $method, array $parameters)

$this->incrementPendingRequests();

return $this->requests[] = $this->asyncRequest()->$method(...$parameters);
// Get the next numeric index
$key = count($this->requests);

// Create a deferred PendingRequest and call the method to start chaining
return $this->factory->setHandler($this->handler)->async()->setDeferred($this->requests, $key)->$method(...$parameters);
}
}
99 changes: 99 additions & 0 deletions src/Illuminate/Http/Client/DeferredPromise.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php

namespace Illuminate\Http\Client;

class DeferredPromise
{
/**
* Reference to the pool/batch requests array.
*
* @var array
*/
protected $requests;

/**
* The key for this request in the pool/batch.
*
* @var string|int
*/
protected $key;

/**
* The base promise closure that will execute the request.
*
* @var callable
*/
protected $promiseFactory;

/**
* Additional promise transformations to apply.
*
* @var array<callable>
*/
protected $transformations = [];

/**
* @param array &$requests Reference to the pool/batch requests array
* @param string|int $key The key for this request
* @param callable $promiseFactory Closure that returns a promise
*/
public function __construct(array &$requests, $key, callable $promiseFactory)
{
$this->requests = &$requests;
$this->key = $key;
$this->promiseFactory = $promiseFactory;

// Store the factory in the requests array
$this->updateRequestsClosure();
}

/**
* Add a promise transformation (like ->then()).
*
* @param callable|null $onFulfilled
* @param callable|null $onRejected
* @return $this
*/
public function then(?callable $onFulfilled = null, ?callable $onRejected = null)
{
$this->transformations[] = ['method' => 'then', 'args' => [$onFulfilled, $onRejected]];
$this->updateRequestsClosure();

return $this;
}

/**
* Add an otherwise transformation.
*
* @param callable $onRejected
* @return $this
*/
public function otherwise(callable $onRejected)
{
$this->transformations[] = ['method' => 'otherwise', 'args' => [$onRejected]];
$this->updateRequestsClosure();

return $this;
}

/**
* Update the closure stored in the requests array to include all transformations.
*
* @return void
*/
protected function updateRequestsClosure()
{
$promiseFactory = $this->promiseFactory;
$transformations = $this->transformations;

$this->requests[$this->key] = function () use ($promiseFactory, $transformations) {
$promise = $promiseFactory();

foreach ($transformations as $transformation) {
$promise = $promise->{$transformation['method']}(...$transformation['args']);
}

return $promise;
};
}
}
Loading
Loading