Skip to content

Commit 92ab0a9

Browse files
committed
Merge branch 'master' into 3.2-merge
# Conflicts: # .github/workflows/test.yml # src/grpc-client/src/BaseClient.php
2 parents 207e90d + 063d5b1 commit 92ab0a9

2 files changed

Lines changed: 86 additions & 29 deletions

File tree

src/BaseClient.php

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
use Google\Protobuf\Internal\Message;
1616
use Hyperf\Context\ApplicationContext;
17+
use Hyperf\Context\Context;
1718
use Hyperf\Coroutine\Channel\Pool as ChannelPool;
19+
use Hyperf\Coroutine\Locker;
1820
use Hyperf\Grpc\StatusCode;
1921
use Hyperf\GrpcClient\Exception\GrpcClientException;
2022
use InvalidArgumentException;
21-
use Swoole\Http2\Response;
23+
use Throwable;
2224

2325
use function Hyperf\Support\retry;
2426

@@ -29,17 +31,38 @@
2931
*/
3032
class BaseClient
3133
{
32-
private ?GrpcClient $grpcClient = null;
33-
3434
private bool $initialized = false;
3535

36+
/**
37+
* @var null|array<array-key,GrpcClient>
38+
*/
39+
private ?array $grpcClients = null;
40+
41+
private int $clientCount = 1;
42+
3643
public function __construct(private string $hostname, private array $options = [])
3744
{
45+
$this->clientCount = max(1, (int) ($this->options['client_count'] ?? 0));
3846
}
3947

4048
public function __destruct()
4149
{
42-
$this->grpcClient?->close(false);
50+
if (! $this->initialized) {
51+
return;
52+
}
53+
54+
$lastException = null;
55+
foreach ($this->grpcClients as $client) {
56+
try {
57+
$client->close(false);
58+
} catch (Throwable $exception) {
59+
$lastException = $exception;
60+
}
61+
}
62+
63+
if ($lastException) {
64+
throw $lastException;
65+
}
4366
}
4467

4568
public function __call(string $name, array $arguments): mixed
@@ -49,11 +72,13 @@ public function __call(string $name, array $arguments): mixed
4972

5073
public function _getGrpcClient(): GrpcClient
5174
{
75+
// Lazy initialization: defer client setup until first use to optimize resource usage.
5276
if (! $this->initialized) {
5377
$this->init();
5478
}
55-
$this->start();
56-
return $this->grpcClient;
79+
80+
// Ensure the client connection is started before use.
81+
return $this->start();
5782
}
5883

5984
/**
@@ -158,33 +183,65 @@ protected function _bidiRequest(
158183
return $call;
159184
}
160185

161-
private function start()
186+
private function start(): GrpcClient
162187
{
163-
$client = $this->grpcClient;
164-
if (! ($client->isRunning() || $client->start())) {
165-
$message = sprintf(
166-
'Grpc client start failed with error code %d when connect to %s',
167-
$client->getErrCode(),
168-
$this->hostname
169-
);
170-
throw new GrpcClientException($message, StatusCode::INTERNAL);
188+
$key = Context::getOrSet(self::class . '::id', fn () => array_rand($this->grpcClients));
189+
$client = $this->grpcClients[$key];
190+
191+
// If the client is already running, return it directly.
192+
if ($client->isRunning()) {
193+
return $client;
171194
}
172-
return true;
195+
196+
$lockKey = sprintf('%s:start:%d', spl_object_hash($this), $key);
197+
198+
if (Locker::lock($lockKey)) {
199+
try {
200+
$client->start(); // May throw exception
201+
} catch (Throwable $e) {
202+
$message = sprintf(
203+
'Grpc client start failed with error code %d when connect to %s',
204+
$client->getErrCode(),
205+
$this->hostname
206+
);
207+
throw new GrpcClientException($message, StatusCode::INTERNAL, $e);
208+
} finally {
209+
Locker::unlock($lockKey);
210+
}
211+
}
212+
213+
return $client;
173214
}
174215

175216
private function init()
176217
{
177-
if (! empty($this->options['client'])) {
178-
if (! $this->options['client'] instanceof GrpcClient) {
179-
throw new InvalidArgumentException('Parameter client have to instanceof Hyperf\GrpcClient\GrpcClient');
218+
$lockKey = sprintf('%s:init', spl_object_hash($this));
219+
220+
if (Locker::lock($lockKey)) {
221+
try {
222+
if ($this->initialized) {
223+
return;
224+
}
225+
226+
$channelPool = ApplicationContext::getContainer()->get(ChannelPool::class);
227+
if (! empty($this->options['client'])) { // Use the specified client.
228+
if (! $this->options['client'] instanceof GrpcClient) {
229+
throw new InvalidArgumentException('Parameter client have to instanceof Hyperf\GrpcClient\GrpcClient');
230+
}
231+
$this->grpcClients[] = $this->options['client'];
232+
} else { // Use multiple clients.
233+
for ($i = 0; $i < $this->clientCount; ++$i) {
234+
$grpcClient = new GrpcClient($channelPool);
235+
$grpcClient->set($this->hostname, $this->options);
236+
$this->grpcClients[] = $grpcClient;
237+
}
238+
}
239+
240+
$this->initialized = true;
241+
} finally {
242+
Locker::unlock($lockKey);
180243
}
181-
$this->grpcClient = $this->options['client'];
182-
} else {
183-
$this->grpcClient = new GrpcClient(ApplicationContext::getContainer()->get(ChannelPool::class));
184-
$this->grpcClient->set($this->hostname, $this->options);
185244
}
186-
187-
$this->initialized = true;
188245
}
189246

190247
private function buildRequest(string $method, Message $argument, array $options): Request

src/GrpcClient.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ public function set(string $hostname, array $options = [])
9797

9898
public function start(): bool
9999
{
100-
if ($this->recvCoroutineId !== 0 || $this->sendCoroutineId !== 0) {
101-
throw new RuntimeException('Cannot restart the client.');
100+
if ($this->recvCoroutineId !== 0 || $this->sendCoroutineId !== 0) { // Client already started
101+
return true;
102102
}
103103
if (! Coroutine::inCoroutine()) {
104-
throw new RuntimeException('Client must be started in coroutine');
104+
throw new RuntimeException('Client must be started in coroutine.');
105105
}
106106
if (! $this->getHttpClient()->connect()) {
107107
throw new GrpcClientException('Connect failed, error=' . $this->getHttpClient()->errMsg, $this->getHttpClient()->errCode);
@@ -149,7 +149,7 @@ public function closeRecv()
149149

150150
public function isConnected(): bool
151151
{
152-
return $this->httpClient->connected;
152+
return (bool) $this->httpClient?->connected;
153153
}
154154

155155
public function isStreamExist(int $streamId): bool

0 commit comments

Comments
 (0)