diff --git a/lib/kaffe/consumer_group/group_manager.ex b/lib/kaffe/consumer_group/group_manager.ex index d1a7c51..dac0db6 100644 --- a/lib/kaffe/consumer_group/group_manager.ex +++ b/lib/kaffe/consumer_group/group_manager.ex @@ -43,6 +43,13 @@ defmodule Kaffe.GroupManager do GenServer.call(name(), {:subscribe_to_topics, topics}) end + @doc """ + Dynamically unsubscribe topics. + """ + def unsubscribe_from_topics(topics) do + GenServer.call(name(), {:unsubscribe_from_topics, topics}) + end + @doc """ List of currently subscribed topics. """ @@ -57,6 +64,7 @@ defmodule Kaffe.GroupManager do Logger.info("event#startup=#{__MODULE__} name=#{name()}") config = Kaffe.Config.Consumer.configuration() + case kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config) do :ok -> :ok @@ -85,7 +93,9 @@ defmodule Kaffe.GroupManager do def handle_cast({:start_group_members}, state) do Logger.debug("Starting worker supervisors for group manager: #{inspect(self())}") - {:ok, worker_supervisor_pid} = group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name) + {:ok, worker_supervisor_pid} = + group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name) + {:ok, worker_manager_pid} = worker_supervisor().start_worker_manager(worker_supervisor_pid, state.subscriber_name) state = %State{state | worker_manager_pid: worker_manager_pid} @@ -106,6 +116,20 @@ defmodule Kaffe.GroupManager do {:reply, {:ok, new_topics}, %State{state | topics: state.topics ++ new_topics}} end + @doc """ + Unsubscribe from the given set of topics. + """ + def handle_call({:unsubscribe_from_topics, requested_topics}, _from, %State{topics: topics} = state) do + old_topics = + requested_topics + |> Enum.into(MapSet.new()) + |> MapSet.intersection(Enum.into(topics, MapSet.new())) + |> MapSet.to_list() + + unsubscribe_from_topics(state, old_topics) + {:reply, {:ok, old_topics}, %State{state | topics: topics -- old_topics}} + end + @doc """ List the currently subscribed topics """ @@ -133,6 +157,21 @@ defmodule Kaffe.GroupManager do ) end + defp unsubscribe_from_topics(state, topics) do + for topic <- topics do + Logger.debug("Stopping group member for topic: #{topic}") + :ok = unsubscribe_from_topic(state, topic) + end + end + + defp unsubscribe_from_topic(state, topic) do + group_member_supervisor().stop_group_member( + state.supervisor_pid, + state.subscriber_name, + topic + ) + end + defp kafka do Application.get_env(:kaffe, :kafka_mod, :brod) end diff --git a/lib/kaffe/consumer_group/group_member_supervisor.ex b/lib/kaffe/consumer_group/group_member_supervisor.ex index c68aaa0..fff3735 100644 --- a/lib/kaffe/consumer_group/group_member_supervisor.ex +++ b/lib/kaffe/consumer_group/group_member_supervisor.ex @@ -52,6 +52,18 @@ defmodule Kaffe.GroupMemberSupervisor do ) end + def stop_group_member( + supervisor_pid, + subscriber_name, + topic + ) do + group_member_id = :"group_member_#{subscriber_name}_#{topic}" + :ok = Kaffe.GroupMember.stop_subscribers(subscriber_name, topic) + :ok = Supervisor.terminate_child(supervisor_pid, group_member_id) + :ok = Supervisor.delete_child(supervisor_pid, group_member_id) + :ok + end + def init(:ok) do Logger.info("event#starting=#{__MODULE__}") diff --git a/lib/kaffe/consumer_group/subscriber/group_member.ex b/lib/kaffe/consumer_group/subscriber/group_member.ex index 5487bb8..608de5e 100644 --- a/lib/kaffe/consumer_group/subscriber/group_member.ex +++ b/lib/kaffe/consumer_group/subscriber/group_member.ex @@ -73,6 +73,16 @@ defmodule Kaffe.GroupMember do GenServer.cast(pid, {:assignments_revoked}) end + def stop_subscribers(subscriber_name, topic) do + case Process.whereis(name(subscriber_name, topic)) do + nil -> + {:error, :not_found} + + pid when is_pid(pid) -> + GenServer.call(pid, :stop_subscribers) + end + end + ## ========================================================================== ## Callbacks ## ========================================================================== @@ -89,12 +99,10 @@ defmodule Kaffe.GroupMember do self() ) - Logger.info( - "event#init=#{__MODULE__} + Logger.info("event#init=#{__MODULE__} group_coordinator=#{inspect(pid)} subscriber_name=#{subscriber_name} - consumer_group=#{consumer_group}" - ) + consumer_group=#{consumer_group}") {:ok, %State{ @@ -124,7 +132,8 @@ defmodule Kaffe.GroupMember do end # If we're not at the latest generation, discard the assignment for whatever is next. - def handle_info({:allocate_subscribers, gen_id, _assignments}, %{current_gen_id: current_gen_id} = state) when gen_id < current_gen_id do + def handle_info({:allocate_subscribers, gen_id, _assignments}, %{current_gen_id: current_gen_id} = state) + when gen_id < current_gen_id do Logger.debug("Discarding old generation #{gen_id} for current generation: #{current_gen_id}") {:noreply, state} end @@ -163,6 +172,11 @@ defmodule Kaffe.GroupMember do {:noreply, %{state | :subscribers => subscribers}} end + def handle_call(:stop_subscribers, _from, state) do + stop_subscribers(state.subscribers) + {:reply, :ok, %{state | subscribers: []}} + end + ## ========================================================================== ## Helpers ## ========================================================================== @@ -175,6 +189,7 @@ defmodule Kaffe.GroupMember do defp compute_offset(:undefined, configured_offset) do [begin_offset: configured_offset] end + defp compute_offset(offset, _configured_offset) do [begin_offset: offset] end diff --git a/lib/kaffe/consumer_group/subscriber/subscriber.ex b/lib/kaffe/consumer_group/subscriber/subscriber.ex index c2b85c9..6e5bd96 100644 --- a/lib/kaffe/consumer_group/subscriber/subscriber.ex +++ b/lib/kaffe/consumer_group/subscriber/subscriber.ex @@ -69,6 +69,10 @@ defmodule Kaffe.Subscriber do GenServer.stop(subscriber_pid) end + def status(subscriber_pid) do + GenServer.call(subscriber_pid, :status) + end + def commit_offsets(subscriber_pid, topic, partition, generation_id, offset) do GenServer.cast(subscriber_pid, {:commit_offsets, topic, partition, generation_id, offset}) end @@ -154,7 +158,11 @@ defmodule Kaffe.Subscriber do end def handle_cast({:commit_offsets, topic, partition, generation_id, offset}, state) do - Logger.debug("event#commit_offsets topic=#{state.topic} partition=#{state.partition} offset=#{offset} generation=#{generation_id}") + Logger.debug( + "event#commit_offsets topic=#{state.topic} partition=#{state.partition} offset=#{offset} generation=#{ + generation_id + }" + ) # Is this the ack we're looking for? ^topic = state.topic @@ -182,6 +190,10 @@ defmodule Kaffe.Subscriber do {:noreply, state} end + def handle_call(:status, _, state) do + {:reply, Map.take(state, [:subscriber_name, :topic, :partition]), state} + end + defp handle_subscribe({:ok, subscriber_pid}, state) do Logger.debug("Subscribe success: #{inspect(subscriber_pid)}") Process.monitor(subscriber_pid) diff --git a/lib/kaffe/consumer_group/worker/worker_manager.ex b/lib/kaffe/consumer_group/worker/worker_manager.ex index d1aaaa5..5aaec6c 100644 --- a/lib/kaffe/consumer_group/worker/worker_manager.ex +++ b/lib/kaffe/consumer_group/worker/worker_manager.ex @@ -56,6 +56,16 @@ defmodule Kaffe.WorkerManager do {:reply, worker_pid, state} end + def handle_call({:stop_worker_for, topic, partition}, _from, state) do + Logger.debug("Stopping worker: #{topic} / #{partition}") + + reply = + worker_name(topic, partition) + |> stop_worker(state) + + {:reply, reply, state} + end + ## ========================================================================== ## Helpers ## ========================================================================== @@ -81,11 +91,27 @@ defmodule Kaffe.WorkerManager do |> capture_worker(worker_name, state) end + defp stop_worker(worker_name, state) do + Logger.debug("Stopping worker: #{worker_name}") + + worker_supervisor().stop_worker( + state.supervisor_pid, + state.subscriber_name, + worker_name + ) + |> forget_worker(worker_name, state) + end + defp capture_worker({:ok, pid}, worker_name, %{worker_table: worker_table}) do true = :ets.insert(worker_table, {worker_name, pid}) pid end + defp forget_worker(:ok, worker_name, %{worker_table: worker_table}) do + true = :ets.delete(worker_table, worker_name) + :ok + end + def worker_name(topic, partition) do case worker_allocation_strategy() do :worker_per_partition -> :"worker_#{partition}" diff --git a/lib/kaffe/consumer_group/worker/worker_supervisor.ex b/lib/kaffe/consumer_group/worker/worker_supervisor.ex index a19bddb..ad89ae3 100644 --- a/lib/kaffe/consumer_group/worker/worker_supervisor.ex +++ b/lib/kaffe/consumer_group/worker/worker_supervisor.ex @@ -25,6 +25,13 @@ defmodule Kaffe.WorkerSupervisor do ) end + def stop_worker(pid, subscriber_name, worker_name) do + Logger.debug("Stopping worker: #{worker_name}") + worker_id = :"worker_#{subscriber_name}_#{worker_name}" + :ok = Supervisor.terminate_child(pid, worker_id) + :ok = Supervisor.delete_child(pid, worker_id) + end + def init(subscriber_name) do Logger.info("event#startup=#{__MODULE__} subscriber_name=#{subscriber_name}")