Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions grpc/guides/advanced/connection_pool.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Connection Pool

The Elixir gRPC client maintains a **connection pool** for every target address. Rather than opening a fresh HTTP/2 connection for each RPC, callers share a fixed set of long-lived connections. This reduces connection overhead, improves throughput under concurrency, and enables fine-grained control over how many connections and concurrent streams your client opens.

The pool is managed transparently: when you call a stub, the client checks out a connection, tracks the open stream count, and returns the connection when the call finishes. All of this happens without any additional code on your side.

---

## How It Works

When `GRPC.Stub.connect/2` is called, a supervised pool of connections is started for the target address. Each connection is an independent HTTP/2 session capable of multiplexing many concurrent streams.

On each RPC call the client:

1. Picks first connection from the pool with number open streams that is still below the configured limit.
2. Increments that connection's open-stream counter and records a lease.
3. Runs the RPC.
4. Decrements the counter and releases the lease when the call completes.

If every connection in the pool has reached `max_streams`, the client opens an **overflow** connection up to `max_overflow` extra connections. Overflow connections aren't discarded once the load subsides, but if pool dies or connections are otherwise dropped we reset to initial count.

---

## Configuration

Pool behaviour is controlled via the `:pool` option passed to `GRPC.Stub.connect/2` or `GRPC.Client.Connection.connect/2`.

| Option | Type | Default | Description |
|:-------------------------|:----------------------------|:--------|:-------------------------------------------------------------------------------------------------------------------|
| `:size` | `non_neg_integer` | `1` | Number of persistent connections kept open. |
| `:max_overflow` | `non_neg_integer or nil` | `0` | Maximum number of extra connections created when the pool is fully saturated. `nil` means no client-side limit. |
| `:max_streams` | `pos_integer or nil` | `nil` | Maximum concurrent streams per connection. `nil` means no client-side limit (server limit applies). |
| `:health_check_enabled` | `boolean` | `false` | When `true`, a periodic gRPC health-check ping is sent on each connection every 10 minutes. |

---

## Examples

### Default pool (single connection)

```elixir
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051")
iex> {:ok, reply} = channel |> MyService.Stub.my_rpc(request)
```

### Multiple persistent connections

Open three connections upfront to distribute concurrent load:

```elixir
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051", pool: %{size: 3})
iex> {:ok, reply} = channel |> MyService.Stub.my_rpc(request)
```

### Overflow connections

Keep two persistent connections and allow up to five overflow connections during traffic spikes:

```elixir
iex> {:ok, channel} =
...> GRPC.Stub.connect("localhost:50051",
...> pool: %{size: 2, max_overflow: 5}
...> )
```

### Limiting streams per connection

Cap each connection at 100 concurrent streams. Requests beyond this limit will use a new overflow connection (if allowed):

```elixir
iex> {:ok, channel} =
...> GRPC.Stub.connect("localhost:50051",
...> pool: %{size: 2, max_overflow: 1, max_streams: 100}
...> )
```

---

## Disconnect

Calling `GRPC.Stub.disconnect/1` stops the pool supervisor and closes all underlying connections:

```elixir
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051")
iex> {:ok, _} = GRPC.Stub.disconnect(channel)
```

The returned channel has `pool: nil`, indicating the pool is no longer active.

---
6 changes: 4 additions & 2 deletions grpc/lib/grpc/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ defmodule GRPC.Channel do
interceptors: [],
compressor: module(),
accepted_compressors: [module()],
headers: list()
headers: list(),
pool: reference()
}
defstruct host: nil,
port: nil,
Expand All @@ -41,5 +42,6 @@ defmodule GRPC.Channel do
interceptors: [],
compressor: nil,
accepted_compressors: [],
headers: []
headers: [],
pool: nil
end
1 change: 1 addition & 0 deletions grpc/lib/grpc/client/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule GRPC.Client.Application do

def start(_type, _args) do
children = [
{Registry, [keys: :unique, name: GRPC.Client.Pool.Registry]},
{DynamicSupervisor, [name: GRPC.Client.Supervisor]}
]

Expand Down
146 changes: 81 additions & 65 deletions grpc/lib/grpc/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ defmodule GRPC.Client.Connection do
* `:codec` – request/response codec (default: `GRPC.Codec.Proto`)
* `:compressor` / `:accepted_compressors` – message compression
* `:headers` – default metadata headers
* `:pool` – connection pool options (map):
* `:size` – number of persistent connections (default: `1`)
* `:max_overflow` – max extra connections when pool is saturated; `nil` for no limit (default: `0`)
* `:max_streams` – max concurrent streams per connection; `nil` for no client-side limit (default: `nil`)
* `:health_check_enabled` – send periodic gRPC health-check pings (default: `false`)

The pool can be disabled entirely by setting `config :grpc, pool_enabled: false` in your
application config, which restores the pre-pool behaviour (single direct connection per `connect/2` call).

Returns:

Expand Down Expand Up @@ -228,26 +236,15 @@ defmodule GRPC.Client.Connection do
end

@impl GenServer
def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do
def handle_call({:disconnect, %Channel{adapter: adapter, pool: nil} = channel}, _from, state) do
resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}}
:persistent_term.erase({__MODULE__, :lb_state, channel.ref})

if Map.has_key?(state, :real_channels) do
Enum.map(state.real_channels, fn
{_key, {:ok, ch}} ->
do_disconnect(adapter, ch)

_ ->
:ok
end)

keys_to_delete = [:real_channels, :virtual_channel]
new_state = Map.drop(state, keys_to_delete)
do_handle_disconnect(adapter, channel, resp, state)
end

{:reply, resp, new_state, {:continue, :stop}}
else
{:reply, resp, state, {:continue, :stop}}
end
@impl GenServer
def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do
resp = {:ok, %Channel{channel | pool: nil}}
do_handle_disconnect(adapter, channel, resp, state)
end

@impl GenServer
Expand Down Expand Up @@ -312,16 +309,28 @@ defmodule GRPC.Client.Connection do
{:global, {__MODULE__, ref}}
end

defp do_disconnect(adapter, channel) do
adapter.disconnect(channel)
rescue
_ ->
:ok
catch
_type, _value ->
:ok
defp do_handle_disconnect(adapter, channel, resp, state) do
:persistent_term.erase({__MODULE__, :lb_state, channel.ref})

if Map.has_key?(state, :real_channels) do
Enum.map(state.real_channels, fn
{_key, {:ok, ch}} -> do_disconnect(adapter, ch)
_ -> :ok
end)

{:reply, resp, Map.drop(state, [:real_channels, :virtual_channel]), {:continue, :stop}}
else
{:reply, resp, state, {:continue, :stop}}
end
end

defp pool_enabled?, do: Application.get_env(:grpc, :pool_enabled, false)

defp do_disconnect(adapter, %Channel{pool: nil} = ch), do: adapter.disconnect(ch)

defp do_disconnect(_adapter, %Channel{pool: pool_ref}),
do: GRPC.Client.Pool.stop_for_address(pool_ref)

defp build_initial_state(target, opts) do
opts =
Keyword.validate!(opts,
Expand All @@ -333,7 +342,8 @@ defmodule GRPC.Client.Connection do
codec: GRPC.Codec.Proto,
compressor: nil,
accepted_compressors: [],
headers: []
headers: [],
pool: %{size: 1, max_overflow: 0, max_streams: nil, health_check_enabled: false}
)

resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver)
Expand Down Expand Up @@ -369,10 +379,10 @@ defmodule GRPC.Client.Connection do

case resolver.resolve(norm_target) do
{:ok, %{addresses: addresses, service_config: config}} ->
build_balanced_state(base_state, addresses, config, lb_policy_opt, norm_opts, adapter)
build_balanced_state(base_state, addresses, config, lb_policy_opt, norm_opts)

{:error, _reason} ->
build_direct_state(base_state, norm_target, norm_opts, adapter)
build_direct_state(base_state, norm_target, norm_opts)
end
end

Expand All @@ -397,8 +407,7 @@ defmodule GRPC.Client.Connection do
addresses,
config,
lb_policy_opt,
norm_opts,
adapter
norm_opts
) do
lb_policy =
cond do
Expand All @@ -419,7 +428,7 @@ defmodule GRPC.Client.Connection do
{:ok, {prefer_host, prefer_port}, new_lb_state} = lb_mod.pick(lb_state)

real_channels =
build_real_channels(addresses, base_state.virtual_channel, norm_opts, adapter)
build_real_channels(addresses, base_state.virtual_channel, norm_opts)

key = build_address_key(prefer_host, prefer_port)

Expand All @@ -441,38 +450,55 @@ defmodule GRPC.Client.Connection do
end
end

defp build_direct_state(%__MODULE__{} = base_state, norm_target, norm_opts, adapter) do
defp build_direct_state(%__MODULE__{} = base_state, norm_target, norm_opts) do
{host, port} = split_host_port(norm_target)
vc = base_state.virtual_channel

case connect_real_channel(vc, host, port, norm_opts, adapter) do
{:ok, ch} ->
{:ok,
%__MODULE__{
base_state
| virtual_channel: ch,
real_channels: %{"#{host}:#{port}" => {:ok, ch}}
}}
if pool_enabled?() do
case GRPC.Client.Pool.start_for_address(vc, host, port, norm_opts) do
{:ok, ch} ->
{:ok,
%__MODULE__{
base_state
| virtual_channel: ch,
real_channels: %{"#{host}:#{port}" => {:ok, ch}}
}}

{:error, reason} ->
{:error, reason}
{:error, reason} ->
{:error, reason}
end
else
channel = %Channel{vc | host: host, port: port}

case base_state.adapter.connect(channel, norm_opts[:adapter_opts] || []) do
{:ok, ch} ->
{:ok,
%__MODULE__{
base_state
| virtual_channel: ch,
real_channels: %{"#{host}:#{port}" => {:ok, ch}}
}}

{:error, reason} ->
{:error, reason}
end
end
end

defp build_real_channels(addresses, %Channel{} = virtual_channel, norm_opts, adapter) do
defp build_real_channels(addresses, %Channel{} = virtual_channel, norm_opts) do
Map.new(addresses, fn %{port: port, address: host} ->
case connect_real_channel(
%Channel{virtual_channel | host: host, port: port},
host,
port,
norm_opts,
adapter
) do
{:ok, ch} ->
{build_address_key(host, port), {:ok, ch}}
if pool_enabled?() do
case GRPC.Client.Pool.start_for_address(virtual_channel, host, port, norm_opts) do
{:ok, ch} -> {build_address_key(host, port), {:ok, ch}}
{:error, reason} -> {build_address_key(host, port), {:error, reason}}
end
else
channel = %Channel{virtual_channel | host: host, port: port}

{:error, reason} ->
{build_address_key(host, port), {:error, reason}}
case virtual_channel.adapter.connect(channel, norm_opts[:adapter_opts] || []) do
{:ok, ch} -> {build_address_key(host, port), {:ok, ch}}
{:error, reason} -> {build_address_key(host, port), {:error, reason}}
end
end
end)
end
Expand Down Expand Up @@ -522,16 +548,6 @@ defmodule GRPC.Client.Connection do
defp choose_lb(:round_robin), do: GRPC.Client.LoadBalancing.RoundRobin
defp choose_lb(_), do: GRPC.Client.LoadBalancing.PickFirst

defp connect_real_channel(%Channel{scheme: "unix"} = vc, path, port, opts, adapter) do
%Channel{vc | host: path, port: port}
|> adapter.connect(opts[:adapter_opts])
end

defp connect_real_channel(%Channel{} = vc, host, port, opts, adapter) do
%Channel{vc | host: host, port: port}
|> adapter.connect(opts[:adapter_opts])
end

defp split_host_port(target) do
case String.split(target, ":", trim: true) do
[h, p] -> {h, String.to_integer(p)}
Expand Down
Loading