Rework Elsa.Util.get_partition_count to optionally retry on failure#11
Rework Elsa.Util.get_partition_count to optionally retry on failure#11nathanmonteleone merged 4 commits intomainfrom
Conversation
…nUse get_partition_count retries in the producer and consumer initializers to get around race conditions with topic creation.
| @@ -1,4 +1,6 @@ | |||
| defmodule Elsa.Producer.Initializer do | |||
| alias Elsa.RetryConfig | |||
There was a problem hiding this comment.
This alias is only used once?
| @type message :: {iodata(), iodata()} | binary() | %{key: iodata(), value: iodata()} | ||
|
|
||
| alias Elsa.ElsaRegistry | ||
| alias Elsa.RetryConfig |
| # Use the non-connection based partition_count. | ||
| # This circumvents a behavior in brod that caches topics as non-existent, | ||
| # which would break our ability to retry. | ||
| {:ok, endpoints} = Elsa.Util.get_endpoints(brod_client) |
There was a problem hiding this comment.
I think along with what Liru said, the Elsa.Util is used twice here, and isn't aliased (unlike the other modules where we do alias it)
There was a problem hiding this comment.
Yeah I agree on both points. Makes me wonder if there's a credo flag to actually check this, that I'm missing...
There was a problem hiding this comment.
turns out there is and it's violated all over the place. I'll fix these by hand for now and make a separate PR for the rest.
| @spec no_retry() :: t() | ||
| def no_retry do | ||
| %__MODULE__{ | ||
| tries: 1, |
There was a problem hiding this comment.
I like the config key 'tries'.
it's funny to realize how often I've made a retry module and had 'retries' as an argument to something, and then debated what it meant, when 'tries' was right there the whole time 😂
| # This circumvents a behavior in brod that caches topics as non-existent, | ||
| # which would break our ability to retry. | ||
| {:ok, endpoints} = Elsa.Util.get_endpoints(brod_client) | ||
| {:ok, partitions} = Elsa.Util.partition_count(endpoints, topic, retry_config) |
There was a problem hiding this comment.
We are essentially just calling the ! version of each of these functions because we are pattern matching with {:ok, ...}. We should either use the ! version, or we should do a with or some other construct to handle errors. As-is, if we have a failure, it's going to complain about a pattern match, and that makes it difficult to identify the actual underlying error.
There was a problem hiding this comment.
(If even the retry fails, that means you're trying to create a producer or consumer for a topic that won't return metadata on the partition count -- probably because it doesn't exist. So we're dead in the water at that point, there's not much additional error handling we could do.)
…s where we use them more than once. Also replaced pattern match error checks with the ! version of get_partition_count.
https://simplifi.atlassian.net/browse/INT-11108
The basic problem is that creating topics in Kafka is asychronous, so unit tests that create topics tend to fail if we try to produce to them immediately afterwards. It turns out part of the reason this is so fatal, is that brod likes to cache topic non-existence once you've had a failed query.
This PR adds intentional retries when we're trying to pull the partition count from topic metadata. It also uses non-client based brod calls to make these queries (i.e. you just give it endpoints instead of a brod_client), to get around that caching behavior.