RFC: Message Processing and Delivery Acknowledgement
Abstract
Many stream processing systems, and even telemetry data processors like Vector, utilize message delivery acknowledgement to provide stronger guarantees around message processing and delivery. In particular, when a distributed stream processing system introduces asynchronous techniques such as buffering and batching, it becomes critical to employ such mechanisms to ensure all messages received are processed and delivered successfully to some external destination, without loss. Further, many distributed logging or message broker platforms like Kafka or AWS’s SQS place responsibility on the consumer of messages to acknowledge “receipt” of said messages, in order to provide the at-most-once or at-least-once delivery guarantees they advertise.
Recently Rotel introduced support for a Kafka receiver. In its current state the receiver utilizes librdkafka’s underlying “auto-commit” facilities to commit the offset of consumed messages back to Kafka, thus acknowledging receipt. There are several problems with this approach, the most important being: if the rotel process is terminated after an offset is committed but before the corresponding data is flushed by an exporter, the message will be lost. Such a scenario provides only at-most-once guarantees; however, many users are rightly unsatisfied with this weak guarantee. Therefore, Rotel SHOULD include new mechanisms to track the end-to-end processing and delivery of messages. Such a capability allows exporters to signal acknowledgement of message delivery success or failure back to receivers. In this way, a receiver such as the Kafka receiver can acknowledge receipt of messages only after successful processing and delivery, thus ensuring at-least-once guarantees.
This RFC will explore requirements for end-to-end message acknowledgement in Rotel and various considerations towards implementation and the trade-offs between various approaches. We will discuss the Kafka receiver as it is an important user of this new functionality, however we intend to design acknowledgement in such a manner that it may be reused more broadly for a wide combination of receivers, processors, and exporters in the future.
Requirements
Receivers which may benefit from end-to-end message acknowledgement SHOULD be allowed to opt-in or out of acknowledgement in a configurable manner.
Receivers which today do not require acknowledgement SHOULD be able to remain opted out, but SHOULD NOT be precluded from adopting it in the future should a benefit arise.
Receivers which utilize message acknowledgement MUST be allowed to publish messages on OTLPOutput channels indicating these messages must be acknowledged.
Message acknowledgement MUST support multiple-exporters or “fan-out” delivery, such that final acknowledgement is not complete until ALL exporters acknowledge delivery.
Message acknowledgement SHOULD strive to avoid adding significant overhead to the system, in terms of both memory usage and the CPU cost of allocation and deallocation.
Processors and Exporters which participate in message acknowledgement SHOULD be mostly unaware of any specific semantics associated with a particular receiver.
Goals
Define a well-designed and reusable mechanism for end-to-end message acknowledgement with low overhead that slots easily into the existing system.
Utilize message acknowledgement to implement pending-offset tracking and a maximum offset eligible for committing, in order to guarantee at-least-once processing and delivery.
Non-Goals
Completely redesigning batching and pipelining.
Detailed design of Kafka Offset tracking and committing at the receiver.
Design
Message passing via flume Senders and Receivers will be the primary mechanism for acknowledgement. Receivers which require acknowledgement will create new mpsc pairs of BoundedSenders and BoundedReceivers. The BoundedReceiver will receive acknowledgements at the receiver and the BoundedSenders will be cloned and passed along with payloads so exporters may signal back to receivers.
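To make the channel topology concrete, here is a minimal sketch of the pattern described above. It uses `std::sync::mpsc::sync_channel` as a stand-in for flume's bounded channel (flume's `bounded` API is analogous), and a simplified, hypothetical `Acknowledgement` type rather than Rotel's real message types:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

// Hypothetical acknowledgement message; variants are illustrative only.
#[derive(Debug, PartialEq)]
pub enum Acknowledgement {
    Ok(i64),     // offset delivered successfully
    Failed(i64), // offset failed delivery
}

// Create the bounded pair the receiver would own. The sender half is
// cloned and shipped along with payloads; the receiver half stays with
// the Kafka receiver to drain acknowledgements.
pub fn ack_channel(capacity: usize) -> (SyncSender<Acknowledgement>, Receiver<Acknowledgement>) {
    sync_channel(capacity)
}

pub fn demo() -> Vec<Acknowledgement> {
    let (tx, rx) = ack_channel(16);
    // Two "exporters" each hold a cheap clone of the sender.
    let tx_a = tx.clone();
    let tx_b = tx.clone();
    tx_a.send(Acknowledgement::Ok(7)).unwrap();
    tx_b.send(Acknowledgement::Ok(8)).unwrap();
    // Drop all senders so the receiving iterator terminates.
    drop(tx);
    drop(tx_a);
    drop(tx_b);
    rx.into_iter().collect()
}
```

The key property the design relies on is that cloning the sender half is cheap and allocation-free, so it can travel with every payload without meaningful overhead.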
Payload Changes
In order to facilitate our design goals we will need to modify our primary message model. Today we rely heavily on Vec&lt;T&gt;, where T is either ResourceSpans, ResourceMetrics, or ResourceLogs. The following is a sketch of the proposed model changes, replacing Vec&lt;T&gt; with Vec&lt;Message&lt;T&gt;&gt;.
```rust
#[derive(Clone)]
pub struct Message<T> {
    pub metadata: Option<MessageMetadata>,
    pub payload: Vec<T>,
}

impl<T> Message<T> {
    // Used in testing
    #[allow(dead_code)]
    pub(crate) fn len(&self) -> usize {
        self.payload.len()
    }
}

#[derive(Clone)]
pub enum MessageMetadata {
    Kafka(KafkaMetadata),
}

#[derive(Clone)]
pub struct KafkaMetadata {
    pub begin_offset: i64,
    // Optional ordered map of payload index to Kafka offset; only used
    // when there are gaps in the offset range.
    pub index_to_offset: Option<BTreeMap<usize, i64>>,
    pub partition: i32,
    pub topic_id: u8,
    pub ack_chan: Option<BoundedSender<KafkaAcknowledgement>>,
}

pub enum KafkaAcknowledgement {
    Ack(KafkaAck),
    Nack(KafkaNack),
}

pub struct KafkaAck {
    pub offset: i64,
    pub partition: i32,
    pub topic_id: u8,
}

pub struct KafkaNack {
    pub offset: i64,
    pub partition: i32,
    pub topic_id: u8,
    pub reason: ExporterError,
}
```
Details
Message
Separate metadata and payload members. Metadata is optional
MessageMetadata
Accommodate metadata from various source types via an enum. Initially the only required type is Kafka.
KafkaMetadata
begin_offset: offset associated with first item in OTLPPayload Vec.
index_to_offset: an optional ordered map of index to kafka offset only to be used when there are gaps in the offset range.
ack_chan: An optional BoundedSender for acknowledgement. More information is needed to determine whether we’d want a BoundedSender or Unbounded. Ideally we would avoid creating unique oneshot channels here, however if we chose a oneshot approach we need to investigate an allocation free oneshot channel. BoundedSender can be cloned without an allocation.
KafkaSuccess
offset_range: tuple of the beginning offset and the offset to commit up to
offsets: only to be used when acknowledging offsets with gaps
Batch Splitting Changes
Currently the pipeline relies on the BatchSplittable interface for splitting messages when offering messages to the NestedBatch would result in an overflow. BatchSplittable is implemented for the types ResourceSpans, ResourceMetrics, and ResourceLogs. However, splitting a message into multiple batches for messages received from Kafka will require additional care. Specifically, splitting the KafkaMetadata into two separate KafkaMetadata objects associated with separate sets of offsets.
(pseudo code)
```rust
impl<T: BatchSplittable + BatchSizer> BatchSplittable for Message<T> {
    fn split(&mut self, split_n: usize) -> Self
    where
        Self: Sized,
    {
        match &self.metadata {
            None => {
                // Normal splitting logic
            }
            Some(m) => match m {
                MessageMetadata::Kafka(km) => {
                    // Handle splitting payload and updating offsets
                    // Happy path, no offset gaps.
                    let res = self.payload[0].split(split_n);
                    let size = res.size_of();
                    // Construct new message with new offset data after splitting
                    Message {
                        metadata: Some(MessageMetadata::Kafka(KafkaMetadata {
                            begin_offset: km.begin_offset + size as u64,
                            index_to_offset: None,
                            ack_chan: km.ack_chan.clone(),
                        })),
                        payload: res,
                    }
                }
            },
        }
    }
}
```
Splitting considerations for Kafka
Care must be taken to ensure that data found at a given offset is not “split” to satisfy the requirements of the batcher. When splitting data to fill a batch, all data at a given offset must remain intact within a single batch. It is better to publish a batch smaller than the max batch size than to split data from a given offset. To illustrate, suppose we read a Vec of length 100 at offset N from Kafka and our current NestedBatch wants another 20 ResourceSpans to be marked full. We must not harvest 20 ResourceSpans from the data found at offset N and consider that offset eligible for acknowledgement.
We may consider an optimization here where we could allow for some harvesting, but not include that offset in the range of offsets to commit and rather commit it in the following batch. However this would require a modification to the KafkaMetadata struct to include an additional field called end_offset, as we would no longer be able to rely on length to find the highest offset.
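The boundary rule above amounts to simple arithmetic over whole offsets. The following illustrative helper (not from the Rotel codebase) shows the intended behavior: given the item count carried by each consecutive offset and the free space remaining in a batch, it harvests only whole offsets and stops early rather than splitting one:

```rust
// Given the number of items carried by each consecutive offset and the
// free space left in the current batch, return how many whole offsets
// can be harvested. Data at a single offset is never split across
// batches, so the batch may close below its maximum size.
pub fn whole_offsets_that_fit(items_per_offset: &[usize], free_space: usize) -> usize {
    let mut taken = 0;
    let mut used = 0;
    for &n in items_per_offset {
        if used + n > free_space {
            break; // taking this offset would split its data; stop here
        }
        used += n;
        taken += 1;
    }
    taken
}
```

Under this rule, the scenario from the text (one offset carrying 100 items, a batch with room for only 20 more) harvests zero offsets, and the batch is published short.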
NestedBatch and Exporter Changes
NestedBatch currently works with a Vec&lt;T&gt; where T is ResourceSpans, ResourceMetrics, or ResourceLogs. NestedBatch will require a change to work with a Vec&lt;Message&lt;T&gt;&gt; and will require an additional layer of depth when calculating sizes with BatchSizer.
Using a Vec<Message<T>> alleviates the need for an additional end_offset field as described above, and utilizing a Vec<Message<T>> properly allows for scenarios of multiple receivers of a given type to work with the same pipeline. Consider the scenario where Rotel is running an OTLP receiver and a Kafka Receiver and both are receiving ResourceSpans and working with a single OTLP exporter. They both need access to the pipeline and their data may be efficiently batched and delivered by the OTLP exporter. However we need to be able to calculate what offsets to acknowledge and we cannot purely rely on a single starting offset and total size of the batch for this.
The take_batch function on the NestedBatch returns a Vec<T>, which is sent to exporters via a BoundedSender<Vec<T>>. However, exporters will need access to the Vec<Message<T>> in order to acknowledge messages. Therefore we will also modify take_batch to return a Vec<Message<T>>.
In order to make things easy for the exporters we should also define a trait on Vec&lt;Message&lt;T&gt;&gt; providing a method flatten_payload() -&gt; Vec&lt;T&gt;. This method will flatten all Message payloads into a single Vec&lt;T&gt; for exporting. The exporter can then iterate through the Vec&lt;Message&lt;T&gt;&gt; and acknowledge after delivery is attempted.
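A minimal sketch of that trait follows, with a simplified `Message` type standing in for the real one (metadata elided for brevity; the trait name and signature are illustrative):

```rust
pub struct Message<T> {
    pub payload: Vec<T>,
}

pub trait FlattenPayload<T> {
    // Flatten every Message payload into one Vec<T> for export, leaving
    // the caller free to keep the Vec<Message<T>> for acknowledgement.
    fn flatten_payload(&self) -> Vec<T>
    where
        T: Clone;
}

impl<T> FlattenPayload<T> for Vec<Message<T>> {
    fn flatten_payload(&self) -> Vec<T>
    where
        T: Clone,
    {
        self.iter().flat_map(|m| m.payload.iter().cloned()).collect()
    }
}
```

Taking `&self` (rather than consuming the Vec) lets the exporter retain the messages for acknowledgement after the flattened payload is encoded; whether the real implementation clones or drains the payloads is a trade-off left open here.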
Finally, we will need a way to associate any MessageMetadata with an encoded request and response as encoding and exporting is decoupled into separate tasks in the exporter. Ideally we should be able to pass this data along in the EncodedRequest.
```rust
#[derive(Clone)]
pub struct EncodedRequest {
    pub metadata: Option<Vec<MessageMetadata>>,
    pub request: Request<Full<Bytes>>,
    pub size: usize,
}

/// Encodes a message into a full HTTP request.
///
/// # Arguments
/// * `message` - The message to encode
///
/// # Returns
/// * `Result<EncodedRequest, ExporterError>`
pub fn encode(
    &self,
    metadata: Option<Vec<MessageMetadata>>,
    message: T,
    size: usize,
) -> Result<EncodedRequest, ExporterError> {
    let res = self.new_request(message);
    match res {
        Ok(request) => Ok(EncodedRequest {
            metadata,
            request,
            size,
        }),
        Err(e) => {
            self.send_failed
                .add(size as u64, &[KeyValue::new("error", "request.encode")]);
            Err(ExporterError::Generic(format!(
                "error encoding request: {}",
                e
            )))
        }
    }
}
```
Finally, modify client.rs and response.rs to move the data from the EncodedRequest into the appropriate response type. At this point the exporter can check whether the response was successful or an error and perform the appropriate acknowledgement action.
Acknowledgement
Acknowledgement should be encapsulated within the Message type, ensuring receiver-specific logic doesn’t leak into exporters. Consider the following example.
```rust
pub trait Ack {
    fn ack(&mut self);
    fn nack(&mut self, reason: Option<NackReason>);
}

impl<T> Ack for Message<T> {
    fn ack(&mut self) {
        match &self.metadata {
            None => {} // no-op
            Some(m) => match m {
                MessageMetadata::Kafka(km) => {
                    // Check if index_to_offset is none, happy path
                    if let Some(ack_chan) = &km.ack_chan {
                        if km.index_to_offset.is_none() {
                            let begin = km.begin_offset;
                            let payload_len = self.payload.len() as u64;
                            ack_chan
                                .send(Ok(KafkaSuccess {
                                    offset_range: (begin, begin + payload_len),
                                    offsets: None,
                                }))
                                .await;
                            return;
                        }
                    }
                }
            },
        }
    }

    fn nack(&mut self, _reason: Option<NackReason>) {
        todo!()
    }
}
```
It is also expected we will likely create a new FuturesUnordered at each exporter to allow for separate tokio tasks to handle sending acknowledgements.
Additional Concerns and Out of Scope Items
Bad Data and Making Progress
When acknowledgement drives the logic around whether receipt of a message should be acknowledged at a receiver, it is critical to include configuration options that allow the user to tune this behavior. To illustrate, consider the case where a Kafka receiver reads a message whose data is corrupted. If the exporter fails to export the message after some number of retries and can only send a nack, the Kafka receiver will never advance the committed offset past the bad message. On restart, Rotel would then begin consuming from a potentially very old offset (the bad message) and begin a massive replay of old messages. Potentially worse, if the log has already been compacted or expired and auto.offset.reset is set to read from the end, we could fail to process many messages; the only way to recover would be for the user to manually delete the committed offset and configure auto.offset.reset to read from the beginning. Configuration options should allow the user to decide whether to skip past an offset after some number of retries, or alternatively to panic and exit if the offset committer cannot make progress. The reasons for export failure can be signaled back to the source via the enum NackReason.
Offset Tracking and Committing at the Kafka Receiver
The Kafka receiver and new subsystems at the receiver will be responsible for receiving acknowledgements and internally managing the state of offsets to commit. This will likely be done with a min priority heap, which will allow us to push and pop offsets off the heap and peek at the lowest offset still pending.
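The min-heap approach can be sketched as follows. This is illustrative only (names and types are assumptions, and real offsets would be tracked per partition): `BinaryHeap` over `Reverse<i64>` gives a min-heap, and the committable offset only advances across a contiguous run of acknowledgements starting at the lowest pending offset:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Pending-offset tracking for one partition, tolerating out-of-order acks.
pub struct OffsetTracker {
    next_to_commit: i64,
    // Min-heap of acknowledged offsets not yet folded into next_to_commit.
    acked: BinaryHeap<Reverse<i64>>,
}

impl OffsetTracker {
    pub fn new(start: i64) -> Self {
        Self { next_to_commit: start, acked: BinaryHeap::new() }
    }

    pub fn ack(&mut self, offset: i64) {
        self.acked.push(Reverse(offset));
    }

    // Pop contiguous acknowledged offsets and return the new offset to
    // commit (one past the last contiguous ack), or None if the lowest
    // pending offset is still unacknowledged.
    pub fn committable(&mut self) -> Option<i64> {
        let mut advanced = false;
        while let Some(&Reverse(lowest)) = self.acked.peek() {
            if lowest == self.next_to_commit {
                self.acked.pop();
                self.next_to_commit += 1;
                advanced = true;
            } else {
                break;
            }
        }
        if advanced { Some(self.next_to_commit) } else { None }
    }
}
```

Note that a higher offset acknowledged early simply sits in the heap until the gap below it is filled, which is exactly the behavior the out-of-order section below requires.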
Reference Counting for Multiple Exporters
In this design we have not opted for an atomic counter which could be shared across multiple message payloads for reference counting acknowledgements. This is primarily because we don’t know how common this use case will be, and it would add overhead to single-exporter configurations. It is therefore left to the receiver to determine and handle reference counting for acknowledgements. We could consider the alternative and include the option for reference counting of acks in messages with an Option&lt;Arc&lt;AtomicUsize&gt;&gt; ref_count; if present, each exporter decrements it on acknowledgement, and only the final decrement (ref_count == 0) sends the true acknowledgement.
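For reference, the alternative fan-out counter described above would look roughly like this (a sketch under the assumption of one shared counter initialized to the number of exporters; the function name is hypothetical):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Each exporter calls this after its delivery attempt. fetch_sub returns
// the previous value, so the exporter that observes 1 performed the last
// outstanding decrement and is responsible for sending the real
// acknowledgement back to the receiver.
pub fn ack_if_last(ref_count: &Arc<AtomicUsize>) -> bool {
    ref_count.fetch_sub(1, Ordering::AcqRel) == 1
}
```

With three exporters sharing one counter, only the third call reports true, so the receiver sees exactly one acknowledgement per message regardless of fan-out.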
It is likely not much additional work to add this, but it is also not yet clear whether receivers may choose to employ a single reader/writer task which also handles processing acknowledgements, much like exporters process encoding, exporting, and responses. As it is the responsibility of the receiver to track any required state, such as Kafka offset tracking and committing the lowest acknowledged offset, it may be beneficial to avoid adding reference counting to the messages themselves. If the receiver chooses to implement a single task for reading, sending messages, and receiving acknowledgements, we could avoid the use of mutexes.
Out of Order Acknowledgement
It is expected that acknowledgements may arrive back at the receiver out of order, as many exporters and data types (such as the OTLPExporter with ResourceSpans and ResourceMetrics) do not enforce strict ordering guarantees on delivery. It is therefore the responsibility of the receiver to handle out-of-order acknowledgements and not to rely on ordering.
Partial Delivery and Partial Acknowledgement
Rotel exporters currently have no notion of partial delivery or partial retry. Therefore, at this time we do not intend to support partial acknowledgements. To illustrate, consider a Kafka receiver connected to an OTLPExporter. OTLP itself supports partial success and reports counts on rejection, but for acknowledgement we will not attempt to send partial acknowledgements.