diff --git a/changelog.d/kafka_source_ignore_tombstones.feature.md b/changelog.d/kafka_source_ignore_tombstones.feature.md new file mode 100644 index 0000000000000..7155256b719ab --- /dev/null +++ b/changelog.d/kafka_source_ignore_tombstones.feature.md @@ -0,0 +1 @@ +Added a new `ignore_tombstones` option to the `kafka` source. When set to `false`, Kafka tombstone messages (records with a `null` payload) are no longer skipped and instead produce an event, allowing pipelines to consume messages where only the key, headers, or other metadata are needed. Defaults to `true` to preserve the existing behavior. diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index f5927b365157b..ecc20f3dd988e 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -42,7 +42,7 @@ use vector_lib::{ DecoderFramedRead, StreamDecodingError, decoding::{DeserializerConfig, FramingConfig}, }, - config::{LegacyKey, LogNamespace}, + config::{DataType, LegacyKey, LogNamespace}, configurable::configurable_component, finalizer::OrderedFinalizer, lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path}, @@ -56,7 +56,7 @@ use crate::{ LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput, log_schema, }, - event::{BatchNotifier, BatchStatus, Event, Value}, + event::{BatchNotifier, BatchStatus, Event, LogEvent, Value}, internal_events::{ KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError, StreamClosedError, @@ -225,6 +225,15 @@ pub struct KafkaSourceConfig { ))] librdkafka_options: Option>, + /// Whether to ignore Kafka tombstone messages (messages with a `null` payload). + /// + /// When `true` (the default), messages with no payload are skipped. Set to `false` + /// to emit an event for each tombstone, which is useful when only the message + /// headers, key, or metadata are needed. + #[serde(default = "crate::serde::default_true")] + #[derivative(Default(value = "crate::serde::default_true()"))] + ignore_tombstones: bool, + #[serde(flatten)] auth: kafka::KafkaAuthConfig, @@ -407,10 +416,14 @@ impl SourceConfig for KafkaSourceConfig { None, ); - vec![SourceOutput::new_maybe_logs( - self.decoding.output_type(), - schema_definition, - )] + // When tombstones are emitted, the source produces log events for them + // regardless of the configured decoder. + let mut output_type = self.decoding.output_type(); + if !self.ignore_tombstones { + output_type |= DataType::Log; + } + + vec![SourceOutput::new_maybe_logs(output_type, schema_definition)] } fn can_acknowledge(&self) -> bool { @@ -591,6 +604,7 @@ impl ConsumerStateInner { let keys = self.config.keys(); let decoder = self.decoder.clone(); let log_namespace = self.log_namespace; + let ignore_tombstones = self.config.ignore_tombstones; let mut out = self.out.clone(); let (end_tx, mut end_signal) = oneshot::channel::<()>(); @@ -651,7 +665,7 @@ impl ConsumerStateInner { topic: msg.topic(), partition: msg.partition(), }); - parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await; + parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace, ignore_tombstones).await; } }, ) @@ -948,6 +962,7 @@ fn drive_kafka_consumer( }); } +#[allow(clippy::too_many_arguments)] async fn parse_message( msg: BorrowedMessage<'_>, decoder: Decoder, @@ -956,8 +971,11 @@ async fn parse_message( acknowledgements: bool, finalizer: &Option>, log_namespace: LogNamespace, + ignore_tombstones: bool, ) { - if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) { + if let Some((count, stream)) = + parse_stream(&msg, decoder, keys, log_namespace, ignore_tombstones) + { let (batch, receiver) = BatchNotifier::new_with_receiver(); let mut stream = stream.map(|event| { // All acknowledgements flow through the normal Finalizer stream so @@ -991,11 +1009,27 @@ fn parse_stream<'a>( decoder: Decoder, keys: &'a Keys, log_namespace: LogNamespace, + ignore_tombstones: bool, ) -> Option<(usize, impl Stream + 'a + use<'a>)> { - let payload = msg.payload()?; // skip messages with empty payload - let rmsg = ReceivedMessage::from(msg); + let payload = match msg.payload() { + Some(payload) => payload, + None if ignore_tombstones => return None, + None => { + let mut event = Event::Log(LogEvent::default()); + rmsg.apply(keys, &mut event, log_namespace); + emit!(KafkaEventsReceived { + count: 1, + byte_size: event.estimated_json_encoded_size_of(), + topic: &rmsg.topic, + partition: rmsg.partition, + }); + let stream = stream! { yield event; }.boxed(); + return Some((1, stream)); + } + }; + let payload = Cursor::new(Bytes::copy_from_slice(payload)); let mut stream = DecoderFramedRead::with_capacity(payload, decoder, msg.payload_len()); @@ -1668,6 +1702,124 @@ mod integration_test { send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await; } + async fn send_tombstone(topic: &str, key: &str) { + let producer: &FutureProducer = &client_config(None); + let record: FutureRecord<'_, str, str> = + FutureRecord::to(topic) + .key(key) + .headers(OwnedHeaders::new().insert(Header { + key: HEADER_KEY, + value: Some(HEADER_VALUE), + })); + if let Err(error) = producer.send(record, Timeout::Never).await { + panic!("Cannot send tombstone to Kafka: {error:?}"); + } + } + + #[tokio::test] + async fn consumes_tombstones_when_disabled() { + let topic = format!("test-topic-{}", random_string(10)); + let group_id = format!("test-group-{}", random_string(10)); + create_topic(&topic, 1).await; + + send_tombstone(&topic, KEY).await; + + let config = KafkaSourceConfig { + ignore_tombstones: false, + ..make_config(&topic, &group_id, LogNamespace::Legacy, None) + }; + + let events = assert_source_compliance(&["protocol", "topic", "partition"], async move { + let (tx, rx) = SourceSender::new_test(); + let (trigger_shutdown, shutdown_done) = + spawn_kafka(tx, config, false, false, LogNamespace::Legacy); + let events = collect_n(rx, 1).await; + tokio::task::yield_now().await; + drop(trigger_shutdown); + shutdown_done.await; + events + }) + .await; + + assert_eq!(events.len(), 1); + let log = events[0].as_log(); + assert_eq!(log["message_key"], KEY.into()); + assert_eq!(log["topic"], topic.into()); + assert!(log.contains("partition")); + assert!(log.contains("offset")); + let mut expected_headers = ObjectMap::new(); + expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE)); + assert_eq!(log["headers"], Value::from(expected_headers)); + } + + #[tokio::test] + async fn consumes_tombstones_with_json_decoding() { + use vector_lib::codecs::decoding::format::JsonDeserializerConfig; + + let topic = format!("test-topic-{}", random_string(10)); + let group_id = format!("test-group-{}", random_string(10)); + create_topic(&topic, 1).await; + + send_tombstone(&topic, KEY).await; + + let config = KafkaSourceConfig { + ignore_tombstones: false, + decoding: JsonDeserializerConfig::default().into(), + ..make_config(&topic, &group_id, LogNamespace::Legacy, None) + }; + + let events = assert_source_compliance(&["protocol", "topic", "partition"], async move { + let (tx, rx) = SourceSender::new_test(); + let (trigger_shutdown, shutdown_done) = + spawn_kafka(tx, config, false, false, LogNamespace::Legacy); + let events = collect_n(rx, 1).await; + tokio::task::yield_now().await; + drop(trigger_shutdown); + shutdown_done.await; + events + }) + .await; + + assert_eq!(events.len(), 1); + let log = events[0].as_log(); + assert_eq!(log["message_key"], KEY.into()); + assert_eq!(log["topic"], topic.into()); + let mut expected_headers = ObjectMap::new(); + expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE)); + assert_eq!(log["headers"], Value::from(expected_headers)); + } + + #[tokio::test] + async fn ignores_tombstones_by_default() { + let topic = format!("test-topic-{}", random_string(10)); + let group_id = format!("test-group-{}", random_string(10)); + + create_topic(&topic, 1).await; + send_tombstone(&topic, KEY).await; + send_events(topic.clone(), 1, 1).await; + + let config = make_config(&topic, &group_id, LogNamespace::Legacy, None); + assert!(config.ignore_tombstones, "default should be true"); + + let events = assert_source_compliance(&["protocol", "topic", "partition"], async move { + let (tx, rx) = SourceSender::new_test(); + let (trigger_shutdown, shutdown_done) = + spawn_kafka(tx, config, false, false, LogNamespace::Legacy); + let events = collect_n(rx, 1).await; + tokio::task::yield_now().await; + drop(trigger_shutdown); + shutdown_done.await; + events + }) + .await; + + assert_eq!(events.len(), 1); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + format!("{TEXT} 000").into() + ); + } + async fn send_receive( acknowledgements: bool, error_at: impl Fn(usize) -> bool,