From 189ed1b8ba15b3b2b361bf502773c5a6c58a61fd Mon Sep 17 00:00:00 2001 From: gelivisg Date: Thu, 11 Feb 2021 16:17:59 +0530 Subject: [PATCH 1/2] partitions listeners without pid --- lib/kaffe/consumer_group/subscriber/group_member.ex | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/kaffe/consumer_group/subscriber/group_member.ex b/lib/kaffe/consumer_group/subscriber/group_member.ex index 5487bb8..dfac3a4 100644 --- a/lib/kaffe/consumer_group/subscriber/group_member.ex +++ b/lib/kaffe/consumer_group/subscriber/group_member.ex @@ -111,14 +111,14 @@ defmodule Kaffe.GroupMember do # configuration. def handle_cast({:assignments_received, gen_id, assignments}, state) do Logger.info("event#assignments_received=#{name(state.subscriber_name, state.topic)} generation_id=#{gen_id}") - + partitions_listener().assigned(assignments) Process.send_after(self(), {:allocate_subscribers, gen_id, assignments}, rebalance_delay()) {:noreply, %{state | current_gen_id: gen_id}} end def handle_cast({:assignments_revoked}, state) do Logger.info("event#assignments_revoked=#{name(state.subscriber_name, state.topic)}") - + partitions_listener().revoked() stop_subscribers(state.subscribers) {:noreply, %{state | :subscribers => []}} end @@ -207,6 +207,10 @@ defmodule Kaffe.GroupMember do Application.get_env(:kaffe, :subscriber_mod, Kaffe.Subscriber) end + defp partitions_listener do + Application.get_env(:kaffe, :partitions_listener) + end + defp name(subscriber_name, topic) do :"#{__MODULE__}.#{subscriber_name}.#{topic}" end From db08f159eccad6c682f5b55e001f0d5aa9bb2f3d Mon Sep 17 00:00:00 2001 From: gelivisg Date: Thu, 11 Feb 2021 16:31:26 +0530 Subject: [PATCH 2/2] set listener in consumer config --- lib/kaffe/config/consumer.ex | 3 +++ lib/kaffe/consumer_group/subscriber/group_member.ex | 10 ++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index 781ff99..1c28dd3 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -10,6 +10,7 @@ defmodule Kaffe.Config.Consumer do group_config: consumer_group_config(), consumer_config: client_consumer_config(), message_handler: message_handler(), + partitions_listener: partitions_listener(), async_message_ack: async_message_ack(), rebalance_delay_ms: rebalance_delay_ms(), max_bytes: max_bytes(), @@ -31,6 +32,8 @@ defmodule Kaffe.Config.Consumer do def message_handler, do: config_get!(:message_handler) + def partitions_listener, do: config_get!(:partitions_listener) + def async_message_ack, do: config_get(:async_message_ack, false) def endpoints do diff --git a/lib/kaffe/consumer_group/subscriber/group_member.ex b/lib/kaffe/consumer_group/subscriber/group_member.ex index dfac3a4..cac405c 100644 --- a/lib/kaffe/consumer_group/subscriber/group_member.ex +++ b/lib/kaffe/consumer_group/subscriber/group_member.ex @@ -36,7 +36,8 @@ defmodule Kaffe.GroupMember do worker_manager_pid: nil, topic: nil, configured_offset: nil, - current_gen_id: nil + current_gen_id: nil, + partitions_listener: nil end ## ========================================================================== @@ -102,7 +103,8 @@ defmodule Kaffe.GroupMember do group_coordinator_pid: pid, consumer_group: consumer_group, worker_manager_pid: worker_manager_pid, - topic: topic + topic: topic, + partitions_listener: Kaffe.Config.Consumer.configuration().partitions_listener }} end @@ -111,14 +113,14 @@ defmodule Kaffe.GroupMember do # configuration. def handle_cast({:assignments_received, gen_id, assignments}, state) do Logger.info("event#assignments_received=#{name(state.subscriber_name, state.topic)} generation_id=#{gen_id}") - partitions_listener().assigned(assignments) + state.partitions_listener.assigned(assignments) Process.send_after(self(), {:allocate_subscribers, gen_id, assignments}, rebalance_delay()) {:noreply, %{state | current_gen_id: gen_id}} end def handle_cast({:assignments_revoked}, state) do Logger.info("event#assignments_revoked=#{name(state.subscriber_name, state.topic)}") - partitions_listener().revoked() + state.partitions_listener.revoked() stop_subscribers(state.subscribers) {:noreply, %{state | :subscribers => []}} end