diff --git a/grpc/guides/advanced/connection_pool.md b/grpc/guides/advanced/connection_pool.md new file mode 100644 index 00000000..88c4ac26 --- /dev/null +++ b/grpc/guides/advanced/connection_pool.md @@ -0,0 +1,90 @@ +# Connection Pool + +The Elixir gRPC client maintains a **connection pool** for every target address. Rather than opening a fresh HTTP/2 connection for each RPC, callers share a fixed set of long-lived connections. This reduces connection overhead, improves throughput under concurrency, and enables fine-grained control over how many connections and concurrent streams your client opens. + +The pool is managed transparently: when you call a stub, the client checks out a connection, tracks the open stream count, and returns the connection when the call finishes. All of this happens without any additional code on your side. + +--- + +## How It Works + +When `GRPC.Stub.connect/2` is called, a supervised pool of connections is started for the target address. Each connection is an independent HTTP/2 session capable of multiplexing many concurrent streams. + +On each RPC call the client: + +1. Picks first connection from the pool with number open streams that is still below the configured limit. +2. Increments that connection's open-stream counter and records a lease. +3. Runs the RPC. +4. Decrements the counter and releases the lease when the call completes. + +If every connection in the pool has reached `max_streams`, the client opens an **overflow** connection up to `max_overflow` extra connections. Overflow connections aren't discarded once the load subsides, but if pool dies or connections are otherwise dropped we reset to initial count. + +--- + +## Configuration + +Pool behaviour is controlled via the `:pool` option passed to `GRPC.Stub.connect/2` or `GRPC.Client.Connection.connect/2`. + +| Option | Type | Default | Description | +|:-------------------------|:----------------------------|:--------|:-------------------------------------------------------------------------------------------------------------------| +| `:size` | `non_neg_integer` | `1` | Number of persistent connections kept open. | +| `:max_overflow` | `non_neg_integer or nil` | `0` | Maximum number of extra connections created when the pool is fully saturated. `nil` means no client-side limit. | +| `:max_streams` | `pos_integer or nil` | `nil` | Maximum concurrent streams per connection. `nil` means no client-side limit (server limit applies). | +| `:health_check_enabled` | `boolean` | `false` | When `true`, a periodic gRPC health-check ping is sent on each connection every 10 minutes. | + +--- + +## Examples + +### Default pool (single connection) + +```elixir +iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051") +iex> {:ok, reply} = channel |> MyService.Stub.my_rpc(request) +``` + +### Multiple persistent connections + +Open three connections upfront to distribute concurrent load: + +```elixir +iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051", pool: %{size: 3}) +iex> {:ok, reply} = channel |> MyService.Stub.my_rpc(request) +``` + +### Overflow connections + +Keep two persistent connections and allow up to five overflow connections during traffic spikes: + +```elixir +iex> {:ok, channel} = +...> GRPC.Stub.connect("localhost:50051", +...> pool: %{size: 2, max_overflow: 5} +...> ) +``` + +### Limiting streams per connection + +Cap each connection at 100 concurrent streams. Requests beyond this limit will use a new overflow connection (if allowed): + +```elixir +iex> {:ok, channel} = +...> GRPC.Stub.connect("localhost:50051", +...> pool: %{size: 2, max_overflow: 1, max_streams: 100} +...> ) +``` + +--- + +## Disconnect + +Calling `GRPC.Stub.disconnect/1` stops the pool supervisor and closes all underlying connections: + +```elixir +iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051") +iex> {:ok, _} = GRPC.Stub.disconnect(channel) +``` + +The returned channel has `pool: nil`, indicating the pool is no longer active. + +--- diff --git a/grpc/lib/grpc/channel.ex b/grpc/lib/grpc/channel.ex index 8350b863..ea9a29b3 100644 --- a/grpc/lib/grpc/channel.ex +++ b/grpc/lib/grpc/channel.ex @@ -28,7 +28,8 @@ defmodule GRPC.Channel do interceptors: [], compressor: module(), accepted_compressors: [module()], - headers: list() + headers: list(), + pool: reference() } defstruct host: nil, port: nil, @@ -41,5 +42,6 @@ defmodule GRPC.Channel do interceptors: [], compressor: nil, accepted_compressors: [], - headers: [] + headers: [], + pool: nil end diff --git a/grpc/lib/grpc/client/application.ex b/grpc/lib/grpc/client/application.ex index 16d4a230..dd6c923a 100644 --- a/grpc/lib/grpc/client/application.ex +++ b/grpc/lib/grpc/client/application.ex @@ -4,6 +4,7 @@ defmodule GRPC.Client.Application do def start(_type, _args) do children = [ + {Registry, [keys: :unique, name: GRPC.Client.Pool.Registry]}, {DynamicSupervisor, [name: GRPC.Client.Supervisor]} ] diff --git a/grpc/lib/grpc/client/connection.ex b/grpc/lib/grpc/client/connection.ex index 2cbb537d..83609457 100644 --- a/grpc/lib/grpc/client/connection.ex +++ b/grpc/lib/grpc/client/connection.ex @@ -140,6 +140,14 @@ defmodule GRPC.Client.Connection do * `:codec` – request/response codec (default: `GRPC.Codec.Proto`) * `:compressor` / `:accepted_compressors` – message compression * `:headers` – default metadata headers + * `:pool` – connection pool options (map): + * `:size` – number of persistent connections (default: `1`) + * `:max_overflow` – max extra connections when pool is saturated; `nil` for no limit (default: `0`) + * `:max_streams` – max concurrent streams per connection; `nil` for no client-side limit (default: `nil`) + * `:health_check_enabled` – send periodic gRPC health-check pings (default: `false`) + + The pool can be disabled entirely by setting `config :grpc, pool_enabled: false` in your + application config, which restores the pre-pool behaviour (single direct connection per `connect/2` call). Returns: @@ -228,26 +236,15 @@ defmodule GRPC.Client.Connection do end @impl GenServer - def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do + def handle_call({:disconnect, %Channel{adapter: adapter, pool: nil} = channel}, _from, state) do resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}} - :persistent_term.erase({__MODULE__, :lb_state, channel.ref}) - - if Map.has_key?(state, :real_channels) do - Enum.map(state.real_channels, fn - {_key, {:ok, ch}} -> - do_disconnect(adapter, ch) - - _ -> - :ok - end) - - keys_to_delete = [:real_channels, :virtual_channel] - new_state = Map.drop(state, keys_to_delete) + do_handle_disconnect(adapter, channel, resp, state) + end - {:reply, resp, new_state, {:continue, :stop}} - else - {:reply, resp, state, {:continue, :stop}} - end + @impl GenServer + def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do + resp = {:ok, %Channel{channel | pool: nil}} + do_handle_disconnect(adapter, channel, resp, state) end @impl GenServer @@ -312,16 +309,28 @@ defmodule GRPC.Client.Connection do {:global, {__MODULE__, ref}} end - defp do_disconnect(adapter, channel) do - adapter.disconnect(channel) - rescue - _ -> - :ok - catch - _type, _value -> - :ok + defp do_handle_disconnect(adapter, channel, resp, state) do + :persistent_term.erase({__MODULE__, :lb_state, channel.ref}) + + if Map.has_key?(state, :real_channels) do + Enum.map(state.real_channels, fn + {_key, {:ok, ch}} -> do_disconnect(adapter, ch) + _ -> :ok + end) + + {:reply, resp, Map.drop(state, [:real_channels, :virtual_channel]), {:continue, :stop}} + else + {:reply, resp, state, {:continue, :stop}} + end end + defp pool_enabled?, do: Application.get_env(:grpc, :pool_enabled, false) + + defp do_disconnect(adapter, %Channel{pool: nil} = ch), do: adapter.disconnect(ch) + + defp do_disconnect(_adapter, %Channel{pool: pool_ref}), + do: GRPC.Client.Pool.stop_for_address(pool_ref) + defp build_initial_state(target, opts) do opts = Keyword.validate!(opts, @@ -333,7 +342,8 @@ defmodule GRPC.Client.Connection do codec: GRPC.Codec.Proto, compressor: nil, accepted_compressors: [], - headers: [] + headers: [], + pool: %{size: 1, max_overflow: 0, max_streams: nil, health_check_enabled: false} ) resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) @@ -369,10 +379,10 @@ defmodule GRPC.Client.Connection do case resolver.resolve(norm_target) do {:ok, %{addresses: addresses, service_config: config}} -> - build_balanced_state(base_state, addresses, config, lb_policy_opt, norm_opts, adapter) + build_balanced_state(base_state, addresses, config, lb_policy_opt, norm_opts) {:error, _reason} -> - build_direct_state(base_state, norm_target, norm_opts, adapter) + build_direct_state(base_state, norm_target, norm_opts) end end @@ -397,8 +407,7 @@ defmodule GRPC.Client.Connection do addresses, config, lb_policy_opt, - norm_opts, - adapter + norm_opts ) do lb_policy = cond do @@ -419,7 +428,7 @@ defmodule GRPC.Client.Connection do {:ok, {prefer_host, prefer_port}, new_lb_state} = lb_mod.pick(lb_state) real_channels = - build_real_channels(addresses, base_state.virtual_channel, norm_opts, adapter) + build_real_channels(addresses, base_state.virtual_channel, norm_opts) key = build_address_key(prefer_host, prefer_port) @@ -441,38 +450,55 @@ defmodule GRPC.Client.Connection do end end - defp build_direct_state(%__MODULE__{} = base_state, norm_target, norm_opts, adapter) do + defp build_direct_state(%__MODULE__{} = base_state, norm_target, norm_opts) do {host, port} = split_host_port(norm_target) vc = base_state.virtual_channel - case connect_real_channel(vc, host, port, norm_opts, adapter) do - {:ok, ch} -> - {:ok, - %__MODULE__{ - base_state - | virtual_channel: ch, - real_channels: %{"#{host}:#{port}" => {:ok, ch}} - }} + if pool_enabled?() do + case GRPC.Client.Pool.start_for_address(vc, host, port, norm_opts) do + {:ok, ch} -> + {:ok, + %__MODULE__{ + base_state + | virtual_channel: ch, + real_channels: %{"#{host}:#{port}" => {:ok, ch}} + }} - {:error, reason} -> - {:error, reason} + {:error, reason} -> + {:error, reason} + end + else + channel = %Channel{vc | host: host, port: port} + + case base_state.adapter.connect(channel, norm_opts[:adapter_opts] || []) do + {:ok, ch} -> + {:ok, + %__MODULE__{ + base_state + | virtual_channel: ch, + real_channels: %{"#{host}:#{port}" => {:ok, ch}} + }} + + {:error, reason} -> + {:error, reason} + end end end - defp build_real_channels(addresses, %Channel{} = virtual_channel, norm_opts, adapter) do + defp build_real_channels(addresses, %Channel{} = virtual_channel, norm_opts) do Map.new(addresses, fn %{port: port, address: host} -> - case connect_real_channel( - %Channel{virtual_channel | host: host, port: port}, - host, - port, - norm_opts, - adapter - ) do - {:ok, ch} -> - {build_address_key(host, port), {:ok, ch}} + if pool_enabled?() do + case GRPC.Client.Pool.start_for_address(virtual_channel, host, port, norm_opts) do + {:ok, ch} -> {build_address_key(host, port), {:ok, ch}} + {:error, reason} -> {build_address_key(host, port), {:error, reason}} + end + else + channel = %Channel{virtual_channel | host: host, port: port} - {:error, reason} -> - {build_address_key(host, port), {:error, reason}} + case virtual_channel.adapter.connect(channel, norm_opts[:adapter_opts] || []) do + {:ok, ch} -> {build_address_key(host, port), {:ok, ch}} + {:error, reason} -> {build_address_key(host, port), {:error, reason}} + end end end) end @@ -522,16 +548,6 @@ defmodule GRPC.Client.Connection do defp choose_lb(:round_robin), do: GRPC.Client.LoadBalancing.RoundRobin defp choose_lb(_), do: GRPC.Client.LoadBalancing.PickFirst - defp connect_real_channel(%Channel{scheme: "unix"} = vc, path, port, opts, adapter) do - %Channel{vc | host: path, port: port} - |> adapter.connect(opts[:adapter_opts]) - end - - defp connect_real_channel(%Channel{} = vc, host, port, opts, adapter) do - %Channel{vc | host: host, port: port} - |> adapter.connect(opts[:adapter_opts]) - end - defp split_host_port(target) do case String.split(target, ":", trim: true) do [h, p] -> {h, String.to_integer(p)} diff --git a/grpc/lib/grpc/client/pool.ex b/grpc/lib/grpc/client/pool.ex new file mode 100644 index 00000000..e35c7adf --- /dev/null +++ b/grpc/lib/grpc/client/pool.ex @@ -0,0 +1,71 @@ +defmodule GRPC.Client.Pool do + @moduledoc false + + alias GRPC.Channel + alias GRPC.Client.Pool.Config + + @call_timeout 7_000 + + @spec start_for_address(Channel.t(), term(), non_neg_integer(), keyword()) :: + {:ok, Channel.t()} | {:error, any()} + def start_for_address(%Channel{} = vc, host, port, norm_opts) do + pool_opts = norm_opts[:pool] + address_pool_ref = make_ref() + + config = %Config{ + pool_ref: address_pool_ref, + channel: %Channel{vc | host: host, port: port}, + pool_size: Map.get(pool_opts, :size, 1), + max_pool_overflow: Map.get(pool_opts, :max_overflow, 0), + max_client_streams_per_connection: Map.get(pool_opts, :max_streams), + adapter_opts: norm_opts[:adapter_opts] || [], + health_check_enabled: Map.get(pool_opts, :health_check_enabled, false) + } + + case DynamicSupervisor.start_child( + GRPC.Client.Supervisor, + {GRPC.Client.Pool.Supervisor, config} + ) do + {:ok, _} -> + {:ok, %Channel{vc | host: host, port: port, pool: address_pool_ref}} + + {:error, reason} -> + {:error, reason} + end + end + + @spec stop_for_address(reference()) :: :ok + def stop_for_address(pool_ref) do + case Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Supervisor, pool_ref}) do + [{sup_pid, _}] -> Supervisor.stop(sup_pid, :normal) + [] -> :ok + end + rescue + _ -> :ok + end + + @spec checkout(reference()) :: {GRPC.Client.Pool.Server.State.Channel.t(), Channel.t()} | nil + def checkout(pool_ref) do + case Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) do + [{pool_pid, _}] -> + case GenServer.call(pool_pid, :take_channel, @call_timeout) do + nil -> nil + %GRPC.Client.Pool.Server.State.Channel{channel: channel} = wrapped -> {wrapped, channel} + end + + [] -> + nil + end + end + + @spec checkin(reference(), GRPC.Client.Pool.Server.State.Channel.t()) :: :ok + def checkin(pool_ref, wrapped_channel) do + case Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) do + [{pool_pid, _}] -> + GenServer.cast(pool_pid, {:return_channel, wrapped_channel, self()}) + + [] -> + :ok + end + end +end diff --git a/grpc/lib/grpc/client/pool/config.ex b/grpc/lib/grpc/client/pool/config.ex new file mode 100644 index 00000000..1161877c --- /dev/null +++ b/grpc/lib/grpc/client/pool/config.ex @@ -0,0 +1,23 @@ +defmodule GRPC.Client.Pool.Config do + @moduledoc false + + defstruct [ + :pool_ref, + :channel, + :pool_size, + :max_pool_overflow, + :max_client_streams_per_connection, + :adapter_opts, + :health_check_enabled + ] + + @type t :: %__MODULE__{ + pool_ref: reference(), + channel: GRPC.Channel.t(), + pool_size: non_neg_integer(), + max_pool_overflow: non_neg_integer() | nil, + max_client_streams_per_connection: non_neg_integer() | nil, + adapter_opts: keyword(), + health_check_enabled: boolean() + } +end diff --git a/grpc/lib/grpc/client/pool/health_check/dynamic_supervisor.ex b/grpc/lib/grpc/client/pool/health_check/dynamic_supervisor.ex new file mode 100644 index 00000000..ad428ea9 --- /dev/null +++ b/grpc/lib/grpc/client/pool/health_check/dynamic_supervisor.ex @@ -0,0 +1,35 @@ +defmodule GRPC.Client.Pool.HealthCheck.DynamicSupervisor do + @moduledoc """ + Supervises health-check GenServers, restarts them in case of crush, + allows starting new ones dynamically + """ + + use DynamicSupervisor + + alias GRPC.Client.Pool.Config + alias GRPC.Client.Pool.HealthCheck.Server + alias GRPC.Client.Pool.Server.State.Channel + + @spec start_link(Config.t()) :: Supervisor.on_start() + def start_link(%Config{} = config) do + DynamicSupervisor.start_link(__MODULE__, config, name: via_tuple(config.pool_ref)) + end + + @impl DynamicSupervisor + def init(_args), do: DynamicSupervisor.init(strategy: :one_for_one) + + @spec start(Channel.id(), Process.dest(), reference()) :: DynamicSupervisor.on_start_child() + def start(channel_id, conn_pid, pool_ref) do + [{dynamic_supervisor_pid, _value}] = + Registry.lookup(GRPC.Client.Pool.Registry, {__MODULE__, pool_ref}) + + DynamicSupervisor.start_child( + dynamic_supervisor_pid, + {Server, %{channel_id: channel_id, conn_pid: conn_pid, pool_ref: pool_ref}} + ) + end + + defp via_tuple(pool_ref) do + {:via, Registry, {GRPC.Client.Pool.Registry, {__MODULE__, pool_ref}}} + end +end diff --git a/grpc/lib/grpc/client/pool/health_check/server.ex b/grpc/lib/grpc/client/pool/health_check/server.ex new file mode 100644 index 00000000..b0d98295 --- /dev/null +++ b/grpc/lib/grpc/client/pool/health_check/server.ex @@ -0,0 +1,150 @@ +defmodule GRPC.Client.Pool.HealthCheck.Server do + @moduledoc """ + Job of this module is to send periodic health-check messages to single GRPC Channel. + This server has to be alive for the whole lifespan of a given channels connection. + In case that channel get's closed, dies or exits in some other manner, it should terminate gracefully. + It overrides public API and invokes internal logic of Pool as it needs granular controll + over what connection/channel_id it is using (as we want to monitor specific channel per single HC server) + """ + + use GenServer + + require Logger + + alias GRPC.Client.Pool.Server.State.Channel + + defmodule State do + defstruct [:channel_id, :conn_pid, :pool_ref, :health_check_enabled?] + + @type t :: %__MODULE__{ + channel_id: GRPC.Client.Pool.Server.State.Channel.id(), + conn_pid: Process.dest(), + pool_ref: reference(), + health_check_enabled?: boolean() + } + end + + @connection_health_check_interval 10 * 60 * 1_000 + @call_timeout 7_000 + + @spec start_link(%{ + :channel_id => Channel.id(), + :conn_pid => Process.dest(), + :pool_ref => reference() + }) :: + :ignore | {:error, any()} | {:ok, pid()} + def start_link(%{channel_id: channel_id, conn_pid: conn_pid, pool_ref: pool_ref}) do + state = %State{ + channel_id: channel_id, + conn_pid: conn_pid, + pool_ref: pool_ref, + health_check_enabled?: true + } + + GenServer.start_link(__MODULE__, state, name: via_tuple(channel_id, pool_ref)) + end + + @impl GenServer + def init(%State{conn_pid: conn_pid} = state) do + Process.monitor(conn_pid) + Process.send_after(self(), :send_health_check, @connection_health_check_interval) + {:ok, state} + end + + defp send_health_check(%State{channel_id: channel_id, pool_ref: pool_ref} = state) do + case with_chan(pool_ref, channel_id, &send_message/1) do + # Status 12 https://grpc.github.io/grpc/core/md_doc_statuscodes.html + # UNIMPLEMENTED | 12 | The operation is not implemented or is not supported/enabled in this service. + {:error, %GRPC.RPCError{status: 12}} -> + Logger.error( + "Received gRPC error not implemented when sending health-check message. Will disable periodic messages." + ) + + %{state | health_check_enabled?: false} + + {:error, error} -> + Logger.warning( + "Received gRPC error when sending health-check message. Will continue to send periodic messages.", + error: inspect(error) + ) + + state + + {:ok, _} -> + state + + :ok -> + state + end + end + + @impl GenServer + # For connection health-check it's enough for us to send ping message + # if connection is healthy it will return some value, if it's not healthy + # the connection will close, GRPC ConnectionProcess will exit and we will remove it + # in :DOWN handler + def handle_info(:send_health_check, %State{} = state) do + state = send_health_check(state) + + # Here we could potentially exit, if HC get's disables self + # due to it not being implemented on server side, but I think it's + # okay to leave process alive as it will self terminate one connection it + # was supposed to HC dies. + if state.health_check_enabled? do + Process.send_after(self(), :send_health_check, @connection_health_check_interval) + end + + {:noreply, state} + end + + # In case that connection process exists, we will receive :DOWN message + # once connection process is dead there is no reason for us to be alive + # so we just exit normally and never get restarted + @impl GenServer + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + {:stop, :normal, state} + end + + defp with_chan(pool_ref, channel_id, callback) do + case Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) do + [{pool_pid, _}] -> + pool_pid + |> GenServer.call({:take_channel, channel_id}, @call_timeout) + |> maybe_execute_callback(callback, pool_pid) + + [] -> + :ok + end + end + + defp maybe_execute_callback( + %GRPC.Client.Pool.Server.State.Channel{} = wrapped_channel, + callback, + pid + ), + do: do_execute_callback(wrapped_channel, callback, pid) + + defp maybe_execute_callback(nil, _callback, _pid), do: :ok + + defp do_execute_callback( + %GRPC.Client.Pool.Server.State.Channel{channel: channel} = wrapped_channel, + callback, + pid + ) do + try do + callback.(channel) + after + GenServer.cast(pid, {:return_channel, wrapped_channel, self()}) + end + end + + defp via_tuple(channel_id, pool_ref) do + {:via, Registry, {GRPC.Client.Pool.Registry, {__MODULE__, channel_id, pool_ref}}} + end + + defp send_message(%GRPC.Channel{} = channel) do + GRPC.Health.V1.Health.Stub.check(channel, %GRPC.Health.V1.HealthCheckRequest{ + service: "healthcheck" + }) + end +end diff --git a/grpc/lib/grpc/client/pool/implementation.ex b/grpc/lib/grpc/client/pool/implementation.ex new file mode 100644 index 00000000..e5370465 --- /dev/null +++ b/grpc/lib/grpc/client/pool/implementation.ex @@ -0,0 +1,290 @@ +defmodule GRPC.Client.Pool.Implementation do + @moduledoc false + + require Logger + + alias GRPC.Client.Pool.Config + alias GRPC.Client.Pool.Server.State + alias GRPC.Client.Pool.HealthCheck.DynamicSupervisor + + @spec init(State.t()) :: State.t() + def init(%State{config: %Config{} = config} = state) do + pool_size = config.pool_size + open_connection_pool(state, pool_size) + end + + defp open_connection_pool(%State{config: config} = state, pool_size) do + Enum.reduce(1..pool_size, state, fn _, state_acc -> + case open_new_connection(config) do + %State.Channel{} = wrapped_channel -> + state_acc + |> put_in([:channels, wrapped_channel.id], wrapped_channel) + |> put_in([:leases_by_channel_id, wrapped_channel.id], []) + + _ -> + state_acc + end + end) + end + + defp open_new_connection(%Config{} = config) do + case config.channel.adapter.connect(config.channel, config.adapter_opts) do + {:ok, %GRPC.Channel{} = channel} -> + channel_id = make_ref() + wrapped_channel = %State.Channel{channel: channel, id: channel_id, open_streams: 0} + + if config.health_check_enabled do + DynamicSupervisor.start( + channel_id, + fetch_conn_pid_from_channel(wrapped_channel), + config.pool_ref + ) + end + + wrapped_channel + + _ -> + nil + end + end + + @spec handle_connection_process_crush(State.t(), reference()) :: {State.t(), [reference(), ...]} + def handle_connection_process_crush(state, pid) do + state + |> find_channel_by_adapter_payload_connection_pid(pid) + |> maybe_remove_channel_and_leases() + end + + defp find_channel_by_adapter_payload_connection_pid(%State{channels: channels} = state, pid) do + {state, + Enum.find_value(channels, fn {_id, wrapped_channel} -> + if fetch_conn_pid_from_channel(wrapped_channel) == pid, do: wrapped_channel + end)} + end + + defp maybe_remove_channel_and_leases({state, nil = _channel} = _state_channel_tuple) do + {state, []} + end + + defp maybe_remove_channel_and_leases( + {%State{channels: _}, %State.Channel{}} = state_channel_tuple + ) do + state_channel_tuple + |> remove_channel() + |> remove_leases_by_channel_id_for_channel() + |> remove_leases_by_monitor_reference_for_channel() + |> maybe_replenish_pool() + end + + defp maybe_replenish_pool({%State{channels: channels, config: config} = state, monitor_refs}) do + deficit = config.pool_size - map_size(channels) + + { + if(deficit > 0, do: open_connection_pool(state, deficit), else: state), + monitor_refs + } + end + + defp remove_channel({%State{channels: channels} = state, %State.Channel{id: id} = channel}) do + {%{state | channels: Map.delete(channels, id)}, channel} + end + + defp remove_leases_by_channel_id_for_channel( + {%State{leases_by_channel_id: leases} = state, %State.Channel{id: id}} + ) do + {removed_leases, leases} = Map.pop(leases, id, []) + + monitor_refs = + Enum.map(removed_leases, fn %State.Lease{monitor_ref: monitor_ref} -> monitor_ref end) + + {%{state | leases_by_channel_id: leases}, monitor_refs} + end + + defp remove_leases_by_monitor_reference_for_channel( + {%State{leases_by_monitor_ref: leases} = state, monitor_refs} + ) do + leases = Map.drop(leases, monitor_refs) + {%{state | leases_by_monitor_ref: leases}, monitor_refs} + end + + @spec return_channel(State.t(), reference()) :: {State.t(), reference() | nil} + def return_channel(state, monitor_ref) when is_reference(monitor_ref) do + case Map.get(state.leases_by_monitor_ref, monitor_ref) do + nil -> + {state, monitor_ref} + + %State.Lease{id: id, caller_pid: caller_pid} -> + channel = Map.fetch!(state.channels, id) + return_channel(state, channel, caller_pid) + end + end + + @spec return_channel(State.t(), State.Channel.t(), pid()) :: {State.t(), reference() | nil} + def return_channel( + %State{channels: channels} = state, + %State.Channel{} = wrapped_channel, + caller_pid + ) do + channels + |> Map.has_key?(wrapped_channel.id) + |> maybe_return_channel(state, wrapped_channel, caller_pid) + end + + defp maybe_return_channel( + true = _channel_in_state, + %State{} = state, + %State.Channel{} = wrapped_channel, + caller_pid + ) do + state + |> decrement_open_streams_count(wrapped_channel) + |> remove_lease_from_leases_by_channel_id(wrapped_channel, caller_pid) + |> remove_lease_from_leases_by_monitor_ref() + end + + defp maybe_return_channel( + false = _channel_in_state, + %State{} = state, + _wrapped_channel, + _caller_pid + ) do + {state, nil} + end + + defp decrement_open_streams_count(%State{} = state, channel) do + update_in(state, [:channels, channel.id, :open_streams], &(&1 - 1)) + end + + defp remove_lease_from_leases_by_channel_id( + %State{leases_by_channel_id: leases} = state, + channel, + caller_pid + ) do + old_leases = Map.get(leases, channel.id, []) + old_lease = Enum.find(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end) + + new_leases = + Enum.reject(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end) + + {put_in(state, [:leases_by_channel_id, channel.id], new_leases), + old_lease && old_lease.monitor_ref} + end + + defp remove_lease_from_leases_by_monitor_ref( + {%State{leases_by_monitor_ref: leases} = state, monitor_ref} + ) do + {%{state | leases_by_monitor_ref: Map.delete(leases, monitor_ref)}, monitor_ref} + end + + @spec take_channel(State.t(), pid()) :: {State.Channel.t(), reference(), State.t()} + def take_channel(state, caller_pid) do + state + |> maybe_open_channels() + |> choose_channel() + |> maybe_open_new_connection() + |> lease_channel(caller_pid) + end + + @doc """ + This function may not return channel if requested channel has reached stream limit + """ + @spec take_channel(State.t(), State.Channel.id(), pid()) :: + {State.Channel.t() | nil, reference() | nil, State.t()} + def take_channel(state, channel_id, caller_pid) do + state + |> choose_channel(channel_id) + |> lease_channel(caller_pid) + end + + defp maybe_open_channels(%{channels: channels} = state) when map_size(channels) > 0, do: state + defp maybe_open_channels(%{channels: %{}} = state), do: init(state) + + defp choose_channel( + %{ + channels: wrapped_channel, + config: %Config{max_client_streams_per_connection: max_streams} + } = state + ) do + { + Enum.find_value(wrapped_channel, fn {_id, %State.Channel{open_streams: open_streams} = ch} -> + if is_nil(max_streams) or open_streams < max_streams, do: ch + end), + state + } + end + + defp choose_channel( + %{ + channels: wrapped_channel, + config: %Config{max_client_streams_per_connection: max_streams} + } = state, + channel_id + ) do + { + Enum.find_value(wrapped_channel, fn {_id, + %State.Channel{open_streams: open_streams, id: id} = ch} -> + if (is_nil(max_streams) or open_streams < max_streams) and id == channel_id, do: ch + end), + state + } + end + + defp maybe_open_new_connection({nil, %State{channels: channels, config: config} = state}) + when not is_nil(config.max_pool_overflow) and + map_size(channels) >= config.pool_size + config.max_pool_overflow, + do: {nil, state} + + defp maybe_open_new_connection({nil, %State{config: config} = state}) do + case open_new_connection(config) do + %State.Channel{} = wrapped_channel -> + state = + state + |> put_in([:channels, wrapped_channel.id], wrapped_channel) + |> put_in([:leases_by_channel_id, wrapped_channel.id], []) + + {wrapped_channel, state} + + _ -> + {nil, state} + end + end + + defp maybe_open_new_connection({wrapped_channel, state}), do: {wrapped_channel, state} + + defp lease_channel({%State.Channel{} = wrapped_channel, state}, caller_pid) do + monitor_ref = Process.monitor(caller_pid) + + { + wrapped_channel, + monitor_ref, + state + |> increment_open_streams_count(wrapped_channel) + |> add_lease_to_leases_by_channel_id(wrapped_channel, monitor_ref, caller_pid) + |> add_lease_to_leases_by_monitor_ref(wrapped_channel, monitor_ref, caller_pid) + } + end + + defp lease_channel({nil = _wrapped_channel, state}, _caller_pid), do: {nil, nil, state} + + defp increment_open_streams_count(%State{} = state, channel) do + update_in(state, [:channels, channel.id, :open_streams], &(&1 + 1)) + end + + defp add_lease_to_leases_by_channel_id(%State{} = state, channel, monitor_ref, caller_pid) do + new_lease = %State.Lease{monitor_ref: monitor_ref, caller_pid: caller_pid, id: channel.id} + update_in(state, [:leases_by_channel_id, channel.id], &[new_lease | &1]) + end + + defp add_lease_to_leases_by_monitor_ref(%State{} = state, channel, monitor_ref, caller_pid) do + put_in(state, [:leases_by_monitor_ref, monitor_ref], %State.Lease{ + id: channel.id, + caller_pid: caller_pid, + monitor_ref: monitor_ref + }) + end + + defp fetch_conn_pid_from_channel(%State.Channel{ + channel: %GRPC.Channel{adapter_payload: %{conn_pid: conn_pid}} + }), + do: conn_pid +end diff --git a/grpc/lib/grpc/client/pool/server.ex b/grpc/lib/grpc/client/pool/server.ex new file mode 100644 index 00000000..68c82eb2 --- /dev/null +++ b/grpc/lib/grpc/client/pool/server.ex @@ -0,0 +1,153 @@ +defmodule GRPC.Client.Pool.Server do + @moduledoc false + + require Logger + + use GenServer + + alias GRPC.Client.Pool.Config + alias GRPC.Client.Pool.Implementation + + defmodule DeriveAccess do + defmacro __using__(_opts) do + quote do + @behaviour Access + defdelegate fetch(term, key), to: Map + defdelegate get(term, key, default), to: Map + defdelegate get_and_update(term, key, fun), to: Map + defdelegate pop(term, key), to: Map + end + end + end + + defmodule State do + use GRPC.Client.Pool.Server.DeriveAccess + + defmodule Channel do + use GRPC.Client.Pool.Server.DeriveAccess + + defstruct [:id, :open_streams, :channel] + + @type id :: reference() + + @type t :: %__MODULE__{ + id: id(), + open_streams: non_neg_integer(), + channel: GRPC.Channel.t() + } + end + + defmodule Lease do + use GRPC.Client.Pool.Server.DeriveAccess + + defstruct [:id, :monitor_ref, :caller_pid] + + @type t :: %__MODULE__{ + id: GRPC.Client.Pool.Server.State.Channel.id(), + monitor_ref: reference(), + caller_pid: pid() + } + end + + defstruct channels: %{}, config: %{}, leases_by_channel_id: %{}, leases_by_monitor_ref: %{} + + @type t :: %__MODULE__{ + channels: %{Channel.id() => Channel.t()}, + config: GRPC.Client.Pool.Config.t(), + leases_by_channel_id: %{Channel.id() => [Lease.t()]}, + leases_by_monitor_ref: %{reference() => Lease.t()} + } + end + + @spec start_link(Config.t()) :: GenServer.on_start() + def start_link(%Config{} = config) do + GenServer.start_link(__MODULE__, %State{config: config}, name: via_tuple(config.pool_ref)) + end + + @impl GenServer + @spec init(State.t()) :: {:ok, State.t()} + def init(state) do + Process.flag(:trap_exit, true) + {:ok, Implementation.init(state)} + end + + @impl GenServer + def handle_call(:take_channel, {caller_pid, _tag} = _from, state) do + {channel, _monitor_ref, state} = Implementation.take_channel(state, caller_pid) + {:reply, channel, state} + end + + @impl GenServer + def handle_call({:take_channel, channel_id}, {caller_pid, _tag} = _from, state) do + {channel, _maybe_monitor_ref, state} = + Implementation.take_channel(state, channel_id, caller_pid) + + {:reply, channel, state} + end + + @impl GenServer + def handle_cast({:return_channel, wrapped_channel, caller_pid}, state) do + {state, monitor_ref} = Implementation.return_channel(state, wrapped_channel, caller_pid) + if monitor_ref, do: Process.demonitor(monitor_ref, [:flush]) + {:noreply, state} + end + + @impl GenServer + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do + {state, _monitor_ref} = Implementation.return_channel(state, ref) + {:noreply, state} + end + + @impl GenServer + # Gun sends this when the HTTP/2 session drops (process still alive, connection ended). + # Treat it the same as a connection crash — remove the channel and its leases. + def handle_info({:gun_down, pid, _protocol, _reason, _killed_streams}, state) do + {state, monitor_refs} = Implementation.handle_connection_process_crush(state, pid) + Enum.each(monitor_refs, &Process.demonitor/1) + {:noreply, state} + end + + @impl GenServer + def handle_info({:gun_up, _pid, _protocol}, state), do: {:noreply, state} + + @impl GenServer + def handle_info({:elixir_grpc, :connection_down, pid}, state) do + {state, monitor_refs} = Implementation.handle_connection_process_crush(state, pid) + Enum.map(monitor_refs, &Process.demonitor/1) + {:noreply, state} + end + + @impl GenServer + # In case that Client process crushes, GRPC ConnectionProcess is crushing as well with + # GenServer #PID<0.1333.0> terminating + # ** (stop) exited in: GenServer.call(#PID<0.1369.0>, {:consume_response, {:headers, [{"content-type", "application/grpc+proto"}, {"date", "Wed, 10 Apr 2024 16:22:50 GMT"}, {"server", "Cowboy"}]}}, 5000) + # ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started + # + # And because our pool is linked with ConnectionProcess, it's crushing as well + # over here, we want to prevent ourself from crushing, and instead remove the GRPC.Channel from the pool + # + # PID received in this info refers to %GRPC.Channel{adapter_payload: %{conn_pid: ^pid}} + # So what we need to do is: + # - search all channels for this pid + # - fetch `channel_id` + # - remove channel from the list + # - remove by channel id from `leases_by_channel_id` + # - remove by references (fetched from `leases_by_channel_id`) from `leases_by_monitor_ref` + # + def handle_info({:EXIT, pid, _reason}, state) do + {state, monitor_refs} = Implementation.handle_connection_process_crush(state, pid) + Enum.map(monitor_refs, &Process.demonitor/1) + {:noreply, state} + end + + @impl GenServer + def terminate(_reason, state) do + state.channels + |> Map.values() + |> Enum.each(fn %State.Channel{channel: ch} -> ch.adapter.disconnect(ch) end) + end + + defp via_tuple(pool_ref) do + {:via, Registry, {GRPC.Client.Pool.Registry, {__MODULE__, pool_ref}}} + end +end diff --git a/grpc/lib/grpc/client/pool/supervisor.ex b/grpc/lib/grpc/client/pool/supervisor.ex new file mode 100644 index 00000000..86079662 --- /dev/null +++ b/grpc/lib/grpc/client/pool/supervisor.ex @@ -0,0 +1,44 @@ +defmodule GRPC.Client.Pool.Supervisor do + @moduledoc false + + use Supervisor + + alias GRPC.Client.Pool.Config + + def child_spec(%Config{} = config) do + %{ + id: {__MODULE__, config.pool_ref}, + start: {__MODULE__, :start_link, [config]}, + type: :supervisor, + restart: :transient + } + end + + @spec start_link(Config.t()) :: :ignore | {:error, any()} | {:ok, pid()} + def start_link(%Config{} = config) do + Supervisor.start_link(__MODULE__, config, name: via_tuple(config.pool_ref)) + end + + @impl Supervisor + def init(%Config{} = config) do + pool_ref = config.pool_ref + + Supervisor.init( + [ + Supervisor.child_spec( + {GRPC.Client.Pool.HealthCheck.DynamicSupervisor, config}, + id: {GRPC.Client.Pool.HealthCheck.DynamicSupervisor, pool_ref} + ), + Supervisor.child_spec( + {GRPC.Client.Pool.Server, config}, + id: {GRPC.Client.Pool.Server, pool_ref} + ) + ], + strategy: :one_for_all + ) + end + + defp via_tuple(pool_ref) do + {:via, Registry, {GRPC.Client.Pool.Registry, {__MODULE__, pool_ref}}} + end +end diff --git a/grpc/lib/grpc/health/v1/health.pb.ex b/grpc/lib/grpc/health/v1/health.pb.ex new file mode 100644 index 00000000..6b19b202 --- /dev/null +++ b/grpc/lib/grpc/health/v1/health.pb.ex @@ -0,0 +1,177 @@ +defmodule GRPC.Health.V1.HealthCheckResponse.ServingStatus do + @moduledoc false + use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.11.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.EnumDescriptorProto{ + name: "ServingStatus", + value: [ + %Google.Protobuf.EnumValueDescriptorProto{ + name: "UNKNOWN", + number: 0, + options: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.EnumValueDescriptorProto{ + name: "SERVING", + number: 1, + options: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.EnumValueDescriptorProto{ + name: "NOT_SERVING", + number: 2, + options: nil, + __unknown_fields__: [] + } + ], + options: nil, + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:UNKNOWN, 0) + field(:SERVING, 1) + field(:NOT_SERVING, 2) +end + +defmodule GRPC.Health.V1.HealthCheckRequest do + @moduledoc false + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.11.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.DescriptorProto{ + name: "HealthCheckRequest", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "service", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "service", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:service, 1, type: :string) +end + +defmodule GRPC.Health.V1.HealthCheckResponse do + @moduledoc false + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.11.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.DescriptorProto{ + name: "HealthCheckResponse", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "status", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_ENUM, + type_name: ".grpc.health.v1.HealthCheckResponse.ServingStatus", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "status", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [ + %Google.Protobuf.EnumDescriptorProto{ + name: "ServingStatus", + value: [ + %Google.Protobuf.EnumValueDescriptorProto{ + name: "UNKNOWN", + number: 0, + options: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.EnumValueDescriptorProto{ + name: "SERVING", + number: 1, + options: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.EnumValueDescriptorProto{ + name: "NOT_SERVING", + number: 2, + options: nil, + __unknown_fields__: [] + } + ], + options: nil, + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + ], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:status, 1, type: GRPC.Health.V1.HealthCheckResponse.ServingStatus, enum: true) +end + +defmodule GRPC.Health.V1.Health.Service do + @moduledoc false + use GRPC.Service, name: "grpc.health.v1.Health", protoc_gen_elixir_version: "0.11.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.ServiceDescriptorProto{ + name: "Health", + method: [ + %Google.Protobuf.MethodDescriptorProto{ + name: "Check", + input_type: ".grpc.health.v1.HealthCheckRequest", + output_type: ".grpc.health.v1.HealthCheckResponse", + options: nil, + client_streaming: false, + server_streaming: false, + __unknown_fields__: [] + } + ], + options: nil, + __unknown_fields__: [] + } + end + + rpc(:Check, GRPC.Health.V1.HealthCheckRequest, GRPC.Health.V1.HealthCheckResponse) +end + +defmodule GRPC.Health.V1.Health.Stub do + @moduledoc false + use GRPC.Stub, service: GRPC.Health.V1.Health.Service +end diff --git a/grpc/lib/grpc/stub.ex b/grpc/lib/grpc/stub.ex index ed9756f4..5e2e391a 100644 --- a/grpc/lib/grpc/stub.ex +++ b/grpc/lib/grpc/stub.ex @@ -215,6 +215,14 @@ defmodule GRPC.Stub do without `:accepted_compressors`. * `:accepted_compressors` - tell servers accepted compressors, this can be used without `:compressor` * `:headers` - headers to attach to each request + * `:pool` - connection pool options (map): + * `:size` - number of persistent connections (default: `1`) + * `:max_overflow` - max extra connections when pool is saturated; `nil` for no limit (default: `0`) + * `:max_streams` - max concurrent streams per connection; `nil` for no client-side limit (default: `nil`) + * `:health_check_enabled` - send periodic gRPC health-check pings (default: `false`) + + The pool can be disabled entirely by setting `config :grpc, pool_enabled: false` in your + application config, which restores the pre-pool behaviour (single direct connection per `connect/2` call). ## Examples @@ -301,13 +309,53 @@ defmodule GRPC.Stub do def call(_service_mod, rpc, %{channel: channel} = stream, request, opts) do {_, {req_mod, req_stream}, {res_mod, response_stream}, _rpc_options} = rpc + with {:ok, ch, pool_lease} <- acquire_channel(channel, opts) do + stream = %{stream | channel: ch, request_mod: req_mod, response_mod: res_mod} + + opts = + if req_stream || response_stream do + parse_req_opts([{:timeout, :infinity} | opts]) + else + parse_req_opts([{:timeout, @default_timeout} | opts]) + end + + compressor = Keyword.get(opts, :compressor, ch.compressor) + accepted_compressors = Keyword.get(opts, :accepted_compressors, ch.accepted_compressors) + + if not is_list(accepted_compressors) do + raise ArgumentError, "accepted_compressors is not a list" + end + + accepted_compressors = + if compressor do + Enum.uniq([compressor | accepted_compressors]) + else + accepted_compressors + end + + stream = %{ + stream + | codec: Keyword.get(opts, :codec, ch.codec), + compressor: compressor, + accepted_compressors: accepted_compressors + } + + try do + GRPC.Telemetry.client_span(stream, request, fn -> + do_call(req_stream, stream, request, opts) + end) + after + release_channel(channel, pool_lease) + end + end + end + + defp acquire_channel(%Channel{pool: nil} = channel, opts) do ch = case Connection.pick_channel(channel, opts) do - {:ok, %Channel{adapter_payload: adapter_payload} = ch} when is_map(adapter_payload) -> - conn_pid = Map.get(adapter_payload, :conn_pid) - - if is_pid(conn_pid) and Process.alive?(conn_pid) do - ch + {:ok, %Channel{adapter_payload: %{conn_pid: conn_pid}} = picked} when is_pid(conn_pid) -> + if Process.alive?(conn_pid) do + picked else Logger.warning( "The connection process #{inspect(conn_pid)} is not alive, " <> @@ -318,43 +366,34 @@ defmodule GRPC.Stub do end _ -> - # fallback to the channel in the stream channel end - stream = %{stream | channel: ch, request_mod: req_mod, response_mod: res_mod} + {:ok, ch, nil} + end - opts = - if req_stream || response_stream do - parse_req_opts([{:timeout, :infinity} | opts]) - else - parse_req_opts([{:timeout, @default_timeout} | opts]) + defp acquire_channel(%Channel{pool: pool_ref} = channel, opts) do + # Always go through pick_channel so the LB can rotate between address pools + actual_pool_ref = + case Connection.pick_channel(channel, opts) do + {:ok, %Channel{pool: picked_pool_ref}} -> picked_pool_ref + _ -> pool_ref end - compressor = Keyword.get(opts, :compressor, ch.compressor) - accepted_compressors = Keyword.get(opts, :accepted_compressors, ch.accepted_compressors) + case GRPC.Client.Pool.checkout(actual_pool_ref) do + {wrapped, actual_channel} -> + {:ok, actual_channel, {wrapped, actual_pool_ref}} - if not is_list(accepted_compressors) do - raise ArgumentError, "accepted_compressors is not a list" + nil -> + {:error, + GRPC.RPCError.exception(GRPC.Status.resource_exhausted(), "no pool channel available")} end + end - accepted_compressors = - if compressor do - Enum.uniq([compressor | accepted_compressors]) - else - accepted_compressors - end - - stream = %{ - stream - | codec: Keyword.get(opts, :codec, ch.codec), - compressor: compressor, - accepted_compressors: accepted_compressors - } + defp release_channel(_channel, nil), do: :ok - GRPC.Telemetry.client_span(stream, request, fn -> - do_call(req_stream, stream, request, opts) - end) + defp release_channel(_channel, {wrapped_channel, pool_ref}) do + GRPC.Client.Pool.checkin(pool_ref, wrapped_channel) end defp do_call( diff --git a/grpc/mix.exs b/grpc/mix.exs index 6b4b936b..086199b4 100644 --- a/grpc/mix.exs +++ b/grpc/mix.exs @@ -62,7 +62,8 @@ defmodule GRPC.MixProject do "CHANGELOG.md", "guides/getting_started/client.md", "guides/advanced/custom_codecs.md", - "guides/advanced/load_balancing.md" + "guides/advanced/load_balancing.md", + "guides/advanced/connection_pool.md" ], groups_for_modules: [ "Client Core": [ diff --git a/grpc/test/grpc/client/pool/health_check/dynamic_supervisor_test.exs b/grpc/test/grpc/client/pool/health_check/dynamic_supervisor_test.exs new file mode 100644 index 00000000..91ed9128 --- /dev/null +++ b/grpc/test/grpc/client/pool/health_check/dynamic_supervisor_test.exs @@ -0,0 +1,87 @@ +defmodule GRPC.Client.Pool.HealthCheck.DynamicSupervisorTest do + use ExUnit.Case + + alias GRPC.Client.Pool.Config + + @subject GRPC.Client.Pool.HealthCheck.DynamicSupervisor + + describe "start/3" do + setup do + config = make_config() + supervisor_pid = start_supervised!({GRPC.Client.Pool.Supervisor, config}) + + [ + {_, hc_supervisor_pid, :supervisor, [GRPC.Client.Pool.HealthCheck.DynamicSupervisor]}, + {_, _server_pid, :worker, [GRPC.Client.Pool.Server]} + ] = + supervisor_pid + |> Supervisor.which_children() + |> Enum.sort_by(fn {_id, _pid, _type, [module]} -> module end) + + %{hc_dynamic_supervisor_pid: hc_supervisor_pid, pool_ref: config.pool_ref, id: make_ref()} + end + + test "successfully starts health-check server", + %{ + hc_dynamic_supervisor_pid: hc_dynamic_supervisor_pid, + pool_ref: pool_ref, + id: channel_id + } do + loop_pid = spawn_infinite_process() + + assert {:ok, pid} = @subject.start(channel_id, loop_pid, pool_ref) + + assert [{:undefined, ^pid, :worker, [GRPC.Client.Pool.HealthCheck.Server]}] = + hc_dynamic_supervisor_pid + |> DynamicSupervisor.which_children() + |> Enum.sort_by(fn {service_name, _, _, _} -> service_name end) + end + + test "once process exits health-check server stops with reason normal and does not get restarted", + %{pool_ref: pool_ref, id: id} do + loop_pid = spawn_infinite_process() + Process.monitor(loop_pid) + + opts = %{channel_id: id, conn_pid: loop_pid, pool_ref: pool_ref} + pid = start_supervised!({GRPC.Client.Pool.HealthCheck.Server, opts}) + Process.monitor(pid) + + assert Process.alive?(pid) + Process.exit(loop_pid, :kill) + + assert_receive {:DOWN, _ref, :process, ^loop_pid, _reason} + assert_receive {:DOWN, _ref, :process, ^pid, _reason} + + refute Process.alive?(pid) + end + end + + defp spawn_infinite_process do + spawn(fn -> + receive do + _ -> :ok + end + end) + end + + defp make_config do + %Config{ + pool_ref: make_ref(), + channel: %GRPC.Channel{ + host: "localhost", + port: 1337, + scheme: "http", + adapter: GRPC.Client.Adapters.Gun, + codec: GRPC.Codec.Proto, + interceptors: [], + compressor: nil, + accepted_compressors: [], + headers: [] + }, + pool_size: 1, + max_pool_overflow: 20, + max_client_streams_per_connection: 100, + adapter_opts: [] + } + end +end diff --git a/grpc/test/grpc/client/pool/implementation_test.exs b/grpc/test/grpc/client/pool/implementation_test.exs new file mode 100644 index 00000000..a721040d --- /dev/null +++ b/grpc/test/grpc/client/pool/implementation_test.exs @@ -0,0 +1,323 @@ +defmodule GRPC.Client.Pool.ImplementationTest do + use ExUnit.Case, async: false + + defmodule HelloServer do + use GRPC.Server, service: Helloworld.Greeter.Service + + def say_hello(req, _stream) do + %Helloworld.HelloReply{message: "Hello, #{req.name}"} + end + end + + alias GRPC.Client.Pool.Config + alias GRPC.Client.Pool.Server.State + + @subject GRPC.Client.Pool.Implementation + + setup_all do + {:ok, _, port} = GRPC.Server.start(HelloServer, 0, max_concurrent_streams: 1_000) + on_exit(fn -> :ok = GRPC.Server.stop(HelloServer) end) + %{port: port} + end + + describe "init/1" do + setup %{port: port} do + config = make_config(port) + %{state: %State{config: config}} + end + + test "opens N amount of channels, where N is a value of pool_size specified in config", %{ + state: state + } do + %State{channels: wrapped_channels} = @subject.init(state) + + assert wrapped_channels + |> Enum.map(fn {_id, wrapped} -> wrapped.channel end) + |> Enum.count() == state.config.pool_size + + assert Enum.all?(wrapped_channels, fn {_id, %State.Channel{channel: channel}} -> + is_struct(channel, GRPC.Channel) + end) + end + + test "all channels should have 0 streams", %{state: state} do + %State{channels: wrapped_channels} = @subject.init(state) + + assert Enum.all?(wrapped_channels, fn {_id, %State.Channel{open_streams: open_streams}} -> + open_streams == 0 + end) + end + end + + describe "take_channel/2" do + setup %{port: port} do + config = make_config(port) + %{state: @subject.init(%State{config: config})} + end + + test "returns a first channel that has < max_number_of_streams_per_connection", %{ + state: state + } do + {channels, _state, _pids, _refs} = take_channels_n_times(100, state) + assert length(channels) == 100 + + assert [_its_the_same_channel_for] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + end + + test "when channel has opened max_number_of_streams_per_connection next channel from pool is returned", + %{state: state} do + {channels, state, _pids, _refs} = take_channels_n_times(100, state) + + [%State.Channel{id: max_streams_channel_id}] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {%State.Channel{id: next_channel_in_the_pool_id}, _monitor_ref, _state} = + @subject.take_channel(state, :c.pid(0, 1, 0)) + + refute max_streams_channel_id == next_channel_in_the_pool_id + end + + test "opening channel adds to leases by channel id", %{state: state} do + {channels, state, [expected_pid1, expected_pid2, expected_pid3], + [expected_ref1, expected_ref2, expected_ref3]} = + take_channels_n_times(3, state) + + [%State.Channel{id: channel_id}] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert %State{ + leases_by_channel_id: %{ + ^channel_id => [ + %State.Lease{monitor_ref: ^expected_ref1, caller_pid: ^expected_pid1}, + %State.Lease{monitor_ref: ^expected_ref2, caller_pid: ^expected_pid2}, + %State.Lease{monitor_ref: ^expected_ref3, caller_pid: ^expected_pid3} + ] + } + } = state + end + + test "opening channel adds to leases by monitor_ref", %{state: state} do + {channels, state, [expected_pid1, expected_pid2, expected_pid3], + [expected_ref1, expected_ref2, expected_ref3]} = + take_channels_n_times(3, state) + + [%State.Channel{id: channel_id}] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert %State{ + leases_by_monitor_ref: %{ + ^expected_ref1 => %State.Lease{id: ^channel_id, caller_pid: ^expected_pid1}, + ^expected_ref2 => %State.Lease{id: ^channel_id, caller_pid: ^expected_pid2}, + ^expected_ref3 => %State.Lease{id: ^channel_id, caller_pid: ^expected_pid3} + } + } = state + end + + test "when there is no free channel in the pool, new one is opened, and it's id is added to leases_by_channel_id with empty array", + %{state: state} do + [old_key] = Map.keys(state[:leases_by_channel_id]) + {_channels, state, _pids, _refs} = take_channels_n_times(101, state) + + [new_key] = + state[:leases_by_channel_id] |> Map.keys() |> Enum.reject(fn k -> k == old_key end) + + assert [%State.Lease{}] = state[:leases_by_channel_id][new_key] + end + end + + describe "take_channel/3" do + setup %{port: port} do + config = make_config(port) + %{state: @subject.init(%State{config: config})} + end + + test "when channel is requested by it's ID it is returned if it hasn't reached max_number_of_streams_per_connection", + %{state: state} do + {channels, state, _pids, _refs} = take_channels_n_times(99, state) + + assert [%{id: channel_id} = _channel] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {%{id: ^channel_id}, _monitor_ref, _state} = + @subject.take_channel(state, channel_id, :c.pid(0, 0, 100)) + end + + test "when channel is requested by it's ID if it has reached max_number_of_streams_per_connection nil is returned", + %{state: state} do + {channels, state, _pids, _refs} = take_channels_n_times(100, state) + + assert [%{id: channel_id} = _channel] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {nil = _channel, nil = _monitor_ref, ^state} = + @subject.take_channel(state, channel_id, :c.pid(0, 0, 100)) + end + end + + describe "return_channel/3" do + setup %{port: port} do + config = make_config(port) + %{state: %State{config: config}} + end + + test "returning a channel decrements it's stream count and makes it available to other clients", + %{state: state} do + {channels, state, [pid | _], [ref | _]} = take_channels_n_times(100, state) + + [%State.Channel{id: max_streams_channel_id} = channel] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {state, ^ref} = @subject.return_channel(state, channel, pid) + + assert {%State.Channel{id: next_channel_in_the_pool_id}, _monitor_ref, _state} = + @subject.take_channel(state, pid) + + assert max_streams_channel_id == next_channel_in_the_pool_id + end + + test "returning a channel removes from leases by channel id", %{state: state} do + {channels, state, [expected_pid1, expected_pid2, expected_pid3], + [expected_ref1, expected_ref2, expected_ref3]} = + take_channels_n_times(3, state) + + [%State.Channel{id: channel_id} = channel] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {state, ^expected_ref1} = @subject.return_channel(state, channel, expected_pid1) + + assert %State{ + leases_by_channel_id: %{ + ^channel_id => [ + %State.Lease{monitor_ref: ^expected_ref2, caller_pid: ^expected_pid2}, + %State.Lease{monitor_ref: ^expected_ref3, caller_pid: ^expected_pid3} + ] + } + } = state + end + + test "returning a channel removes from leases by monitor ref", %{state: state} do + {channels, state, [expected_pid1, expected_pid2, expected_pid3], + [expected_ref1, expected_ref2, expected_ref3]} = + take_channels_n_times(3, state) + + [%State.Channel{id: channel_id} = channel] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {state, ^expected_ref1} = @subject.return_channel(state, channel, expected_pid1) + + assert %State{ + leases_by_monitor_ref: %{ + ^expected_ref2 => %State.Lease{id: ^channel_id, caller_pid: ^expected_pid2}, + ^expected_ref3 => %State.Lease{id: ^channel_id, caller_pid: ^expected_pid3} + } + } = state + end + end + + describe "handle_connection_process_crush/2" do + setup %{port: port} do + config = make_config(port) + %{state: %State{config: config}} + end + + test "when connection process crushes, related channel is removed from the list, all leases are cleared for this channel", + %{state: state} do + {channels, state, _pids, [expected_ref1, expected_ref2, expected_ref3] = refs} = + take_channels_n_times(3, state) + + [ + %State.Channel{ + id: crushed_channel_id, + channel: %GRPC.Channel{adapter_payload: %{conn_pid: crushed_pid}} + } + ] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert %State{ + channels: %{^crushed_channel_id => %State.Channel{}}, + leases_by_channel_id: %{^crushed_channel_id => [_, _, _]}, + leases_by_monitor_ref: %{ + ^expected_ref1 => _, + ^expected_ref2 => _, + ^expected_ref3 => _ + } + } = state + + assert {state, [^expected_ref1, ^expected_ref2, ^expected_ref3]} = + @subject.handle_connection_process_crush(state, crushed_pid) + + assert %State{ + channels: channels, + leases_by_channel_id: leases_by_ch_id, + leases_by_monitor_ref: leases_by_monitor_ref + } = state + + refute Map.has_key?(channels, crushed_channel_id) + refute Map.has_key?(leases_by_ch_id, crushed_channel_id) + assert Enum.all?(refs, &(not Map.has_key?(leases_by_monitor_ref, &1))) + end + + test "only crushed channel leases are removed", %{state: state} do + {channels, state, _pids, [survive_ref | expected_crush_refs]} = + take_channels_n_times(101, state) + + [ + %State.Channel{id: survive_channel_id}, + %State.Channel{channel: %GRPC.Channel{adapter_payload: %{conn_pid: crushed_pid}}} + ] = + Enum.uniq_by(channels, fn %State.Channel{id: id} -> id end) + + assert {state, crush_refs} = @subject.handle_connection_process_crush(state, crushed_pid) + assert Enum.sort(crush_refs) == Enum.sort(expected_crush_refs) + + assert %State{ + channels: channels, + leases_by_channel_id: leases_by_ch_id, + leases_by_monitor_ref: leases_by_monitor_ref + } = state + + assert Map.has_key?(channels, survive_channel_id) + assert Map.has_key?(leases_by_ch_id, survive_channel_id) + assert Map.has_key?(leases_by_monitor_ref, survive_ref) + end + + test "if channel not found in state, state is unchanged", %{state: state} do + {_channels, state, _pids, refs} = take_channels_n_times(101, state) + + assert {state, []} = @subject.handle_connection_process_crush(state, :c.pid(0, 0, 0)) + + assert %State{leases_by_monitor_ref: leases} = state + assert Enum.all?(refs, &Map.has_key?(leases, &1)) + end + end + + defp make_config(port) do + %Config{ + pool_ref: make_ref(), + channel: %GRPC.Channel{ + host: "localhost", + port: port, + scheme: "http", + adapter: GRPC.Client.Adapters.Gun, + codec: GRPC.Codec.Proto, + interceptors: [], + compressor: nil, + accepted_compressors: [], + headers: [] + }, + pool_size: 1, + max_pool_overflow: 20, + max_client_streams_per_connection: 100, + adapter_opts: [] + } + end + + defp take_channels_n_times(n, state) do + Enum.reduce(1..n, {[], state, [], []}, fn itt, {channels_acc, state_acc, pid_acc, ref_acc} -> + pid = :c.pid(0, 0, itt) + {channel, ref, state} = @subject.take_channel(state_acc, pid) + {[channel | channels_acc], state, [pid | pid_acc], [ref | ref_acc]} + end) + end +end diff --git a/grpc/test/grpc/client/pool/supervisor_test.exs b/grpc/test/grpc/client/pool/supervisor_test.exs new file mode 100644 index 00000000..d4cd3577 --- /dev/null +++ b/grpc/test/grpc/client/pool/supervisor_test.exs @@ -0,0 +1,45 @@ +defmodule GRPC.Client.Pool.SupervisorTest do + use ExUnit.Case + + alias GRPC.Client.Pool.Config + + @subject GRPC.Client.Pool.Supervisor + + describe "start_link/1" do + test "supervisor is started as well as its children" do + {:ok, supervisor_pid} = start_supervised({@subject, make_config()}) + + [ + {_, hc_supervisor_pid, :supervisor, [GRPC.Client.Pool.HealthCheck.DynamicSupervisor]}, + {_, server_pid, :worker, [GRPC.Client.Pool.Server]} + ] = + supervisor_pid + |> Supervisor.which_children() + |> Enum.sort_by(fn {_id, _pid, _type, [module]} -> module end) + + assert is_pid(server_pid) + assert is_pid(hc_supervisor_pid) + end + end + + defp make_config do + %Config{ + pool_ref: make_ref(), + channel: %GRPC.Channel{ + host: "localhost", + port: 1337, + scheme: "http", + adapter: GRPC.Client.Adapters.Gun, + codec: GRPC.Codec.Proto, + interceptors: [], + compressor: nil, + accepted_compressors: [], + headers: [] + }, + pool_size: 1, + max_pool_overflow: 20, + max_client_streams_per_connection: 100, + adapter_opts: [] + } + end +end diff --git a/grpc/test/grpc/integration/legacy_mode_test.exs b/grpc/test/grpc/integration/legacy_mode_test.exs new file mode 100644 index 00000000..571c6320 --- /dev/null +++ b/grpc/test/grpc/integration/legacy_mode_test.exs @@ -0,0 +1,230 @@ +defmodule GRPC.Integration.LegacyModeTest do + # async: false — toggles global Application env and uses global telemetry handlers + use ExUnit.Case, async: false + + import GRPC.Integration.TestCase + + defmodule HelloServer do + use GRPC.Server, service: Helloworld.Greeter.Service + + def say_hello(%{name: "raise", duration: duration}, _stream) do + Process.sleep(duration) + raise ArgumentError, "exception raised" + end + + def say_hello(%{name: "delay", duration: duration}, _stream) do + Process.sleep(duration) + %Helloworld.HelloReply{message: "Hello"} + end + + def say_hello(req, _stream) do + %Helloworld.HelloReply{message: "Hello, #{req.name}"} + end + end + + setup do + Application.put_env(:grpc, :pool_enabled, false) + on_exit(fn -> Application.delete_env(:grpc, :pool_enabled) end) + end + + def port_for(pid) do + Port.list() + |> Enum.find(fn port -> + case Port.info(port, :links) do + {:links, links} -> + pid in links + + _ -> + false + end + end) + end + + test "you can disconnect stubs" do + run_server(HelloServer, fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + Process.sleep(100) + + %{adapter_payload: %{conn_pid: gun_conn_pid}} = channel + + gun_port = port_for(gun_conn_pid) + ref = :erlang.monitor(:port, gun_port) + + {:ok, channel} = GRPC.Stub.disconnect(channel) + + assert %{adapter_payload: %{conn_pid: nil}} = channel + assert_receive {:DOWN, ^ref, :port, ^gun_port, _} + assert port_for(gun_conn_pid) == nil + end) + end + + describe "telemetry" do + test "sends server start+stop events on success" do + server_rpc_prefix = GRPC.Telemetry.server_rpc_prefix() + start_server_name = server_rpc_prefix ++ [:start] + stop_server_name = server_rpc_prefix ++ [:stop] + exception_server_name = server_rpc_prefix ++ [:exception] + + client_rpc_prefix = GRPC.Telemetry.client_rpc_prefix() + start_client_name = client_rpc_prefix ++ [:start] + stop_client_name = client_rpc_prefix ++ [:stop] + exception_client_name = client_rpc_prefix ++ [:exception] + + attach_events([ + start_server_name, + stop_server_name, + exception_server_name, + start_client_name, + stop_client_name, + exception_client_name + ]) + + run_server([HelloServer], fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + + req = %Helloworld.HelloRequest{name: "delay", duration: 1000} + + assert {:ok, _} = Helloworld.Greeter.Stub.say_hello(channel, req) + end) + + assert_received {^start_server_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{} + } = metadata + + assert_received {^stop_server_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1000 + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{} + } = metadata + + assert_received {:gun_down, _, _, _, _} + + assert_received {^start_client_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {:SayHello, {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}, + %{}} + } + } = metadata + + assert_received {^stop_client_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1100 + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {:SayHello, {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}, + %{}} + } + } = metadata + + refute_receive _ + end + + test "sends server start+exception events on success" do + server_rpc_prefix = GRPC.Telemetry.server_rpc_prefix() + start_server_name = server_rpc_prefix ++ [:start] + stop_server_name = server_rpc_prefix ++ [:stop] + exception_server_name = server_rpc_prefix ++ [:exception] + + client_rpc_prefix = GRPC.Telemetry.client_rpc_prefix() + start_client_name = client_rpc_prefix ++ [:start] + stop_client_name = client_rpc_prefix ++ [:stop] + exception_client_name = client_rpc_prefix ++ [:exception] + + attach_events([ + start_server_name, + stop_server_name, + exception_server_name, + start_client_name, + stop_client_name, + exception_client_name + ]) + + run_server([HelloServer], fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + + req = %Helloworld.HelloRequest{name: "raise", duration: 1100} + + assert {:error, %GRPC.RPCError{status: 2}} = + Helloworld.Greeter.Stub.say_hello(channel, req) + end) + + assert_received {^start_server_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{} + } = metadata + + assert_received {^exception_server_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1100 + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{}, + kind: :error, + reason: %ArgumentError{message: "exception raised"}, + stacktrace: stacktrace + } = metadata + + assert is_list(stacktrace) + + Enum.each(stacktrace, fn entry -> + assert {mod, fun, arity, meta} = entry + assert is_atom(mod) + assert is_atom(fun) + assert is_integer(arity) + assert is_list(meta) + end) + + assert_received {^start_client_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {:SayHello, {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}, + %{}} + } + } = metadata + + assert_received {^stop_client_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1100 + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {:SayHello, {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}, + %{}} + } + } = metadata + + assert_received {:gun_down, _, _, _, _} + + refute_receive _ + end + end +end diff --git a/grpc/test/grpc/integration/pool_test.exs b/grpc/test/grpc/integration/pool_test.exs new file mode 100644 index 00000000..936f5c19 --- /dev/null +++ b/grpc/test/grpc/integration/pool_test.exs @@ -0,0 +1,373 @@ +defmodule GRPC.Integration.PoolTest do + use GRPC.Integration.TestCase + + defmodule HelloServer do + use GRPC.Server, service: Helloworld.Greeter.Service + + def say_hello(req, _stream) do + %Helloworld.HelloReply{message: "Hello, #{req.name}"} + end + end + + defmodule SlowServer do + use GRPC.Server, service: Helloworld.Greeter.Service + + def say_hello(req, _stream) do + Process.sleep(100) + %Helloworld.HelloReply{message: "Hello, #{req.name}"} + end + + def loop_back(req, _stream) do + client_pid = :erlang.binary_to_term(req.client_pid) + client_msg = :erlang.binary_to_term(req.client_msg) + send(client_pid, client_msg) + Process.sleep(100) + %Helloworld.LoopBackReply{message: "Hello, #{req.client_name}"} + end + end + + defmodule ErrorServer do + use GRPC.Server, service: Helloworld.Greeter.Service + + def say_hello(_req, _stream) do + raise GRPC.RPCError, status: :internal, message: "forced error" + end + end + + test "RPC calls work through the pool" do + run_server(HelloServer, fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + + assert {:ok, %Helloworld.HelloReply{message: "Hello, World"}} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "World"}) + end) + end + + test "pool_size controls the number of open channels" do + run_server(HelloServer, fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}", pool: %{size: 3}) + + [{pool_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, channel.pool}) + + state = :sys.get_state(pool_pid) + assert map_size(state.channels) == 3 + end) + end + + test "concurrent requests from multiple processes all succeed" do + run_server(SlowServer, fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + + tasks = + for i <- 1..20 do + Task.async(fn -> + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "#{i}"}) + end) + end + + results = Task.await_many(tasks, 5_000) + assert Enum.all?(results, fn {status, _} -> status == :ok end) + end) + end + + test "channel is returned to pool after use, allowing sequential reuse" do + run_server(HelloServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_overflow: 0, max_streams: 1}) + + assert {:ok, _} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "first"}) + + assert {:ok, _} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "second"}) + end) + end + + test "overflow channel is opened when all pool channels are at max streams" do + run_server(SlowServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_overflow: 1, max_streams: 1}) + + client_pid = self() + client_msg = :received_loopback_request + + task1 = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary({client_msg, 1}), + client_name: "1" + }) + end) + + assert_receive {:received_loopback_request, 1} + + task2 = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary({client_msg, 2}), + client_name: "2" + }) + end) + + assert_receive {:received_loopback_request, 2} + + assert {:ok, %Helloworld.LoopBackReply{}} = Task.await(task1, 2_000) + assert {:ok, %Helloworld.LoopBackReply{}} = Task.await(task2, 2_000) + end) + end + + test "lease is released automatically when caller process dies mid-RPC" do + run_server(SlowServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_streams: 1}) + + pool_ref = channel.pool + + [{pool_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) + + test_pid = self() + + task = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(test_pid), + client_msg: :erlang.term_to_binary(:acquired), + client_name: "crash" + }) + end) + + # Server sends :acquired once it has received the request (channel is held by task) + assert_receive :acquired + Task.shutdown(task, :brutal_kill) + + # :sys.get_state is a synchronous GenServer call — it sits behind the :DOWN message + # in the pool server's mailbox, so by the time it returns the lease is already cleared + state = :sys.get_state(pool_pid) + assert Enum.all?(state.channels, fn {_, ch} -> ch.open_streams == 0 end) + assert map_size(state.leases_by_monitor_ref) == 0 + + assert {:ok, _} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "after"}) + end) + end + + test "pool replaces a crashed channel immediately to maintain pool_size" do + run_server(HelloServer, fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}", pool: %{size: 2}) + pool_ref = channel.pool + + [{pool_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) + + %{channels: channels} = state = :sys.get_state(pool_pid) + old_references = Enum.map(channels, fn {id, _} -> id end) + + [{_, wrapped_ch} | _] = Map.to_list(state.channels) + conn_pid = wrapped_ch.channel.adapter_payload.conn_pid + + Process.exit(conn_pid, :kill) + + # :sys.get_state sits behind the :EXIT in the pool server's mailbox, so by the + # time it returns the crash has been handled and the replacement channel opened + spawn(fn -> + %{channels: channels} = :sys.get_state(pool_pid) + new_references = Enum.map(channels, fn {id, _} -> id end) + + assert Enum.count(old_references) == 2 + assert Enum.count(new_references) == 2 + + assert (new_references ++ old_references) |> Enum.uniq() |> Enum.count() == 3 + end) + + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "after"}) + end) + end + + test "nil max_overflow allows unbounded overflow connections" do + run_server(SlowServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", + pool: %{size: 1, max_overflow: nil, max_streams: 1} + ) + + pool_ref = channel.pool + + [{pool_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) + + client_pid = self() + + tasks = + for i <- 1..5 do + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary({:holding, i}), + client_name: "#{i}" + }) + end) + end + + for i <- 1..5, do: assert_receive({:holding, ^i}) + + state = :sys.get_state(pool_pid) + assert map_size(state.channels) == 5 + + for task <- tasks, do: assert({:ok, _} = Task.await(task, 2_000)) + end) + end + + test "overflow channel increases channel count in pool state" do + run_server(SlowServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_overflow: 1, max_streams: 1}) + + pool_ref = channel.pool + + [{pool_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) + + client_pid = self() + + task1 = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary(:holding_1), + client_name: "1" + }) + end) + + assert_receive :holding_1 + + task2 = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary(:holding_2), + client_name: "2" + }) + end) + + assert_receive :holding_2 + + # :sys.get_state is synchronous — by the time it returns the overflow channel is open + state = :sys.get_state(pool_pid) + assert map_size(state.channels) == 2 + + assert {:ok, _} = Task.await(task1, 2_000) + assert {:ok, _} = Task.await(task2, 2_000) + end) + end + + test "resource_exhausted when pool is full and max_overflow is 0" do + run_server(SlowServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_overflow: 0, max_streams: 1}) + + client_pid = self() + + task = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary(:holding), + client_name: "1" + }) + end) + + assert_receive :holding + + assert {:error, %GRPC.RPCError{status: status}} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{ + name: "blocked" + }) + + assert status == GRPC.Status.resource_exhausted() + + Task.await(task, 2_000) + end) + end + + test "resource_exhausted when pool and all overflow channels are at max_streams" do + run_server(SlowServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_overflow: 1, max_streams: 1}) + + client_pid = self() + + task1 = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary(:holding_1), + client_name: "1" + }) + end) + + assert_receive :holding_1 + + task2 = + Task.async(fn -> + Helloworld.Greeter.Stub.loop_back(channel, %Helloworld.LoopBackRequest{ + client_pid: :erlang.term_to_binary(client_pid), + client_msg: :erlang.term_to_binary(:holding_2), + client_name: "2" + }) + end) + + assert_receive :holding_2 + + assert {:error, %GRPC.RPCError{status: status}} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{ + name: "blocked" + }) + + assert status == GRPC.Status.resource_exhausted() + + assert {:ok, _} = Task.await(task1, 2_000) + assert {:ok, _} = Task.await(task2, 2_000) + end) + end + + test "channel is returned to pool even when RPC returns an error" do + run_server(ErrorServer, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", pool: %{size: 1, max_streams: 1}) + + pool_ref = channel.pool + + [{pool_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Server, pool_ref}) + + assert {:error, _} = + Helloworld.Greeter.Stub.say_hello(channel, %Helloworld.HelloRequest{name: "fail"}) + + # checkin is a cast — :sys.get_state sits behind it in the mailbox + state = :sys.get_state(pool_pid) + assert Enum.all?(state.channels, fn {_, ch} -> ch.open_streams == 0 end) + assert map_size(state.leases_by_monitor_ref) == 0 + end) + end + + test "disconnect stops the pool supervisor" do + run_server(HelloServer, fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + pool_ref = channel.pool + + [{sup_pid, _}] = + Registry.lookup(GRPC.Client.Pool.Registry, {GRPC.Client.Pool.Supervisor, pool_ref}) + + assert Process.alive?(sup_pid) + + {:ok, disconnected} = GRPC.Stub.disconnect(channel) + assert disconnected.pool == nil + + refute Process.alive?(sup_pid) + end) + end +end diff --git a/grpc/test/grpc/integration/server_test.exs b/grpc/test/grpc/integration/server_test.exs index 169af543..5fe37457 100644 --- a/grpc/test/grpc/integration/server_test.exs +++ b/grpc/test/grpc/integration/server_test.exs @@ -784,8 +784,6 @@ defmodule GRPC.Integration.ServerTest do stream: %GRPC.Server.Stream{} } = metadata - assert_received {:gun_down, _, _, _, _} - assert_received {^start_client_name, measurements, metadata} assert %{monotonic_time: _, system_time: _} = measurements @@ -899,8 +897,6 @@ defmodule GRPC.Integration.ServerTest do } } = metadata - assert_received {:gun_down, _, _, _, _} - refute_receive _ end end diff --git a/grpc/test/grpc/integration/stub_test.exs b/grpc/test/grpc/integration/stub_test.exs index 255a74ba..c3489d46 100644 --- a/grpc/test/grpc/integration/stub_test.exs +++ b/grpc/test/grpc/integration/stub_test.exs @@ -17,35 +17,13 @@ defmodule GRPC.Integration.StubTest do end end - def port_for(pid) do - Port.list() - |> Enum.find(fn port -> - case Port.info(port, :links) do - {:links, links} -> - pid in links - - _ -> - false - end - end) - end - test "you can disconnect stubs" do run_server(HelloServer, fn port -> {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") - Process.sleep(100) - - %{adapter_payload: %{conn_pid: gun_conn_pid}} = channel - - gun_port = port_for(gun_conn_pid) - # Using :erlang.monitor to be compatible with <= 1.5 - ref = :erlang.monitor(:port, gun_port) - - {:ok, channel} = GRPC.Stub.disconnect(channel) + assert is_reference(channel.pool) - assert %{adapter_payload: %{conn_pid: nil}} = channel - assert_receive {:DOWN, ^ref, :port, ^gun_port, _} - assert port_for(gun_conn_pid) == nil + {:ok, disconnected_channel} = GRPC.Stub.disconnect(channel) + assert %{pool: nil} = disconnected_channel end) end @@ -70,15 +48,15 @@ defmodule GRPC.Integration.StubTest do test "use a channel name to send a message" do run_server(HelloServer, fn port -> - {:ok, _channel} = - GRPC.Client.Connection.connect("localhost:#{port}", - interceptors: [GRPC.Client.Interceptors.Logger], - name: :my_channel - ) + assert {:ok, %GRPC.Channel{ref: :my_channel} = channel} = + GRPC.Client.Connection.connect("localhost:#{port}", + interceptors: [GRPC.Client.Interceptors.Logger], + name: :my_channel + ) name = "GRPC user!" req = %Helloworld.HelloRequest{name: name} - {:ok, reply} = %GRPC.Channel{ref: :my_channel} |> Helloworld.Greeter.Stub.say_hello(req) + {:ok, reply} = Helloworld.Greeter.Stub.say_hello(channel, req) assert reply.message == "Hello, #{name}" end) end diff --git a/grpc/test/support/proto/helloworld.pb.ex b/grpc/test/support/proto/helloworld.pb.ex index 0d6b7220..3c4a4c8c 100644 --- a/grpc/test/support/proto/helloworld.pb.ex +++ b/grpc/test/support/proto/helloworld.pb.ex @@ -13,6 +13,22 @@ defmodule Helloworld.HelloReply do field(:message, 1, type: :string) end +defmodule Helloworld.LoopBackRequest do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 + + field(:client_pid, 1, type: :bytes) + field(:client_msg, 2, type: :bytes) + field(:client_name, 3, type: :string) +end + +defmodule Helloworld.LoopBackReply do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 + + field(:message, 1, type: :string) +end + defmodule Helloworld.HeaderRequest do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 @@ -33,6 +49,8 @@ defmodule Helloworld.Greeter.Service do rpc(:SayHello, Helloworld.HelloRequest, Helloworld.HelloReply, %{}) rpc(:CheckHeaders, Helloworld.HeaderRequest, Helloworld.HeaderReply, %{}) + + rpc(:LoopBack, Helloworld.LoopBackRequest, Helloworld.LoopBackReply, %{}) end defmodule Helloworld.Greeter.Stub do diff --git a/grpc/test/support/proto/helloworld.proto b/grpc/test/support/proto/helloworld.proto index e0fac966..a9e4ea15 100644 --- a/grpc/test/support/proto/helloworld.proto +++ b/grpc/test/support/proto/helloworld.proto @@ -7,6 +7,7 @@ service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} rpc CheckHeaders (HeaderRequest) returns (HeaderReply) {} + rpc LoopBack (LoopBackRequest) returns (LoopBackReply) {} } // The request message containing the user's name. @@ -26,3 +27,13 @@ message HeaderRequest { message HeaderReply { string authorization = 1; } + +message LoopBackRequest { + bytes client_pid = 1; + bytes client_msg = 2; + string client_name = 3; +} + +message LoopBackReply { + string message = 1; +}