Conversation
@FlorentinDUBOIS Tagging you for visibility. I know this is a lot so no rush here!
Bumping and tagging some additional folks - @freeznet @BewareMyPower
I'm going to review this PR soon. |
|
Thank you @rkrishn7, I will take a look at this pull request and test it in production next week. Sorry for the delay in my answer.
```rust
current_backoff = std::cmp::min(
    Self::OP_MAX_BACKOFF * 2u32.saturating_pow(current_retries),
    Self::OP_MAX_BACKOFF,
```
Why use `cmp::min` here? `OP_MAX_BACKOFF` should always be the smaller one. It looks like you should use `OP_MIN_BACKOFF * 2u32.saturating_pow(current_retries)`.
BTW, could you reuse the `operation_retry_parameters` in `Pulsar::new` instead of hard coding backoff parameters? And it would be better to abstract a `Backoff` class like Java so that we can reuse the logic in `connect_inner` rather than rewriting the same logic again.
> Why use `cmp::min` here? `OP_MAX_BACKOFF` should always be the smaller one. It looks like you should use `OP_MIN_BACKOFF * 2u32.saturating_pow(current_retries)`.
Thanks for catching that! Updated to use `OP_MIN_BACKOFF` there.
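For clarity, the corrected computation looks roughly like this: grow from the minimum backoff exponentially, then cap at the maximum. This is a minimal sketch, not the PR's actual code; the constant values below are assumptions for illustration.

```rust
use std::cmp;
use std::time::Duration;

// Assumed values for illustration only; the real constants live on the type
// that owns OP_MIN_BACKOFF / OP_MAX_BACKOFF in the PR.
const OP_MIN_BACKOFF: Duration = Duration::from_millis(100);
const OP_MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Exponential backoff: OP_MIN_BACKOFF * 2^retries, capped at OP_MAX_BACKOFF.
fn backoff_for(current_retries: u32) -> Duration {
    cmp::min(
        OP_MIN_BACKOFF * 2u32.saturating_pow(current_retries),
        OP_MAX_BACKOFF,
    )
}
```

With these assumed constants, retry 0 waits 100ms, retry 3 waits 800ms, and high retry counts are capped at 30s, which is the behavior the `cmp::min` cap is there to guarantee.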
> BTW, could you reuse the `operation_retry_parameters` in `Pulsar::new` instead of hard coding backoff parameters?
The same `operation_retry_parameters` as declared in `Pulsar::new` should already be in use here, because these operations rely on `ConnectionSender::send_message`.
The retry logic here is solely for `TransactionCoordinatorNotFound` errors. I think it's possible to make the backoff parameters here configurable, but they should be different from `operation_retry_parameters`.
> And it would be better to abstract a `Backoff` class like Java so that we can reuse the logic in `connect_inner` rather than rewriting the same logic again.
Agreed, but maybe we can open a separate issue for this? It may not be a great idea to increase the scope of this PR, since it's already quite large.
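To make the suggestion concrete, a shared helper along the lines of the Java client's `Backoff` class could look something like this. This is a hypothetical sketch only (the struct name, fields, and methods are all assumptions, not code from this PR or from pulsar-rs); both `connect_inner` and the transaction retry loop could then reuse it instead of duplicating the arithmetic.

```rust
use std::cmp;
use std::time::Duration;

/// Hypothetical reusable backoff helper, loosely modeled on the Java client's
/// Backoff class. Tracks its own retry count and hands out capped delays.
struct Backoff {
    min: Duration,
    max: Duration,
    retries: u32,
}

impl Backoff {
    fn new(min: Duration, max: Duration) -> Self {
        Backoff { min, max, retries: 0 }
    }

    /// Returns the next delay (min * 2^retries, capped at max) and
    /// advances the internal retry counter.
    fn next(&mut self) -> Duration {
        let delay = cmp::min(self.min * 2u32.saturating_pow(self.retries), self.max);
        self.retries = self.retries.saturating_add(1);
        delay
    }

    /// Resets the counter, e.g. after a successful operation.
    fn reset(&mut self) {
        self.retries = 0;
    }
}
```

Callers would construct one per retry loop, call `next()` before each sleep, and `reset()` on success, keeping the exponential/cap policy in a single place.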
I don't think so.
Sorry, I don't get it. It only registers the partition with the transaction id via the
👍🏾
Sorry! To clarify, I think there's a potential footgun lurking here. For example: let's say we've enabled transactions and start producing transactional messages with a batching producer. If we haven't hit the batching threshold before the transaction's timeout, then all those buffered messages will fail to be produced. This is subtle, but definitely seems undesirable. It seems like either transactional messages should bypass batching or we should warn the user if both transactions and batching are enabled.
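One way to surface the warning: a guard at producer-construction time that rejects (or logs on) the batching + transactions combination. This is purely a hypothetical sketch; `ProducerOptions`, its fields, and `check_options` are invented names for illustration and do not reflect the pulsar-rs API.

```rust
/// Hypothetical, simplified options struct for illustration only.
struct ProducerOptions {
    batch_size: Option<u32>, // assumed field: Some(n) means batching is enabled
    transactional: bool,     // assumed field: producer sends transactional messages
}

/// Sketch of a construction-time guard: buffered messages that never hit the
/// batch threshold can outlive the transaction timeout and fail to be produced,
/// so flag the combination up front.
fn check_options(opts: &ProducerOptions) -> Result<(), String> {
    if opts.transactional && opts.batch_size.is_some() {
        return Err(
            "transactional producers should not enable batching: buffered \
             messages may outlive the transaction timeout"
                .to_string(),
        );
    }
    Ok(())
}
```

The alternative mentioned above (having transactional messages bypass batching) avoids the error entirely, at the cost of silently ignoring a setting the user asked for.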
Hi @BewareMyPower @FlorentinDUBOIS, just bumping this PR! Would love to get this in soon 😄
I am on holiday until the 10th of February, I will be able to test after.
Bumping again @FlorentinDUBOIS @BewareMyPower 😄
Hey :) @BewareMyPower @rkrishn7 |

This PR adds client support for Pulsar Transactions.
Resolves #253
Notes:
Open Questions:
The `Transaction` API uses internal synchronization to provide a "no mut" public interface. Are there any alternative approaches that may be better?