From ffaf0c8f583dd867fc7b3f7e7211f05c816e4a1c Mon Sep 17 00:00:00 2001 From: Damien Krotkine Date: Fri, 19 Jan 2018 15:08:28 +0100 Subject: [PATCH] implement unsubscribing from a topic. And getting current topic --- .../group_member/manager/group_manager.ex | 34 +++++++++++++++++++ .../manager/group_member_supervisor.ex | 7 ++++ 2 files changed, 41 insertions(+) diff --git a/lib/kaffe/group_member/manager/group_manager.ex b/lib/kaffe/group_member/manager/group_manager.ex index b34fd13..daa823a 100644 --- a/lib/kaffe/group_member/manager/group_manager.ex +++ b/lib/kaffe/group_member/manager/group_manager.ex @@ -25,6 +25,21 @@ defmodule Kaffe.GroupManager do GenServer.start_link(__MODULE__, [self()], name: name()) end + # to manually subscribe to new topics (not from config), at any point in time + def subscribe_topics(topics) do + GenServer.call(name(), {:subscribe_topics, topics}) + end + + # to manually unsubscribe from existing topics (not from config), at any point in time + def unsubscribe_topics(topics) do + GenServer.call(name(), {:unsubscribe_topics, topics}) + end + + # returns the current subscribed topics + def get_topics() do + GenServer.call(name(), {:get_topics}) + end + ## Callbacks def init([supervisor_pid]) do @@ -64,6 +79,25 @@ defmodule Kaffe.GroupManager do {:noreply, state} end + def handle_call({:unsubscribe_topics, old_topics}, _from, %State{topics: topics} = state) do + unsubscribed_topics = Enum.flat_map(old_topics, fn(old_topic) -> + if ! Enum.member?(topics, old_topic) do + Logger.info "Not unsubscribing from #{old_topic}, already unsubscribed" + [] + else + :ok = GroupMemberSupervisor.stop_group_member(state.supervisor_pid, state.subscriber_name, old_topic) + Logger.debug "Unsubscribed from topic: #{old_topic}" + [old_topic] + end + end) + {:reply, {:ok, unsubscribed_topics}, %State{state|topics: topics -- unsubscribed_topics } } + end + + + def handle_call({:get_topics}, _from, %State{topics: topics} = state) do + {:reply, {:ok, topics}, state} + end + defp kafka do Application.get_env(:kaffe, :kafka_mod, :brod) end diff --git a/lib/kaffe/group_member/manager/group_member_supervisor.ex b/lib/kaffe/group_member/manager/group_member_supervisor.ex index c37b7d3..faef56b 100644 --- a/lib/kaffe/group_member/manager/group_member_supervisor.ex +++ b/lib/kaffe/group_member/manager/group_member_supervisor.ex @@ -22,6 +22,13 @@ defmodule Kaffe.GroupMemberSupervisor do id: :"group_member_#{subscriber_name}_#{topic}")) end + def stop_group_member(supervisor_pid, subscriber_name, topic) do + case Supervisor.terminate_child(supervisor_pid, "group_member_#{subscriber_name}_#{topic}") do + :ok -> :ok + {:error, :not_found} -> :ok + end + end + def init(:ok) do Logger.info "event#starting=#{__MODULE__}"