Skip to content

Commit 79d1485

Browse files
committed
feat kafka: add bulk send producer method
Добавлен метод `Producer::Send` для синхронной отправки пачки сообщений в указанный топик Kafka. В случае ошибки отправки выкидывается исключение `BulkSendException`, которое содержит подробности об ошибках отправки сообщений из пачки. `Producer::Send` пытается обработать ошибки переполнения локальной очереди librdkafka в пределах настроенного `delivery.timeout.ms`. В отличие от старых методов `Send` и `SendAsync` метод порождает только одну корутину на всю пачку сообщений.
1 parent 7729826 commit 79d1485

7 files changed

Lines changed: 333 additions & 39 deletions

File tree

kafka/include/userver/kafka/exceptions.hpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22

33
#include <cstdint>
4+
#include <exception>
5+
#include <map>
46
#include <stdexcept>
57
#include <string_view>
68

@@ -27,6 +29,29 @@ class SendException : public std::runtime_error {
2729
const bool is_retryable_{false};
2830
};
2931

32+
/// @brief Base exception thrown by Producer::Send in bulk mode
33+
/// in case of one or more send errors.
34+
class BulkSendException : public std::runtime_error {
35+
static constexpr const char* kWhat{"Some messages was not delivered."};
36+
37+
public:
38+
using ExceptionMap = std::map<std::size_t, std::exception_ptr>;
39+
40+
BulkSendException(ExceptionMap nested_exceptions);
41+
42+
/// @return nested errors.
43+
/// Nested exceptions are subclasses of SendException.
44+
const ExceptionMap& GetExceptions() const noexcept;
45+
46+
private:
47+
/// @brief A mapping from the message's index in the bulk send operation
48+
/// to the exception that occurred during its delivery.
49+
/// @details Key: 0-based index of the element in the input batch.
50+
/// Value: Pointer to the exception.
51+
/// @note Contains only indices that resulted in an error.
52+
const ExceptionMap nested_exceptions_;
53+
};
54+
3055
class DeliveryTimeoutException final : public SendException {
3156
static constexpr const char* kWhat{
3257
"Message is not delivered after `delivery_timeout` milliseconds. Hint: "

kafka/include/userver/kafka/producer.hpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <userver/kafka/exceptions.hpp>
88
#include <userver/kafka/headers.hpp>
99
#include <userver/utils/fast_pimpl.hpp>
10+
#include <userver/utils/span.hpp>
1011
#include <userver/utils/statistics/writer.hpp>
1112

1213
USERVER_NAMESPACE_BEGIN
@@ -110,6 +111,36 @@ class Producer final {
110111
HeaderViews headers = {}
111112
) const;
112113

114+
/// @brief Sends the given messages to topic `topic_name` with the given `key`
115+
/// and `partition` (if passed); each element of `messages` is the payload of
116+
/// one message. Asynchronously waits until the messages are delivered or a
117+
/// delivery error occurs.
118+
///
119+
/// No payload data is copied. Method holds the data until messages are
120+
/// delivered.
121+
///
122+
/// Thread-safe and can be called from any number of threads
123+
/// concurrently.
124+
///
125+
/// If `partition` not passed, partition is chosen by internal
126+
/// Kafka partitioner.
127+
///
128+
/// @warning if `enable_idempotence` option is enabled, do not use both
129+
/// explicit partitions and Kafka-chosen ones.
130+
///
131+
/// @throws BulkSendException if some messages were not delivered
132+
/// and acknowledged by the Kafka broker within the configured timeout.
133+
///
134+
/// @note Use the BulkSendException::GetExceptions method to get the map
135+
/// of nested exceptions that occurred, keyed by message index.
136+
void Send(
137+
utils::zstring_view topic_name,
138+
std::string_view key,
139+
utils::span<const std::string> messages,
140+
std::optional<std::uint32_t> partition = kUnassignedPartition,
141+
HeaderViews headers = {}
142+
) const;
143+
113144
/// @brief Same as Producer::Send, but returns the task which can be
114145
/// used to wait for the message delivery manually.
115146
///
@@ -141,6 +172,14 @@ class Producer final {
141172
impl::HeadersHolder&& headers_holder
142173
) const;
143174

175+
void SendImpl(
176+
utils::zstring_view topic_name,
177+
std::string_view key,
178+
utils::span<const std::string> messages,
179+
std::optional<std::uint32_t> partition,
180+
std::vector<impl::HeadersHolder>&& headers_holders
181+
) const;
182+
144183
const std::string name_;
145184
engine::TaskProcessor& producer_task_processor_;
146185

kafka/src/kafka/exceptions.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ SendException::SendException(const char* what, bool is_retryable)
1313
is_retryable_(is_retryable)
1414
{}
1515

16+
BulkSendException::BulkSendException(BulkSendException::ExceptionMap nested_exceptions)
17+
: std::runtime_error(kWhat),
18+
nested_exceptions_(std::move(nested_exceptions))
19+
{}
20+
21+
const BulkSendException::ExceptionMap& BulkSendException::GetExceptions() const noexcept { return nested_exceptions_; }
22+
1623
DeliveryTimeoutException::DeliveryTimeoutException()
1724
: SendException(kWhat, /*is_retryable=*/true)
1825
{}

kafka/src/kafka/impl/producer_impl.cpp

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <userver/engine/wait_any.hpp>
99
#include <userver/kafka/impl/configuration.hpp>
1010
#include <userver/tracing/span.hpp>
11+
#include <userver/utils/span.hpp>
1112
#include <userver/utils/trivial_map.hpp>
1213

1314
#include <kafka/impl/log_level.hpp>
@@ -138,20 +139,56 @@ DeliveryResult ProducerImpl::Send(
138139
HeadersHolder headers_holder
139140
) const {
140141
LOG(operation_log_level_) << fmt::format("Message to topic '{}' is requested to send", topic_name);
142+
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
141143
auto delivery_result_future =
142-
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder));
144+
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder), deadline);
143145

144146
WaitUntilDeliveryReported(delivery_result_future);
145147

146148
return delivery_result_future.get();
147149
}
148150

151+
std::vector<DeliveryResult> ProducerImpl::Send(
152+
utils::zstring_view topic_name,
153+
std::string_view key,
154+
utils::span<const std::string> messages,
155+
std::optional<std::uint32_t> partition,
156+
std::vector<HeadersHolder> headers_holders
157+
) const {
158+
UASSERT(messages.size() == headers_holders.size());
159+
160+
LOG(operation_log_level_) <<
161+
fmt::format("Messages {} to topic '{}' are requested to send", messages.size(), topic_name);
162+
163+
std::vector<engine::Future<DeliveryResult>> delivery_result_futures;
164+
delivery_result_futures.reserve(messages.size());
165+
166+
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
167+
for (std::size_t i = 0; i < messages.size(); ++i) {
168+
delivery_result_futures.emplace_back(
169+
ScheduleMessageDelivery(topic_name, key, messages[i], partition, std::move(headers_holders[i]), deadline)
170+
);
171+
}
172+
173+
std::vector<DeliveryResult> delivery_results;
174+
delivery_results.reserve(messages.size());
175+
176+
for (auto& delivery_result_future : delivery_result_futures) {
177+
WaitUntilDeliveryReported(delivery_result_future);
178+
179+
delivery_results.emplace_back(delivery_result_future.get());
180+
}
181+
182+
return delivery_results;
183+
}
184+
149185
engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
150186
utils::zstring_view topic_name,
151187
std::string_view key,
152188
std::string_view message,
153189
std::optional<std::uint32_t> partition,
154-
HeadersHolder headers_holder
190+
HeadersHolder headers_holder,
191+
engine::Deadline deadline
155192
) const {
156193
auto waiter = std::make_unique<DeliveryWaiter>();
157194
auto wait_handle = waiter->GetFuture();
@@ -181,6 +218,7 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
181218
///
182219
/// Headers holder **must** be released if `rd_kafka_producev` succeeded.
183220

221+
while (!deadline.IsReached() && !engine::current_task::ShouldCancel()) {
184222
#ifdef __clang__
185223
#pragma clang diagnostic push
186224
#pragma clang diagnostic ignored "-Wgnu-statement-expression"
@@ -203,12 +241,19 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
203241
#pragma clang diagnostic pop
204242
#endif
205243

206-
if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
207-
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
208-
[[maybe_unused]] const auto waiter_ptr = waiter.release();
209-
} else {
210-
LOG_WARNING("Failed to enqueue message to Kafka local queue: {}", rd_kafka_err2str(enqueue_error));
211-
waiter->SetDeliveryResult(DeliveryResult{enqueue_error});
244+
if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
245+
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
246+
[[maybe_unused]] const auto waiter_ptr = waiter.release();
247+
} else if (enqueue_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
248+
LOG_LIMITED_WARNING("Kafka local queue is full");
249+
/// waiting for a while for the queue to clear up
250+
engine::InterruptibleSleepFor(std::chrono::milliseconds{10});
251+
continue;
252+
} else {
253+
LOG_WARNING("Failed to enqueue message to Kafka local queue: {}", rd_kafka_err2str(enqueue_error));
254+
waiter->SetDeliveryResult(DeliveryResult{enqueue_error});
255+
}
256+
break;
212257
}
213258

214259
return wait_handle;

kafka/src/kafka/impl/producer_impl.hpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66

77
#include <librdkafka/rdkafka.h>
88

9+
#include <userver/engine/deadline.hpp>
910
#include <userver/kafka/impl/stats.hpp>
1011
#include <userver/logging/level.hpp>
1112
#include <userver/utils/periodic_task.hpp>
13+
#include <userver/utils/span.hpp>
1214

1315
#include <kafka/impl/concurrent_event_waiter.hpp>
1416
#include <kafka/impl/delivery_waiter.hpp>
@@ -40,6 +42,16 @@ class ProducerImpl final {
4042
HeadersHolder headers
4143
) const;
4244

45+
/// @brief Sends messages and waits for their delivery.
46+
/// While waiting, handles other messages' delivery reports, errors and logs.
47+
[[nodiscard]] std::vector<DeliveryResult> Send(
48+
utils::zstring_view topic_name,
49+
std::string_view key,
50+
utils::span<const std::string> messages,
51+
std::optional<std::uint32_t> partition,
52+
std::vector<HeadersHolder> headers
53+
) const;
54+
4355
/// @brief Waits until scheduled messages are delivered for
4456
/// at most 2 x `delivery_timeout`.
4557
///
@@ -58,7 +70,8 @@ class ProducerImpl final {
5870
std::string_view key,
5971
std::string_view message,
6072
std::optional<std::uint32_t> partition,
61-
HeadersHolder headers
73+
HeadersHolder headers,
74+
engine::Deadline deadline
6275
) const;
6376

6477
/// @brief Poll a delivery or error event from producer's queue.

kafka/src/kafka/producer.cpp

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <userver/testsuite/testpoint.hpp>
88
#include <userver/tracing/span.hpp>
99
#include <userver/utils/async.hpp>
10+
#include <userver/utils/span.hpp>
1011
#include <userver/utils/text_light.hpp>
1112

1213
#include <kafka/impl/producer_impl.hpp>
@@ -54,26 +55,52 @@ void SendToTestPoint(
5455
}());
5556
}
5657

57-
[[noreturn]] void ThrowSendError(const impl::DeliveryResult& delivery_result) {
58+
std::exception_ptr BuildSendError(const impl::DeliveryResult& delivery_result) {
5859
const auto error = delivery_result.GetMessageError();
5960
UASSERT(error != RD_KAFKA_RESP_ERR_NO_ERROR);
6061

6162
switch (error) {
6263
case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
63-
throw DeliveryTimeoutException{};
64+
return std::make_exception_ptr(DeliveryTimeoutException{});
6465
case RD_KAFKA_RESP_ERR__QUEUE_FULL:
65-
throw QueueFullException{};
66+
return std::make_exception_ptr(QueueFullException{});
6667
case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
67-
throw MessageTooLargeException{};
68+
return std::make_exception_ptr(MessageTooLargeException{});
6869
case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
69-
throw kafka::UnknownTopicException{};
70+
return std::make_exception_ptr(UnknownTopicException{});
7071
case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
71-
throw kafka::UnknownPartitionException{};
72+
return std::make_exception_ptr(UnknownPartitionException{});
7273
default:
73-
throw kafka::SendException{rd_kafka_err2str(error)};
74+
return std::make_exception_ptr(SendException{rd_kafka_err2str(error)});
75+
}
76+
}
77+
78+
[[noreturn]] void ThrowSendError(const impl::DeliveryResult& delivery_result) {
79+
std::rethrow_exception(BuildSendError(delivery_result));
80+
}
81+
82+
auto BuildOwningHeaders(const impl::HeadersHolder& headers) {
83+
auto reader = HeadersReader{headers.GetHandle()};
84+
return std::vector<OwningHeader>{reader.begin(), reader.end()};
85+
}
86+
87+
auto BuildHeaderHolders(HeaderViews headers, std::size_t count) {
88+
std::vector<impl::HeadersHolder> holders;
89+
holders.reserve(count);
90+
for (std::size_t i = 0; i < count; ++i) holders.emplace_back(impl::HeadersHolder{headers});
91+
return holders;
92+
}
93+
94+
void HandleDeliveryErrors(const std::vector<impl::DeliveryResult>& delivery_results) {
95+
std::map<std::size_t, std::exception_ptr> exceptions;
96+
for (std::size_t i = 0; i < delivery_results.size(); ++i) {
97+
const auto& delivery_result = delivery_results[i];
98+
if (!delivery_result.IsSuccess()) {
99+
exceptions[i] = BuildSendError(delivery_result);
100+
}
74101
}
75102

76-
UASSERT(false);
103+
if (!exceptions.empty()) throw BulkSendException{std::move(exceptions)};
77104
}
78105

79106
} // namespace
@@ -111,6 +138,22 @@ void Producer::Send(
111138
}).Get();
112139
}
113140

141+
void Producer::Send(
142+
utils::zstring_view topic_name,
143+
std::string_view key,
144+
utils::span<const std::string> messages,
145+
std::optional<std::uint32_t> partition,
146+
HeaderViews headers
147+
) const {
148+
utils::Async(
149+
producer_task_processor_,
150+
"producer_send_bulk",
151+
[this, topic_name, key, messages, partition, &headers] {
152+
SendImpl(topic_name, key, messages, partition, BuildHeaderHolders(headers, messages.size()));
153+
}
154+
).Get();
155+
}
156+
114157
engine::TaskWithResult<void> Producer::SendAsync(
115158
std::string topic_name,
116159
std::string key,
@@ -148,8 +191,7 @@ void Producer::SendImpl(
148191

149192
std::vector<OwningHeader> headers_copy;
150193
if (testsuite::AreTestpointsAvailable()) {
151-
auto reader = HeadersReader{headers_holder.GetHandle()};
152-
headers_copy = std::vector<OwningHeader>{reader.begin(), reader.end()};
194+
headers_copy = BuildOwningHeaders(headers_holder);
153195
}
154196
if (testsuite::AreTestpointsAvailable()) {
155197
SendToTestPoint(name_, topic_name, key, message, partition, headers_copy, "::started");
@@ -166,6 +208,40 @@ void Producer::SendImpl(
166208
}
167209
}
168210

211+
void Producer::SendImpl(
212+
utils::zstring_view topic_name,
213+
std::string_view key,
214+
utils::span<const std::string> messages,
215+
std::optional<std::uint32_t> partition,
216+
std::vector<impl::HeadersHolder>&& headers_holders
217+
) const {
218+
tracing::Span::CurrentSpan().AddTag("kafka_producer", name_);
219+
tracing::Span::CurrentSpan().AddTag("kafka_send_key", std::string{key});
220+
221+
std::vector<std::vector<OwningHeader>> headers_copies;
222+
if (testsuite::AreTestpointsAvailable()) {
223+
for (std::size_t i = 0; i < messages.size(); ++i) {
224+
headers_copies.emplace_back(BuildOwningHeaders(headers_holders[i]));
225+
}
226+
}
227+
if (testsuite::AreTestpointsAvailable()) {
228+
for (std::size_t i = 0; i < messages.size(); ++i) {
229+
SendToTestPoint(name_, topic_name, key, messages[i], partition, headers_copies[i], "::started");
230+
}
231+
}
232+
233+
const std::vector<impl::DeliveryResult> delivery_results =
234+
producer_->Send(topic_name, key, messages, partition, std::move(headers_holders));
235+
236+
HandleDeliveryErrors(delivery_results);
237+
238+
if (testsuite::AreTestpointsAvailable()) {
239+
for (std::size_t i = 0; i < messages.size(); ++i) {
240+
SendToTestPoint(name_, topic_name, key, messages[i], partition, headers_copies[i]);
241+
}
242+
}
243+
}
244+
169245
} // namespace kafka
170246

171247
USERVER_NAMESPACE_END

0 commit comments

Comments
 (0)