Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/kafka_source_ignore_tombstones.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a new `ignore_tombstones` option to the `kafka` source. When set to `false`, Kafka tombstone messages (records with a `null` payload) are no longer skipped and instead produce an event, allowing pipelines to consume messages where only the key, headers, or other metadata are needed. Defaults to `true` to preserve the existing behavior.
172 changes: 162 additions & 10 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use vector_lib::{
DecoderFramedRead, StreamDecodingError,
decoding::{DeserializerConfig, FramingConfig},
},
config::{LegacyKey, LogNamespace},
config::{DataType, LegacyKey, LogNamespace},
configurable::configurable_component,
finalizer::OrderedFinalizer,
lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
Expand All @@ -56,7 +56,7 @@ use crate::{
LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
log_schema,
},
event::{BatchNotifier, BatchStatus, Event, Value},
event::{BatchNotifier, BatchStatus, Event, LogEvent, Value},
internal_events::{
KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
StreamClosedError,
Expand Down Expand Up @@ -225,6 +225,15 @@ pub struct KafkaSourceConfig {
))]
librdkafka_options: Option<HashMap<String, String>>,

/// Whether to ignore Kafka tombstone messages (messages with a `null` payload).
///
/// When `true` (the default), messages with no payload are skipped. Set to `false`
/// to emit an event for each tombstone, which is useful when only the message
/// headers, key, or metadata are needed.
#[serde(default = "crate::serde::default_true")]
#[derivative(Default(value = "crate::serde::default_true()"))]
ignore_tombstones: bool,

#[serde(flatten)]
auth: kafka::KafkaAuthConfig,

Expand Down Expand Up @@ -407,10 +416,14 @@ impl SourceConfig for KafkaSourceConfig {
None,
);

vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
// When tombstones are emitted, the source produces log events for them
// regardless of the configured decoder.
let mut output_type = self.decoding.output_type();
if !self.ignore_tombstones {
output_type |= DataType::Log;
}

vec![SourceOutput::new_maybe_logs(output_type, schema_definition)]
}

fn can_acknowledge(&self) -> bool {
Expand Down Expand Up @@ -591,6 +604,7 @@ impl ConsumerStateInner<Consuming> {
let keys = self.config.keys();
let decoder = self.decoder.clone();
let log_namespace = self.log_namespace;
let ignore_tombstones = self.config.ignore_tombstones;
let mut out = self.out.clone();

let (end_tx, mut end_signal) = oneshot::channel::<()>();
Expand Down Expand Up @@ -651,7 +665,7 @@ impl ConsumerStateInner<Consuming> {
topic: msg.topic(),
partition: msg.partition(),
});
parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace, ignore_tombstones).await;
}
},
)
Expand Down Expand Up @@ -948,6 +962,7 @@ fn drive_kafka_consumer(
});
}

#[allow(clippy::too_many_arguments)]
async fn parse_message(
msg: BorrowedMessage<'_>,
decoder: Decoder,
Expand All @@ -956,8 +971,11 @@ async fn parse_message(
acknowledgements: bool,
finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
log_namespace: LogNamespace,
ignore_tombstones: bool,
) {
if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
if let Some((count, stream)) =
parse_stream(&msg, decoder, keys, log_namespace, ignore_tombstones)
{
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut stream = stream.map(|event| {
// All acknowledgements flow through the normal Finalizer stream so
Expand Down Expand Up @@ -991,11 +1009,27 @@ fn parse_stream<'a>(
decoder: Decoder,
keys: &'a Keys,
log_namespace: LogNamespace,
ignore_tombstones: bool,
) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
let payload = msg.payload()?; // skip messages with empty payload

let rmsg = ReceivedMessage::from(msg);

let payload = match msg.payload() {
Some(payload) => payload,
None if ignore_tombstones => return None,
None => {
let mut event = Event::Log(LogEvent::default());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include log outputs when emitting tombstones

When ignore_tombstones = false is combined with a metric-only decoder such as decoding.codec = "influxdb", KafkaSourceConfig::outputs() still advertises only self.decoding.output_type() (Metric), but this branch emits an Event::Log. The topology will therefore accept metric-only downstream components, which filter out log arrays, and reject log-only consumers as a data-type mismatch, so tombstone records produced here cannot be consumed in those metric pipelines. Include DataType::Log in the source output type when this option can emit tombstones, or reject metric-only decoders for this mode.

Useful? React with 👍 / 👎.

rmsg.apply(keys, &mut event, log_namespace);
emit!(KafkaEventsReceived {
count: 1,
byte_size: event.estimated_json_encoded_size_of(),
topic: &rmsg.topic,
partition: rmsg.partition,
});
let stream = stream! { yield event; }.boxed();
return Some((1, stream));
}
};

let payload = Cursor::new(Bytes::copy_from_slice(payload));

let mut stream = DecoderFramedRead::with_capacity(payload, decoder, msg.payload_len());
Expand Down Expand Up @@ -1668,6 +1702,124 @@ mod integration_test {
send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
}

async fn send_tombstone(topic: &str, key: &str) {
let producer: &FutureProducer = &client_config(None);
let record: FutureRecord<'_, str, str> =
FutureRecord::to(topic)
.key(key)
.headers(OwnedHeaders::new().insert(Header {
key: HEADER_KEY,
value: Some(HEADER_VALUE),
}));
if let Err(error) = producer.send(record, Timeout::Never).await {
panic!("Cannot send tombstone to Kafka: {error:?}");
}
}

#[tokio::test]
async fn consumes_tombstones_when_disabled() {
let topic = format!("test-topic-{}", random_string(10));
let group_id = format!("test-group-{}", random_string(10));
create_topic(&topic, 1).await;

send_tombstone(&topic, KEY).await;

let config = KafkaSourceConfig {
ignore_tombstones: false,
..make_config(&topic, &group_id, LogNamespace::Legacy, None)
};

let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
let (tx, rx) = SourceSender::new_test();
let (trigger_shutdown, shutdown_done) =
spawn_kafka(tx, config, false, false, LogNamespace::Legacy);
let events = collect_n(rx, 1).await;
tokio::task::yield_now().await;
drop(trigger_shutdown);
shutdown_done.await;
events
})
.await;

assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert_eq!(log["message_key"], KEY.into());
assert_eq!(log["topic"], topic.into());
assert!(log.contains("partition"));
assert!(log.contains("offset"));
let mut expected_headers = ObjectMap::new();
expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
assert_eq!(log["headers"], Value::from(expected_headers));
}

#[tokio::test]
async fn consumes_tombstones_with_json_decoding() {
use vector_lib::codecs::decoding::format::JsonDeserializerConfig;

let topic = format!("test-topic-{}", random_string(10));
let group_id = format!("test-group-{}", random_string(10));
create_topic(&topic, 1).await;

send_tombstone(&topic, KEY).await;

let config = KafkaSourceConfig {
ignore_tombstones: false,
decoding: JsonDeserializerConfig::default().into(),
..make_config(&topic, &group_id, LogNamespace::Legacy, None)
};

let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
let (tx, rx) = SourceSender::new_test();
let (trigger_shutdown, shutdown_done) =
spawn_kafka(tx, config, false, false, LogNamespace::Legacy);
let events = collect_n(rx, 1).await;
tokio::task::yield_now().await;
drop(trigger_shutdown);
shutdown_done.await;
events
})
.await;

assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert_eq!(log["message_key"], KEY.into());
assert_eq!(log["topic"], topic.into());
let mut expected_headers = ObjectMap::new();
expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
assert_eq!(log["headers"], Value::from(expected_headers));
}

#[tokio::test]
async fn ignores_tombstones_by_default() {
let topic = format!("test-topic-{}", random_string(10));
let group_id = format!("test-group-{}", random_string(10));

create_topic(&topic, 1).await;
send_tombstone(&topic, KEY).await;
send_events(topic.clone(), 1, 1).await;

let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
assert!(config.ignore_tombstones, "default should be true");

let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
let (tx, rx) = SourceSender::new_test();
let (trigger_shutdown, shutdown_done) =
spawn_kafka(tx, config, false, false, LogNamespace::Legacy);
let events = collect_n(rx, 1).await;
tokio::task::yield_now().await;
drop(trigger_shutdown);
shutdown_done.await;
events
})
.await;

assert_eq!(events.len(), 1);
assert_eq!(
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
format!("{TEXT} 000").into()
);
}

async fn send_receive(
acknowledgements: bool,
error_at: impl Fn(usize) -> bool,
Expand Down
Loading