From 466e7a23a51c31da417b95b161bfc87f8cf5f89a Mon Sep 17 00:00:00 2001 From: Damien Krotkine Date: Tue, 16 Jan 2018 12:31:00 +0100 Subject: [PATCH] support for dynamically subscribing to new topics after start --- .../group_member/manager/group_manager.ex | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/lib/kaffe/group_member/manager/group_manager.ex b/lib/kaffe/group_member/manager/group_manager.ex index b34fd13..c3f4682 100644 --- a/lib/kaffe/group_member/manager/group_manager.ex +++ b/lib/kaffe/group_member/manager/group_manager.ex @@ -18,13 +18,19 @@ defmodule Kaffe.GroupManager do subscriber_name: nil, consumer_group: nil, topics: nil, - offset: nil + offset: nil, + worker_manager_pid: nil end def start_link() do GenServer.start_link(__MODULE__, [self()], name: name()) end + # to manually subscribe to new topics (not from config), at any point in time + def subscribe_topics(topics) do + GenServer.call(name(), {:subscribe_topics, topics}) + end + ## Callbacks def init([supervisor_pid]) do @@ -61,7 +67,26 @@ defmodule Kaffe.GroupManager do topic) end) - {:noreply, state} + {:noreply, %State{state|worker_manager_pid: worker_manager_pid} } + end + + def handle_call({:subscribe_topics, new_topics}, _from, %State{topics: topics} = state) do + subscribed_topics = Enum.flat_map(new_topics, fn(new_topic) -> + if Enum.member?(topics, new_topic) do + Logger.info "Not subscribing to #{new_topic}, already subscribed" + [] + else + Logger.debug "Starting group member for topic: #{new_topic}" + {:ok, _pid} = GroupMemberSupervisor.start_group_member( + state.supervisor_pid, + state.subscriber_name, + state.consumer_group, + state.worker_manager_pid, + new_topic) + [new_topic] + end + end) + {:reply, {:ok, subscribed_topics}, %State{state|topics: subscribed_topics ++ topics} } end defp kafka do