diff --git a/changelog.d/21967_mqtt_source_acknowledgements.enhancement.md b/changelog.d/21967_mqtt_source_acknowledgements.enhancement.md new file mode 100644 index 0000000000000..2eeed4469b549 --- /dev/null +++ b/changelog.d/21967_mqtt_source_acknowledgements.enhancement.md @@ -0,0 +1 @@ +The `mqtt` source now supports end-to-end acknowledgements. When `acknowledgements` is enabled, the QoS 1 `PUBACK` for an incoming publish is deferred until the resulting events have been delivered to all connected sinks. Because the source already uses `clean_session = false` and subscribes at QoS `AtLeastOnce`, an unacknowledged message is redelivered by the broker after a crash or reconnect, providing at-least-once delivery when the broker resumes the persistent session. A stable `client_id` must be configured when `acknowledgements` is enabled, so the session (and its unacknowledged messages) can be resumed after a restart. Publishes delivered by the broker with QoS 0 cannot be acknowledged end-to-end and are forwarded with a warning instead of being registered for deferred acknowledgement. The source also warns if the broker starts a new session while acknowledgements are enabled, since unacknowledged messages from any previous session for that client ID will not be redelivered. diff --git a/src/common/mqtt.rs b/src/common/mqtt.rs index 8d4d377ab6746..d9e982815b36b 100644 --- a/src/common/mqtt.rs +++ b/src/common/mqtt.rs @@ -104,6 +104,11 @@ pub enum ConfigurationError { /// Credentials provided were incomplete #[snafu(display("Username and password must be either both or neither provided."))] IncompleteCredentials, + /// Acknowledgements enabled without a stable client ID + #[snafu(display( + "A stable `client_id` must be set when `acknowledgements` are enabled, so the MQTT session and its unacknowledged messages can be resumed after a restart." + ))] + AcknowledgementsRequireClientId, } #[derive(Clone)] diff --git a/src/sources/mqtt/config.rs b/src/sources/mqtt/config.rs index 92b8cd78f6a5b..86b845d7b7770 100644 --- a/src/sources/mqtt/config.rs +++ b/src/sources/mqtt/config.rs @@ -19,8 +19,8 @@ use crate::{ ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError, TlsSnafu, }, - config::{SourceConfig, SourceContext, SourceOutput}, - serde::{OneOrMany, default_decoding, default_framing_message_based}, + config::{SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput}, + serde::{OneOrMany, bool_or_struct, default_decoding, default_framing_message_based}, }; /// Configuration for the `mqtt` source. @@ -61,6 +61,10 @@ pub struct MqttSourceConfig { #[serde(default = "default_topic_key")] #[configurable(metadata(docs::examples = "topic"))] pub topic_key: OptionalValuePath, + + #[configurable(derived)] + #[serde(default, deserialize_with = "bool_or_struct")] + pub acknowledgements: SourceAcknowledgementsConfig, } fn default_topic() -> OneOrMany { @@ -77,14 +81,22 @@ impl SourceConfig for MqttSourceConfig { async fn build(&self, cx: SourceContext) -> crate::Result { let log_namespace = cx.log_namespace(self.log_namespace); - let connector = self.build_connector()?; + let acknowledgements = cx.do_acknowledgements(self.acknowledgements); + + let connector = self.build_connector(acknowledgements)?; let decoder = DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) .build()?; - let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?; - Ok(Box::pin(sink.run(cx.out, cx.shutdown))) + let source = MqttSource::new( + connector.clone(), + decoder, + log_namespace, + self.clone(), + acknowledgements, + )?; + Ok(Box::pin(source.run(cx.out, cx.shutdown))) } fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { @@ -107,12 +119,22 @@ impl SourceConfig for MqttSourceConfig { } fn can_acknowledge(&self) -> bool { - false + true } } impl MqttSourceConfig { - fn build_connector(&self) -> Result { + fn build_connector(&self, acknowledgements: bool) -> Result { + // End-to-end acknowledgements rely on resuming the MQTT session (and its + // unacknowledged in-flight messages) after a restart, which is keyed by the + // client ID. A generated/random client ID would start a fresh session and + // orphan those messages, silently breaking at-least-once — so require an + // explicit, stable client ID when acknowledgements are enabled. + if acknowledgements && self.common.client_id.is_none() { + return Err(ConfigurationError::AcknowledgementsRequireClientId) + .context(ConfigurationSnafu); + } + let client_id = self.common.client_id.clone().unwrap_or_else(|| { let hash = rand::rng() .sample_iter(&rand_distr::Alphanumeric) @@ -133,6 +155,16 @@ impl MqttSourceConfig { options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size); options.set_clean_session(false); + + // With end-to-end acknowledgements enabled, defer the QoS-1 PUBACK until + // the event has been delivered to all sinks. rumqttc then requires every + // incoming publish to be acked explicitly via `client.ack(&publish)`. + // Combined with `clean_session(false)` and QoS `AtLeastOnce`, an unacked + // message is redelivered by the broker after a crash/reconnect. + if acknowledgements { + options.set_manual_acks(true); + } + match (&self.common.user, &self.common.password) { (Some(user), Some(password)) => { options.set_credentials(user, password); @@ -171,4 +203,25 @@ mod test { fn generate_config() { crate::test_util::test_generate_config::(); } + + #[test] + fn acknowledgements_require_a_stable_client_id() { + // Without acks, a client ID is auto-generated — fine. + let default_config = MqttSourceConfig::default(); + assert!(default_config.build_connector(false).is_ok()); + + // With acks and no explicit client ID, building must fail (a generated ID + // would orphan the session's unacknowledged messages after a restart). + assert!(default_config.build_connector(true).is_err()); + + // With acks and an explicit client ID, building succeeds. + let with_client_id = MqttSourceConfig { + common: MqttCommonConfig { + client_id: Some("stable-id".to_owned()), + ..Default::default() + }, + ..Default::default() + }; + assert!(with_client_id.build_connector(true).is_ok()); + } } diff --git a/src/sources/mqtt/integration_tests.rs b/src/sources/mqtt/integration_tests.rs index ed1013f4bef0f..9f31295a113c7 100644 --- a/src/sources/mqtt/integration_tests.rs +++ b/src/sources/mqtt/integration_tests.rs @@ -11,7 +11,7 @@ use crate::{ SourceSender, common::mqtt::MqttCommonConfig, config::{SourceConfig, SourceContext, log_schema}, - event::Event, + event::{Event, EventStatus}, serde::OneOrMany, sources::mqtt::MqttSourceConfig, test_util::{ @@ -40,9 +40,21 @@ async fn send_test_events(client: &AsyncClient, topic: &str, messages: &Vec String { + event + .as_log() + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .to_string_lossy() + .into_owned() +} + async fn get_mqtt_client() -> AsyncClient { + // Unique client ID per producer: brokers that strictly enforce client-ID + // uniqueness (e.g. RabbitMQ) otherwise kick a previous connection when tests + // run concurrently, which manifests as spurious publish timeouts. let mut mqtt_options = MqttOptions::new( - "integration-test-producer", + format!("integration-test-producer-{}", random_string(6)), mqtt_broker_address(), mqtt_broker_port(), ); @@ -118,6 +130,99 @@ async fn mqtt_one_topic_happy() { .await; } +/// With end-to-end acknowledgements enabled, a message that is received but not +/// successfully delivered (the sink rejects it) must not be acknowledged to the +/// broker, so the broker redelivers it. This proves the at-least-once guarantee +/// added by the `acknowledgements` option: no data is lost when a downstream +/// failure or crash occurs before the write is confirmed. +#[tokio::test] +async fn mqtt_redelivers_unacknowledged_messages() { + trace_init(); + + let topic = "source-redelivery-test"; + // A stable client ID so the second connection resumes the same persistent + // session (the source sets `clean_session = false`); the broker then + // redelivers any in-flight QoS 1 message that was never acknowledged. + let client_id = format!("sourceAckTest{}", random_string(6)); + let message = random_string(32); + + let make_config = || MqttSourceConfig { + common: MqttCommonConfig { + host: mqtt_broker_address(), + port: mqtt_broker_port(), + client_id: Some(client_id.clone()), + ..Default::default() + }, + topic: OneOrMany::One(topic.to_owned()), + acknowledgements: true.into(), + ..MqttSourceConfig::default() + }; + + // Phase 1: the first instance subscribes (creating the persistent session) + // and receives the message, but its sink rejects every event, so the source + // never sends a PUBACK. + let (tx1, mut rx1) = SourceSender::new_test_finalize(EventStatus::Rejected); + let config1 = make_config(); + let source1 = tokio::spawn(async move { + config1 + .build(SourceContext::new_test(tx1, None)) + .await + .unwrap() + .await + .unwrap() + }); + + // Wait for the subscription to be established before publishing. + tokio::time::sleep(Duration::from_millis(500)).await; + + let producer = get_mqtt_client().await; + producer + .publish(topic, QoS::AtLeastOnce, false, message.as_bytes()) + .await + .unwrap(); + + // The first instance must actually receive (and reject) the message so that + // it remains unacknowledged in the broker. + let first = timeout(Duration::from_secs(5), rx1.next()) + .await + .expect("timed out waiting for first delivery") + .expect("source stream ended unexpectedly"); + assert_eq!(message_body(&first), message); + drop(first); + + // Give the source a moment to observe the rejected status (and therefore + // skip the ack), then drop the connection without acknowledging. + tokio::time::sleep(Duration::from_millis(200)).await; + source1.abort(); + let _ = source1.await; + + // Phase 2: a new instance resumes the same session; the broker must + // redeliver the unacknowledged message. + let (tx2, mut rx2) = SourceSender::new_test(); + let config2 = make_config(); + let source2 = tokio::spawn(async move { + config2 + .build(SourceContext::new_test(tx2, None)) + .await + .unwrap() + .await + .unwrap() + }); + + let redelivered = timeout(Duration::from_secs(10), rx2.next()) + .await + .expect("timed out waiting for redelivery: the message was lost") + .expect("source stream ended unexpectedly"); + assert_eq!( + message_body(&redelivered), + message, + "redelivered message did not match the original" + ); + + source2.abort(); + let _ = source2.await; +} + #[tokio::test] async fn mqtt_many_topics_happy() { trace_init(); diff --git a/src/sources/mqtt/source.rs b/src/sources/mqtt/source.rs index d0f3efea5b0f7..af9267f0511cc 100644 --- a/src/sources/mqtt/source.rs +++ b/src/sources/mqtt/source.rs @@ -1,8 +1,10 @@ +use futures::StreamExt; use itertools::Itertools; use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter}; use vector_lib::{ codecs::Decoder, config::{LegacyKey, LogNamespace}, + finalizer::UnorderedFinalizer, internal_event::EventsReceived, lookup::path, }; @@ -10,18 +12,105 @@ use vector_lib::{ use crate::{ SourceSender, common::mqtt::MqttConnector, - event::{BatchNotifier, Event}, + event::{BatchNotifier, BatchStatus, Event}, internal_events::{EndpointBytesReceived, StreamClosedError}, serde::OneOrMany, shutdown::ShutdownSignal, sources::{mqtt::MqttSourceConfig, util}, }; +/// Identifies an in-flight publish so its QoS-1 PUBACK can be sent once the +/// downstream sinks confirm delivery. Only the packet id (carried by `Publish`) +/// is needed to ack; the payload is cleared before the entry is retained so +/// pending acks don't pin payloads in memory under backpressure. +#[derive(Clone, Debug)] +struct FinalizerEntry { + publish: Publish, + connection_generation: u64, +} + +fn should_ack_finalized_publish( + status: BatchStatus, + entry_generation: u64, + connection_generation: u64, +) -> bool { + status == BatchStatus::Delivered && entry_generation == connection_generation +} + +fn publish_supports_end_to_end_acknowledgements(qos: QoS) -> bool { + qos != QoS::AtMostOnce +} + +fn should_defer_publish_ack(acknowledgements: bool, qos: QoS) -> bool { + acknowledgements && publish_supports_end_to_end_acknowledgements(qos) +} + +fn warn_unsupported_acknowledgement_qos(qos: QoS, topic: &str) { + warn!( + message = "MQTT acknowledgements require publishes with QoS 1 or greater; forwarding message without end-to-end acknowledgement guarantee.", + ?qos, + topic, + internal_log_rate_limit = true, + ); +} + +const fn should_warn_session_not_resumed(acknowledgements: bool, session_present: bool) -> bool { + acknowledgements && !session_present +} + +fn warn_session_not_resumed() { + warn!( + message = "MQTT broker started a new session while acknowledgements are enabled; unacknowledged messages from any previous session for this client ID will not be redelivered.", + internal_log_rate_limit = true, + ); +} + +#[derive(Default)] +struct PendingAcks { + publishes: Vec, +} + +impl PendingAcks { + fn push(&mut self, publish: Publish) { + self.publishes.push(publish); + } + + fn clear(&mut self) { + self.publishes.clear(); + } + + fn retry(&mut self, client: &rumqttc::AsyncClient) { + self.retry_with(|publish| client.try_ack(publish).is_ok()); + } + + fn try_ack(&mut self, connected: bool, publish: Publish, client: &rumqttc::AsyncClient) { + self.try_ack_with(connected, publish, |publish| { + client.try_ack(publish).is_ok() + }); + } + + fn try_ack_with( + &mut self, + connected: bool, + publish: Publish, + mut try_ack: impl FnMut(&Publish) -> bool, + ) { + if connected && !try_ack(&publish) { + self.push(publish); + } + } + + fn retry_with(&mut self, mut try_ack: impl FnMut(&Publish) -> bool) { + self.publishes.retain(|publish| !try_ack(publish)); + } +} + pub struct MqttSource { connector: MqttConnector, decoder: Decoder, log_namespace: LogNamespace, config: MqttSourceConfig, + acknowledgements: bool, } impl MqttSource { @@ -30,12 +119,14 @@ impl MqttSource { decoder: Decoder, log_namespace: LogNamespace, config: MqttSourceConfig, + acknowledgements: bool, ) -> crate::Result { Ok(Self { connector, decoder, log_namespace, config, + acknowledgements, }) } @@ -62,23 +153,115 @@ impl MqttSource { } } + // Finalizer drives end-to-end acknowledgements: each in-flight publish is + // registered with its batch-status receiver, and we send the QoS-1 PUBACK + // only once the sinks report `Delivered`. Unused when acknowledgements are + // disabled (rumqttc auto-acks in that mode). MQTT PUBACKs are independent + // per packet id (unlike Kafka offsets), so finalization is unordered — a + // slow/stuck batch must not hold back acks for already-delivered publishes. + let (finalizer, mut ack_stream) = UnorderedFinalizer::::maybe_new( + self.acknowledgements, + Some(shutdown.clone()), + ); + + // PUBACKs that rumqttc's bounded request channel was too full to accept, + // retained for retry rather than dropped. Dropping a PUBACK for an already + // delivered message would pin it in the broker's in-flight window until the + // next reconnect. This is bounded in practice by that in-flight window (the + // broker stops delivering once it fills), and the event loop below drains the + // request channel, so entries flush on subsequent iterations. + let mut pending_acks = PendingAcks::default(); + let mut connected = false; + let mut connection_generation = 0; + loop { + // Retry deferred PUBACKs while connected (the event loop below drains the + // request channel). Skipped while disconnected: a publish's packet id is + // only valid on the connection it arrived on, so stale PUBACKs must not be + // replayed across a reconnect. + if connected { + pending_acks.retry(&client); + } + tokio::select! { _ = shutdown.clone() => return Ok(()), + entry = ack_stream.next() => { + // Only PUBACK delivered events. On Errored/Rejected we skip the + // ack so the broker redelivers after reconnect (QoS-1 + + // clean_session=false), giving at-least-once delivery. Use the + // non-blocking `try_ack` — awaiting `ack` could deadlock, since + // this same task polls the event loop that drains rumqttc's request + // channel. If that channel is full, retain the PUBACK for retry + // (above) instead of dropping it. + if let Some((status, entry)) = entry + && should_ack_finalized_publish( + status, + entry.connection_generation, + connection_generation, + ) + { + pending_acks.try_ack(connected, entry.publish, &client); + } + }, mqtt_event = connection.poll() => { - // If an error is returned here there is currently no way to tie this back - // to the event that was posted which means we can't accurately provide - // delivery guarantees. - // We need this issue resolved first: - // https://github.com/bytebeamio/rumqtt/issues/349 + // Providing at-least-once here does not require correlating a + // connection/poll error back to a specific in-flight publish. + // rumqtt#349 (no packet id for *outbound* publishes) concerns the + // publish/sink direction and does not apply to a subscribe-only + // source: each incoming Publish already carries its packet id, and + // we withhold its QoS-1 PUBACK until the event is delivered + // end-to-end. Anything left unacked when the connection drops is + // redelivered by the broker on reconnect (clean_session=false + QoS + // AtLeastOnce). match mqtt_event { Ok(MqttEvent::Incoming(Incoming::Publish(publish))) => { - self.process_message(publish, &mut out).await; + self.process_message( + publish, + &mut out, + finalizer.as_ref(), + connection_generation, + ).await; + } + Ok(MqttEvent::Incoming(Incoming::SubAck(suback))) + if self.acknowledgements => + { + for return_code in suback.return_codes { + if let rumqttc::SubscribeReasonCode::Success(qos) = return_code + && !publish_supports_end_to_end_acknowledgements(qos) + { + warn!( + message = "MQTT broker granted a subscription QoS below the level required for end-to-end acknowledgements.", + ?qos, + internal_log_rate_limit = true, + ); + } + } } - Ok(MqttEvent::Incoming( - Incoming::PubAck(_) | Incoming::PubRec(_) | Incoming::PubComp(_), - )) => { - // TODO Handle acknowledgement - https://github.com/vectordotdev/vector/issues/21967 + // A (re)connected session resumes here; the broker will + // redeliver any unacknowledged publishes, so drop deferred + // PUBACKs whose packet ids came from the previous connection. + Ok(MqttEvent::Incoming(Incoming::ConnAck(connack))) => { + if should_warn_session_not_resumed( + self.acknowledgements, + connack.session_present, + ) { + warn_session_not_resumed(); + } + connected = true; + connection_generation += 1; + pending_acks.clear(); + if let Some(finalizer) = &finalizer { + finalizer.flush(); + } + } + // Connection lost: same stale-packet-id reasoning, and rumqttc + // drops its own queued acks while reconnecting. + Err(_) => { + connected = false; + pending_acks.clear(); + if let Some(finalizer) = &finalizer { + finalizer.flush(); + } } _ => {} } @@ -87,7 +270,13 @@ impl MqttSource { } } - async fn process_message(&self, publish: Publish, out: &mut SourceSender) { + async fn process_message( + &self, + mut publish: Publish, + out: &mut SourceSender, + finalizer: Option<&UnorderedFinalizer>, + connection_generation: u64, + ) { emit!(EndpointBytesReceived { byte_size: publish.payload.len(), protocol: "mqtt", @@ -95,7 +284,14 @@ impl MqttSource { }); let events_received = register!(EventsReceived); - let (batch, _batch_receiver) = BatchNotifier::maybe_new_with_receiver(false); + let use_end_to_end_acknowledgements = + should_defer_publish_ack(finalizer.is_some(), publish.qos); + if finalizer.is_some() && !use_end_to_end_acknowledgements { + warn_unsupported_acknowledgement_qos(publish.qos, &publish.topic); + } + + let (batch, batch_receiver) = + BatchNotifier::maybe_new_with_receiver(use_end_to_end_acknowledgements); // Error is logged by `vector_lib::codecs::Decoder`, no further handling // is needed here. let decoded = util::decode_message( @@ -116,7 +312,23 @@ impl MqttSource { let count = decoded.len(); match out.send_batch(decoded).await { - Ok(()) => {} + Ok(()) => { + // Register the publish for deferred PUBACK once the batch is + // delivered. Without acknowledgements `batch_receiver` is None and + // rumqttc has already auto-acked. The payload is no longer needed + // (ack only uses the packet id), so clear it before retaining the + // entry to avoid pinning payloads in memory while sinks process. + if let Some((finalizer, receiver)) = finalizer.zip(batch_receiver) { + publish.payload = Default::default(); + finalizer.add( + FinalizerEntry { + publish, + connection_generation, + }, + receiver, + ); + } + } Err(_) => emit!(StreamClosedError { count }), } } @@ -137,3 +349,82 @@ impl MqttSource { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn publish(pkid: u16) -> Publish { + let mut publish = Publish::new("topic", QoS::AtLeastOnce, vec![1, 2, 3]); + publish.pkid = pkid; + publish + } + + #[test] + fn pending_acks_keeps_failed_retries() { + let mut pending_acks = PendingAcks::default(); + pending_acks.push(publish(1)); + pending_acks.push(publish(2)); + pending_acks.push(publish(3)); + + let mut attempted = Vec::new(); + pending_acks.retry_with(|publish| { + attempted.push(publish.pkid); + publish.pkid != 2 + }); + + assert_eq!(attempted, vec![1, 2, 3]); + assert_eq!(pending_acks.publishes.len(), 1); + assert_eq!(pending_acks.publishes[0].pkid, 2); + + pending_acks.retry_with(|_| true); + assert!(pending_acks.publishes.is_empty()); + } + + #[test] + fn pending_acks_clear_drops_stale_packet_ids() { + let mut pending_acks = PendingAcks::default(); + pending_acks.push(publish(1)); + pending_acks.push(publish(2)); + + pending_acks.clear(); + + assert!(pending_acks.publishes.is_empty()); + } + + #[test] + fn pending_acks_drops_finalized_publish_while_disconnected() { + let mut pending_acks = PendingAcks::default(); + let mut attempted = false; + + pending_acks.try_ack_with(false, publish(1), |_| { + attempted = true; + true + }); + + assert!(!attempted); + assert!(pending_acks.publishes.is_empty()); + } + + #[test] + fn finalized_publish_must_match_current_connection_generation() { + assert!(should_ack_finalized_publish(BatchStatus::Delivered, 2, 2)); + assert!(!should_ack_finalized_publish(BatchStatus::Delivered, 1, 2)); + assert!(!should_ack_finalized_publish(BatchStatus::Rejected, 2, 2)); + } + + #[test] + fn qos0_publish_does_not_defer_acknowledgement() { + assert!(!should_defer_publish_ack(true, QoS::AtMostOnce)); + assert!(should_defer_publish_ack(true, QoS::AtLeastOnce)); + assert!(should_defer_publish_ack(true, QoS::ExactlyOnce)); + assert!(!should_defer_publish_ack(false, QoS::AtLeastOnce)); + } + + #[test] + fn warns_when_acknowledgement_session_is_not_resumed() { + assert!(should_warn_session_not_resumed(true, false)); + assert!(!should_warn_session_not_resumed(true, true)); + assert!(!should_warn_session_not_resumed(false, false)); + } +} diff --git a/website/cue/reference/components/sources/generated/mqtt.cue b/website/cue/reference/components/sources/generated/mqtt.cue index 4a699a4e6951a..75f556b472399 100644 --- a/website/cue/reference/components/sources/generated/mqtt.cue +++ b/website/cue/reference/components/sources/generated/mqtt.cue @@ -1,6 +1,27 @@ package metadata generated: components: sources: mqtt: configuration: { + acknowledgements: { + deprecated: true + description: """ + Controls how acknowledgements are handled by this source. + + This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level. + + Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: "Whether or not end-to-end acknowledgements are enabled for this source." + required: false + type: bool: {} + } + } client_id: { description: "MQTT client ID." required: false