Skip to content

Commit 5bf516c

Browse files
committed
WiP
1 parent 6176240 commit 5bf516c

4 files changed

Lines changed: 23 additions & 38 deletions

File tree

kafka/include/userver/kafka/producer.hpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,22 @@ class Producer final {
104104
///
105105
/// @note Use SendException::IsRetryable method to understand whether there is
106106
/// a sense to retry the message sending.
107-
/// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
107+
///
108+
/// @code{.cpp}
109+
/// bool delivered{false};
110+
/// while (!delivered && !deadline.IsReached()) {
111+
/// try {
112+
/// producer.Send(topic, key, message);
113+
/// delivered = true;
114+
/// } catch (const kafka::SendException& e) {
115+
/// if (e.IsRetryable()) {
116+
/// engine::InterruptibleSleepFor(std::chrono::milliseconds{10});
117+
/// continue;
118+
/// }
119+
/// break;
120+
/// }
121+
/// }
122+
/// @endcode
108123
void Send(
109124
utils::zstring_view topic_name,
110125
std::string_view key,

kafka/src/kafka/impl/producer_impl.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ DeliveryResult ProducerImpl::Send(
140140
LOG(operation_log_level_) << fmt::format("Message to topic '{}' is requested to send", topic_name);
141141
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
142142
auto delivery_result_future =
143-
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder), deadline, QueueFullHandlingPolicy::Throw);
143+
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder), deadline);
144144

145145
WaitUntilDeliveryReported(delivery_result_future);
146146

@@ -165,7 +165,7 @@ std::vector<DeliveryResult> ProducerImpl::Send(
165165
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
166166
for (std::size_t i = 0; i < messages.Size(); ++i) {
167167
delivery_result_futures.emplace_back(
168-
ScheduleMessageDelivery(topic_name, key, messages[i], partition, std::move(headers_holders[i]), deadline, QueueFullHandlingPolicy::Retry)
168+
ScheduleMessageDelivery(topic_name, key, messages[i], partition, std::move(headers_holders[i]), deadline)
169169
);
170170
}
171171

@@ -187,8 +187,7 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
187187
std::string_view message,
188188
std::optional<std::uint32_t> partition,
189189
HeadersHolder headers_holder,
190-
engine::Deadline deadline,
191-
QueueFullHandlingPolicy queue_full_handling_policy
190+
engine::Deadline deadline
192191
) const {
193192
auto waiter = std::make_unique<DeliveryWaiter>();
194193
auto wait_handle = waiter->GetFuture();
@@ -244,7 +243,7 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
244243
if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
245244
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
246245
[[maybe_unused]] const auto waiter_ptr = waiter.release();
247-
} else if (queue_full_handling_policy == QueueFullHandlingPolicy::Retry && enqueue_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
246+
} else if (enqueue_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
248247
LOG_LIMITED_WARNING("Kafka local queue is full");
249248
/// waiting for a while for the queue to clear up
250249
engine::Yield();

kafka/src/kafka/impl/producer_impl.hpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,6 @@ class ProducerImpl final {
6363
void EventCallback();
6464

6565
private:
66-
enum class QueueFullHandlingPolicy {
67-
Throw,
68-
Retry,
69-
};
70-
7166
/// @brief Schedules the message delivery.
7267
/// @returns the future for delivery result, which must be awaited.
7368
[[nodiscard]] engine::Future<DeliveryResult> ScheduleMessageDelivery(
@@ -76,8 +71,7 @@ class ProducerImpl final {
7671
std::string_view message,
7772
std::optional<std::uint32_t> partition,
7873
HeadersHolder headers,
79-
engine::Deadline deadline,
80-
QueueFullHandlingPolicy queue_full_handling_policy
74+
engine::Deadline deadline
8175
) const;
8276

8377
/// @brief Poll a delivery or error event from producer's queue.

kafka/tests/producer_kafkatest.cpp

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -200,32 +200,9 @@ UTEST_F(ProducerTest, FullQueue) {
200200
.emplace_back(producer.SendAsync(topic, fmt::format("test-key-{}", send), fmt::format("test-msg-{}", send))
201201
);
202202
}
203-
auto make_send_request =
204-
[&producer,
205-
&topic,
206-
key = fmt::format("test-key-{}", kMaxQueueMessages),
207-
message = fmt::format("test-msg-{}", kMaxQueueMessages)] { producer.Send(topic, key, message); };
208-
209-
UEXPECT_THROW(make_send_request(), kafka::QueueFullException);
210-
211-
/// [Producer retryable error]
212-
bool delivered{false};
213-
const auto deadline = engine::Deadline::FromDuration(producer_configuration.delivery_timeout);
214-
while (!delivered && !deadline.IsReached()) {
215-
try {
216-
make_send_request();
217-
delivered = true;
218-
} catch (const kafka::SendException& e) {
219-
if (e.IsRetryable()) {
220-
engine::InterruptibleSleepFor(std::chrono::milliseconds{10});
221-
continue;
222-
}
223-
break;
224-
}
225-
}
226-
/// [Producer retryable error]
227203

228-
EXPECT_TRUE(delivered);
204+
UEXPECT_NO_THROW(producer.Send(
205+
topic, fmt::format("test-key-{}", kMaxQueueMessages), fmt::format("test-msg-{}", kMaxQueueMessages)));
229206
UEXPECT_NO_THROW(engine::WaitAllChecked(results));
230207
}
231208

0 commit comments

Comments
 (0)