Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kinesis_client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ defmodule KinesisClient.Stream do
|> optional_kw(:app_state_opts, fetch_value_for_key!(opts, :app_state_opts))
|> optional_kw(:lease_renew_interval, Keyword.get(opts, :lease_renew_interval))
|> optional_kw(:lease_expiry, Keyword.get(opts, :lease_expiry))
|> optional_kw(:lease_renewal_limit, Keyword.get(opts, :lease_renewal_limit))
|> optional_kw(:spread_lease, Keyword.get(opts, :spread_lease))
|> optional_kw(:poll_interval, Keyword.get(opts, :poll_interval))
|> optional_kw(:rebalance_interval, Keyword.get(opts, :rebalance_interval))
|> optional_kw(:shard_iterator_type, Keyword.get(opts, :shard_iterator_type))
|> optional_kw(:timestamp, Keyword.get(opts, :timestamp))

Expand Down
18 changes: 18 additions & 0 deletions lib/kinesis_client/stream/app_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ defmodule KinesisClient.Stream.AppState do
def close_shard(app_name, stream_name, shard_id, lease_owner, opts \\ []),
do: adapter(opts).close_shard(app_name, stream_name, shard_id, lease_owner, opts)

@doc """
Get all leases for a given app and stream.
"""
def all_incomplete_leases(app_name, stream_name, opts \\ []),
do: adapter(opts).all_incomplete_leases(app_name, stream_name, opts)

@doc """
Get total lease counts per owner for load balancing.
"""
def total_incomplete_lease_counts_by_worker(app_name, stream_name, opts \\ []),
do: adapter(opts).total_incomplete_lease_counts_by_worker(app_name, stream_name, opts)

@doc """
Get lease owner that has the most leases.
"""
def lease_owner_with_most_leases(app_name, stream_name, opts \\ []),
do: adapter(opts).lease_owner_with_most_leases(app_name, stream_name, opts)

defp adapter(opts) do
case Keyword.get(opts, :adapter) do
:ecto -> KinesisClient.Stream.AppState.Ecto
Expand Down
23 changes: 22 additions & 1 deletion lib/kinesis_client/stream/app_state/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule KinesisClient.Stream.AppState.Adapter do
lease_owner :: String.t(),
opts :: keyword
) ::
[] | [ShardLease.t()]
list(ShardLease.t())

@callback create_lease(
app_name :: String.t(),
Expand Down Expand Up @@ -70,4 +70,25 @@ defmodule KinesisClient.Stream.AppState.Adapter do
opts :: keyword
) ::
:ok | {:error, any}

@callback all_incomplete_leases(
app_name :: String.t(),
stream_name :: String.t(),
opts :: keyword
) ::
list(ShardLease.t())

@callback total_incomplete_lease_counts_by_worker(
app_name :: String.t(),
stream_name :: String.t(),
opts :: keyword
) ::
list({worker :: String.t(), count :: integer})

@callback lease_owner_with_most_leases(
app_name :: String.t(),
stream_name :: String.t(),
opts :: keyword
) ::
list(ShardLease.t())
end
27 changes: 27 additions & 0 deletions lib/kinesis_client/stream/app_state/dynamo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,33 @@ defmodule KinesisClient.Stream.AppState.Dynamo do
end
end

@impl true
def all_incomplete_leases(_app_name, _stream_name, _opts) do
Logger.error(
"all_incomplete_leases/3 is not currently implemented for DynamoDB. Please implement the callback if you want to use DynamoDB."
)

[]
end

@impl true
def lease_owner_with_most_leases(_app_name, _stream_name, _opts) do
Logger.error(
"lease_owner_with_most_leases/3 is not currently implemented for DynamoDB. Please implement the callback if you want to use DynamoDB."
)

[]
end

@impl true
def total_incomplete_lease_counts_by_worker(_app_name, _stream_name, _opts) do
Logger.error(
"total_incomplete_lease_counts_by_worker/3 is not currently implemented for DynamoDB. Please implement the callback if you want to use DynamoDB."
)

[]
end

defp decode_item(item) do
Dynamo.decode_item(item, as: KinesisClient.Stream.AppState.ShardLease)
end
Expand Down
55 changes: 52 additions & 3 deletions lib/kinesis_client/stream/app_state/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ defmodule KinesisClient.Stream.AppState.Ecto do
{UpdateShardLeasePrimaryKey.version(), UpdateShardLeasePrimaryKey}
]

require Logger

@impl true
def initialize(app_name, opts) do
with {:ok, repo} <- get_repo(opts),
Expand Down Expand Up @@ -95,7 +97,12 @@ defmodule KinesisClient.Stream.AppState.Ecto do
{:ok, _} <- ShardLeases.update_shard_lease(shard_lease, repo, lease_count: updated_count) do
{:ok, updated_count}
else
{:error, _} -> {:error, :lease_renew_failed}
{:error, error} ->
Logger.error(
"KinesisClient: Error trying to renew lease for #{shard_id}: #{inspect(error)}"
)

{:error, :lease_renew_failed}
end
end

Expand All @@ -121,7 +128,10 @@ defmodule KinesisClient.Stream.AppState.Ecto do
) do
{:ok, updated_count}
else
{:error, _} -> {:error, :lease_take_failed}
{:error, error} ->
Logger.error("KinesisClient: Error trying to take lease for #{shard_id}: #{inspect(error)}")

{:error, :lease_take_failed}
end
end

Expand All @@ -140,7 +150,12 @@ defmodule KinesisClient.Stream.AppState.Ecto do
{:ok, _} <- ShardLeases.update_shard_lease(shard_lease, repo, checkpoint: checkpoint) do
:ok
else
{:error, _} -> {:error, :update_checkpoint_failed}
{:error, error} ->
Logger.error(
"KinesisClient: Error trying to update checkpoint for #{shard_id}: #{inspect(error)}"
)

{:error, :update_checkpoint_failed}
end
end

Expand All @@ -163,6 +178,40 @@ defmodule KinesisClient.Stream.AppState.Ecto do
end
end

@impl true
def all_incomplete_leases(app_name, stream_name, opts) do
repo = Keyword.get(opts, :repo)

%{
app_name: app_name,
stream_name: stream_name,
completed: false
}
|> ShardLeases.get_shard_leases(repo)
end

@impl true
def lease_owner_with_most_leases(app_name, stream_name, opts) do
repo = Keyword.get(opts, :repo)

app_name
|> ShardLeases.get_owner_with_most_leases(stream_name, repo)
|> case do
nil ->
[]

worker ->
get_leases_by_worker(app_name, stream_name, worker, opts)
end
end

@impl true
def total_incomplete_lease_counts_by_worker(app_name, stream_name, opts) do
repo = Keyword.get(opts, :repo)

ShardLeases.incomplete_group_by_owner(app_name, stream_name, repo)
end

def create_lease(attrs, opts) when is_map(attrs) do
repo = Keyword.get(opts, :repo)

Expand Down
6 changes: 6 additions & 0 deletions lib/kinesis_client/stream/app_state/ecto/shard_lease.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do
import Ecto.Changeset
import Ecto.Query

@type t :: %__MODULE__{}

@fields [:shard_id, :app_name, :stream_name, :lease_owner, :lease_count, :checkpoint, :completed]

@primary_key false
Expand Down Expand Up @@ -59,4 +61,8 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do
defp query_by({:lease_count, lease_count}, query) do
where(query, [sl], sl.lease_count == ^lease_count)
end

defp query_by({:completed, completed}, query) do
where(query, [sl], sl.completed == ^completed)
end
end
29 changes: 29 additions & 0 deletions lib/kinesis_client/stream/app_state/ecto/shard_leases.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeases do
end
end

@spec incomplete_group_by_owner(String.t(), String.t(), Ecto.Repo.t()) ::
[{lease_owner :: String.t(), count :: integer}]
def incomplete_group_by_owner(app_name, stream_name, repo) do
from(sl in ShardLeaseEcto,
where: sl.app_name == ^app_name and sl.stream_name == ^stream_name and not sl.completed,
group_by: sl.lease_owner,
select: {sl.lease_owner, count(sl.shard_id)}
)
|> repo.all()
end

@spec get_owner_with_most_leases(String.t(), String.t(), Ecto.Repo.t()) ::
owner :: String.t() | nil
def get_owner_with_most_leases(app_name, stream_name, repo) do
owner_counts = incomplete_group_by_owner(app_name, stream_name, repo)

case owner_counts do
[] ->
nil

counts ->
max_count = counts |> Enum.map(fn {_owner, count} -> count end) |> Enum.max()

{owner, _count} = Enum.find(counts, fn {_owner, count} -> count == max_count end)

owner
end
end

defp build_where_clause(query, shard_lease) do
query
|> where(
Expand Down
24 changes: 24 additions & 0 deletions lib/kinesis_client/stream/app_state/mimic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,30 @@ defmodule KinesisClient.Stream.AppState.Mimic do
from.close_shard(app_name, stream_name, shard_id, lease_owner, opts)
end

@impl true
def all_incomplete_leases(app_name, stream_name, opts) do
{from, to} = modules(opts)

to.all_incomplete_leases(app_name, stream_name, opts)
from.all_incomplete_leases(app_name, stream_name, opts)
end

@impl true
def total_incomplete_lease_counts_by_worker(app_name, stream_name, opts) do
{from, to} = modules(opts)

to.total_incomplete_lease_counts_by_worker(app_name, stream_name, opts)
from.total_incomplete_lease_counts_by_worker(app_name, stream_name, opts)
end

@impl true
def lease_owner_with_most_leases(app_name, stream_name, opts) do
{from, to} = modules(opts)

to.lease_owner_with_most_leases(app_name, stream_name, opts)
from.lease_owner_with_most_leases(app_name, stream_name, opts)
end

defp modules(opts) do
migration = Keyword.get(opts, :migration)

Expand Down
5 changes: 3 additions & 2 deletions lib/kinesis_client/stream/shard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ defmodule KinesisClient.Stream.Shard do

import KinesisClient.Util

alias KinesisClient.Stream.Shard.Lease
# alias KinesisClient.Stream.Shard.Lease
alias KinesisClient.Stream.Shard.LeaseV2, as: Lease
alias KinesisClient.Stream.Shard.Pipeline

def start_link(args) do
Expand All @@ -22,8 +23,8 @@ defmodule KinesisClient.Stream.Shard do
|> optional_kw(:app_state_opts, Keyword.get(opts, :app_state_opts))
|> optional_kw(:renew_interval, Keyword.get(opts, :lease_renew_interval))
|> optional_kw(:lease_expiry, Keyword.get(opts, :lease_expiry))
|> optional_kw(:rebalance_interval, Keyword.get(opts, :rebalance_interval))
|> optional_kw(:pipeline, Keyword.get(opts, :pipeline))
|> optional_kw(:lease_renewal_limit, Keyword.get(opts, :lease_renewal_limit))
|> optional_kw(:spread_lease, Keyword.get(opts, :spread_lease))

pipeline_opts =
Expand Down
Loading