Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
136a95c
feat: ETS-backed LB with per-request pick, remove persistent_term wri…
cgreeno Apr 23, 2026
9b58997
chore: drop benchee dep and bench/lb_pick.exs from LB PR
cgreeno Apr 23, 2026
1ddd9ec
fix(lb): prevent pick/1 from crashing callers when the table is gone
cgreeno Apr 23, 2026
48b9975
test(connection): cover lb_mod.shutdown on disconnect and terminate
cgreeno Apr 23, 2026
6bba30f
docs(connection): drop references to the removed :refresh timer
cgreeno Apr 23, 2026
292d958
test(connection): restore the real persistent_term shape in already_s…
cgreeno Apr 23, 2026
d98c474
fix(lb): prevent PickFirst from going stale after DNS re-resolution
cgreeno Apr 23, 2026
88b3de1
refactor: swap persistent_term for a shared ETS registry
cgreeno Apr 23, 2026
1f5d7e8
test(registry): direct unit coverage for put/lookup/delete
cgreeno Apr 23, 2026
b3b0d98
test(connection): cover pick_channel races with disconnect
cgreeno Apr 23, 2026
189f46e
test(connection): assert no ETS leaks across 500 connect/disconnect c…
cgreeno Apr 23, 2026
43c98d2
test(re_resolve): cover pick races with a reconcile that shrinks back…
cgreeno Apr 23, 2026
dbab947
fix(lb): make update/2 required; remove broken init/1 fallback
cgreeno Apr 23, 2026
06ad6be
refactor(lb): drop shutdown/1, let BEAM reclaim ETS; stop Connection …
cgreeno Apr 23, 2026
f543e1a
docs: tighten comments on EXIT handler and PickFirst moduledoc
cgreeno Apr 23, 2026
9537b64
fix(connection): keep Registry consistent across init, reconcile, dis…
cgreeno Apr 24, 2026
d1be507
docs: tighten comments on Registry ordering and pick race
cgreeno Apr 28, 2026
648f80d
fix(connection): re-init resolver in place, don't stop the Connection
cgreeno Apr 28, 2026
4ab3008
Merge branch 'master' into feature/ets-lb-pick-path
cgreeno Apr 30, 2026
5334433
fix: erase persistent_term leak in GRPC.Client.Connection on disconnect
cgreeno Apr 30, 2026
38a032e
docs: aggressive comment cleanup, match #520 style
cgreeno May 1, 2026
b4b7f21
Merge branch 'master' into feature/ets-lb-pick-path
cgreeno May 5, 2026
9eb3a75
Merge remote-tracking branch 'origin/master' into feature/ets-lb-pick…
May 13, 2026
8af2f5f
test(dns_resolver): disconnect channel at end of failing-channels test
cgreeno May 13, 2026
4f39740
Merge remote-tracking branch 'origin/master' into feature/ets-lb-pick…
cgreeno May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 104 additions & 163 deletions grpc/lib/grpc/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ defmodule GRPC.Client.Connection do
* The target string is resolved using a [Resolver](GRPC.Client.Resolver).
* Depending on the target and service config, a load-balancing module is chosen
(e.g. `PickFirst`, `RoundRobin`).
* The orchestrator periodically refreshes the LB decision to adapt to changes.
* Each call to `pick/2` dispatches to the LB module, which selects a channel
per request. DNS re-resolution reconciles the LB's channel list in place.

## Target syntax

Expand Down Expand Up @@ -68,9 +69,6 @@ defmodule GRPC.Client.Connection do
iex> GRPC.Client.Connection.disconnect(ch)
{:ok, %GRPC.Channel{...}}

## Notes

* The orchestrator refreshes the LB pick every 15 seconds.
"""
use GenServer
alias GRPC.Channel
Expand All @@ -79,7 +77,6 @@ defmodule GRPC.Client.Connection do

@insecure_scheme "http"
@secure_scheme "https"
@refresh_interval 15_000
@default_resolve_interval 30_000
@default_max_resolve_interval 300_000
@default_min_resolve_interval 5_000
Expand Down Expand Up @@ -122,29 +119,33 @@ defmodule GRPC.Client.Connection do
def init(%__MODULE__{} = state) do
Process.flag(:trap_exit, true)

# only now persist the chosen channel (which should already have adapter_payload
# because build_initial_state connected real channels and set virtual_channel)
:persistent_term.put(
{__MODULE__, :lb_state, state.virtual_channel.ref},
state.virtual_channel
)
connected = connected_channels(state.real_channels)

Process.send_after(self(), :refresh, @refresh_interval)
case state.lb_mod.init(channels: connected) do
{:ok, lb_state} ->
state = %{state | lb_state: lb_state}

state =
if function_exported?(state.resolver, :init, 2) do
{:ok, resolver_state} =
state.resolver.init(state.resolver_target,
connection_pid: self(),
connect_opts: state.connect_opts
)

%{state | resolver_state: resolver_state}
else
state
end

state =
if function_exported?(state.resolver, :init, 2) do
{:ok, resolver_state} =
state.resolver.init(state.resolver_target,
connection_pid: self(),
connect_opts: state.connect_opts
)

%{state | resolver_state: resolver_state}
else
state
end
:persistent_term.put({__MODULE__, state.virtual_channel.ref}, {state.lb_mod, lb_state})

{:ok, state}
{:ok, state}

{:error, reason} ->
disconnect_real_channels(state.real_channels, state.adapter)
{:stop, reason}
end
end

@doc """
Expand Down Expand Up @@ -185,16 +186,10 @@ defmodule GRPC.Client.Connection do

case DynamicSupervisor.start_child(GRPC.Client.Supervisor, child_spec(initial_state)) do
{:ok, _pid} ->
{:ok, ch}
finalize_connection(ch, opts)

{:error, {:already_started, _pid}} ->
case pick_channel(ch, opts) do
{:ok, %Channel{} = channel} ->
{:ok, channel}

_ ->
{:error, :no_connection}
end
finalize_connection(ch, opts)

{:error, reason} ->
{:error, reason}
Expand Down Expand Up @@ -244,12 +239,15 @@ defmodule GRPC.Client.Connection do
"""
@spec pick_channel(Channel.t(), keyword()) :: {:ok, Channel.t()} | {:error, term()}
def pick_channel(%Channel{ref: ref} = _channel, _opts \\ []) do
case :persistent_term.get({__MODULE__, :lb_state, ref}, nil) do
nil ->
{:error, :no_connection}
case :persistent_term.get({__MODULE__, ref}, nil) do
{lb_mod, lb_state} when not is_nil(lb_mod) ->
case lb_mod.pick(lb_state) do
{:ok, %Channel{} = channel, _new_state} -> {:ok, channel}
{:error, _} -> {:error, :no_connection}
end

%Channel{} = channel ->
{:ok, channel}
_ ->
{:error, :no_connection}
end
end

Expand Down Expand Up @@ -279,58 +277,14 @@ defmodule GRPC.Client.Connection do
state.resolver.shutdown(state.resolver_state)
end

resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}}
:persistent_term.erase({__MODULE__, :lb_state, channel.ref})

if Map.has_key?(state, :real_channels) do
Enum.map(state.real_channels, fn
{_key, {:connected, ch}} ->
do_disconnect(adapter, ch)
:persistent_term.erase({__MODULE__, channel.ref})
disconnect_real_channels(state.real_channels, adapter)

_ ->
:ok
end)

keys_to_delete = [:real_channels, :virtual_channel]
new_state = Map.drop(state, keys_to_delete)

{:reply, resp, new_state, {:continue, :stop}}
else
{:reply, resp, state, {:continue, :stop}}
end
resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}}
{:reply, resp, state, {:continue, :stop}}
end

@impl GenServer
def handle_info(
:refresh,
%{lb_mod: lb_mod, lb_state: lb_state, real_channels: channels, virtual_channel: vc} =
state
)
when not is_nil(lb_mod) do
{:ok, {prefer_host, prefer_port}, new_lb_state} = lb_mod.pick(lb_state)

channel_key = build_address_key(prefer_host, prefer_port)

case Map.get(channels, channel_key) do
{:connected, %Channel{} = picked_channel} ->
:persistent_term.put({__MODULE__, :lb_state, vc.ref}, picked_channel)

Process.send_after(self(), :refresh, @refresh_interval)
{:noreply, %{state | lb_state: new_lb_state, virtual_channel: picked_channel}}

_nil_or_failed ->
# LB picked a channel that is missing or in {:failed, _} state.
# Don't update persistent_term — keep serving from the current
# virtual_channel until re-resolution provides healthy backends.
Logger.warning("LB picked #{channel_key}, but channel is unavailable")

Process.send_after(self(), :refresh, @refresh_interval)
{:noreply, %{state | lb_state: new_lb_state}}
end
end

def handle_info(:refresh, state), do: {:noreply, state}

def handle_info({:resolver_update, result}, state) do
state = handle_resolve_result(result, state)
{:noreply, state}
Expand All @@ -356,6 +310,16 @@ defmodule GRPC.Client.Connection do
{:noreply, state}
end

def handle_info({:EXIT, _pid, :normal}, state), do: {:noreply, state}

def handle_info({:EXIT, pid, reason}, state) do
Logger.warning(
"#{inspect(__MODULE__)} received :EXIT from #{inspect(pid)} reason: #{inspect(reason)}"
)

{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
Logger.warning(
"#{inspect(__MODULE__)} received :DOWN from #{inspect(pid)} with reason: #{inspect(reason)}"
Expand All @@ -378,13 +342,30 @@ defmodule GRPC.Client.Connection do

@impl GenServer
def terminate(_reason, %{virtual_channel: %{ref: ref}}) do
:persistent_term.erase({__MODULE__, :lb_state, ref})
:persistent_term.erase({__MODULE__, ref})
:ok
rescue
_ -> :ok
end

def terminate(_reason, _state), do: :ok

defp finalize_connection(%Channel{} = ch, opts) do
case pick_channel(ch, opts) do
{:ok, %Channel{} = channel} -> {:ok, channel}
_ -> {:error, :no_connection}
end
end

defp disconnect_real_channels(real_channels, adapter) when is_map(real_channels) do
Enum.each(real_channels, fn
{_key, {:connected, ch}} -> do_disconnect(adapter, ch)
_ -> :ok
end)
end

defp disconnect_real_channels(_real_channels, _adapter), do: :ok

defp handle_resolve_result({:ok, %{addresses: []}}, state), do: state

defp handle_resolve_result({:ok, %{addresses: new_addresses}}, state) do
Expand Down Expand Up @@ -445,64 +426,32 @@ defmodule GRPC.Client.Connection do
end)
end

defp rebalance_after_reconcile(new_addresses, real_channels, state) do
if state.lb_mod do
case state.lb_mod.init(addresses: new_addresses) do
{:ok, new_lb_state} ->
{:ok, {host, port}, picked_lb_state} = state.lb_mod.pick(new_lb_state)
key = build_address_key(host, port)

case Map.get(real_channels, key) do
{:connected, picked_channel} ->
maybe_update_persistent_term(state.virtual_channel, picked_channel)

%{
state
| real_channels: real_channels,
lb_state: picked_lb_state,
virtual_channel: picked_channel
}
defp rebalance_after_reconcile(_new_addresses, real_channels, state) do
connected = connected_channels(real_channels)

_ ->
fallback_to_healthy_channel(state, real_channels, picked_lb_state)
end

{:error, _} ->
fallback_to_healthy_channel(state, real_channels, state.lb_state)
new_lb_state =
if state.lb_mod do
case reconcile_lb(state.lb_mod, state.lb_state, connected) do
{:ok, s} -> s
{:error, _} -> state.lb_state
end
else
state.lb_state
end
else
fallback_to_healthy_channel(state, real_channels, state.lb_state)
end
end

defp fallback_to_healthy_channel(state, real_channels, lb_state) do
ref = state.virtual_channel.ref

case Enum.find_value(real_channels, fn {_k, v} -> match?({:connected, _}, v) && v end) do
{:connected, healthy_channel} ->
maybe_update_persistent_term(state.virtual_channel, healthy_channel)
if connected == [] do
Logger.warning("No healthy channels available after re-resolution")
end

%{
state
| real_channels: real_channels,
lb_state: lb_state,
virtual_channel: healthy_channel
}
%{state | real_channels: real_channels, lb_state: new_lb_state}
end

nil ->
Logger.warning("No healthy channels available after re-resolution")
:persistent_term.erase({__MODULE__, :lb_state, ref})
%{state | real_channels: real_channels, lb_state: lb_state}
end
defp reconcile_lb(lb_mod, lb_state, new_channels) do
lb_mod.update(lb_state, new_channels)
end

defp maybe_update_persistent_term(current_channel, new_channel) do
if current_channel != new_channel do
:persistent_term.put(
{__MODULE__, :lb_state, new_channel.ref},
new_channel
)
end
defp connected_channels(real_channels) do
for {_key, {:connected, ch}} <- real_channels, do: ch
end

defp channel_alive?({:connected, %{adapter_payload: %{conn_pid: pid}}}) when is_pid(pid) do
Expand Down Expand Up @@ -625,44 +574,36 @@ defmodule GRPC.Client.Connection do

lb_mod = choose_lb(lb_policy)

case lb_mod.init(addresses: addresses) do
{:ok, lb_state} ->
{:ok, {prefer_host, prefer_port}, new_lb_state} = lb_mod.pick(lb_state)

real_channels =
build_real_channels(addresses, base_state.virtual_channel, norm_opts, adapter)

key = build_address_key(prefer_host, prefer_port)

with {:connected, ch} <- Map.get(real_channels, key, {:failed, :no_channel}) do
{:ok,
%__MODULE__{
base_state
| lb_mod: lb_mod,
lb_state: new_lb_state,
virtual_channel: ch,
real_channels: real_channels
}}
else
{:failed, reason} -> {:error, reason}
end
real_channels =
build_real_channels(addresses, base_state.virtual_channel, norm_opts, adapter)

case connected_channels(real_channels) do
[] ->
disconnect_real_channels(real_channels, adapter)
{:error, :no_channels}

{:error, :no_addresses} ->
{:error, :no_addresses}
_connected ->
{:ok,
%__MODULE__{
base_state
| lb_mod: lb_mod,
real_channels: real_channels
}}
end
end

defp build_direct_state(%__MODULE__{} = base_state, norm_target, norm_opts, adapter) do
{host, port} = split_host_port(norm_target)
vc = base_state.virtual_channel
lb_mod = GRPC.Client.LoadBalancing.PickFirst

case connect_real_channel(vc, host, port, norm_opts, adapter) do
{:ok, ch} ->
{:ok,
%__MODULE__{
base_state
| virtual_channel: ch,
real_channels: %{"#{host}:#{port}" => {:connected, ch}}
| real_channels: %{build_address_key(host, port) => {:connected, ch}},
lb_mod: lb_mod
}}

{:error, reason} ->
Expand Down
13 changes: 7 additions & 6 deletions grpc/lib/grpc/client/load_balacing.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
defmodule GRPC.Client.LoadBalancing do
@moduledoc """
Load balancing behaviour for gRPC clients.
@moduledoc "Load balancing behaviour for gRPC clients."

alias GRPC.Channel

This module defines the behaviour that load balancing strategies must implement.
"""
@callback init(opts :: keyword()) :: {:ok, state :: any()} | {:error, reason :: any()}

@callback pick(state :: any()) ::
{:ok, {host :: String.t(), port :: non_neg_integer()}, new_state :: any()}
| {:error, reason :: any()}
{:ok, Channel.t(), new_state :: any()} | {:error, reason :: any()}

@callback update(state :: any(), new_channels :: [Channel.t()]) ::
{:ok, new_state :: any()} | {:error, reason :: any()}
end
Loading
Loading