Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `mqtt` source now supports end-to-end acknowledgements. When `acknowledgements` is enabled, the QoS 1 `PUBACK` for an incoming publish is deferred until the resulting events have been delivered to all connected sinks. Because the source already uses `clean_session = false` and subscribes at QoS `AtLeastOnce`, an unacknowledged message is redelivered by the broker after a crash or reconnect, providing at-least-once delivery when the broker resumes the persistent session. A stable `client_id` must be configured when `acknowledgements` is enabled, so the session (and its unacknowledged messages) can be resumed after a restart. Publishes delivered by the broker with QoS 0 cannot be acknowledged end-to-end and are forwarded with a warning instead of being registered for deferred acknowledgement. The source also warns if the broker starts a new session while acknowledgements are enabled, since unacknowledged messages from any previous session for that client ID will not be redelivered.
5 changes: 5 additions & 0 deletions src/common/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ pub enum ConfigurationError {
/// Credentials provided were incomplete
#[snafu(display("Username and password must be either both or neither provided."))]
IncompleteCredentials,
/// Acknowledgements enabled without a stable client ID
#[snafu(display(
"A stable `client_id` must be set when `acknowledgements` are enabled, so the MQTT session and its unacknowledged messages can be resumed after a restart."
))]
AcknowledgementsRequireClientId,
}

#[derive(Clone)]
Expand Down
67 changes: 60 additions & 7 deletions src/sources/mqtt/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::{
ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
TlsSnafu,
},
config::{SourceConfig, SourceContext, SourceOutput},
serde::{OneOrMany, default_decoding, default_framing_message_based},
config::{SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
serde::{OneOrMany, bool_or_struct, default_decoding, default_framing_message_based},
};

/// Configuration for the `mqtt` source.
Expand Down Expand Up @@ -61,6 +61,10 @@ pub struct MqttSourceConfig {
#[serde(default = "default_topic_key")]
#[configurable(metadata(docs::examples = "topic"))]
pub topic_key: OptionalValuePath,

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
pub acknowledgements: SourceAcknowledgementsConfig,
}

fn default_topic() -> OneOrMany<String> {
Expand All @@ -77,14 +81,22 @@ impl SourceConfig for MqttSourceConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);

let connector = self.build_connector()?;
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

let connector = self.build_connector(acknowledgements)?;

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?;
Ok(Box::pin(sink.run(cx.out, cx.shutdown)))
let source = MqttSource::new(
connector.clone(),
decoder,
log_namespace,
self.clone(),
acknowledgements,
)?;
Ok(Box::pin(source.run(cx.out, cx.shutdown)))
}

fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
Expand All @@ -107,12 +119,22 @@ impl SourceConfig for MqttSourceConfig {
}

fn can_acknowledge(&self) -> bool {
false
true
}
}

impl MqttSourceConfig {
fn build_connector(&self) -> Result<MqttConnector, MqttError> {
fn build_connector(&self, acknowledgements: bool) -> Result<MqttConnector, MqttError> {
// End-to-end acknowledgements rely on resuming the MQTT session (and its
// unacknowledged in-flight messages) after a restart, which is keyed by the
// client ID. A generated/random client ID would start a fresh session and
// orphan those messages, silently breaking at-least-once — so require an
// explicit, stable client ID when acknowledgements are enabled.
if acknowledgements && self.common.client_id.is_none() {
return Err(ConfigurationError::AcknowledgementsRequireClientId)
.context(ConfigurationSnafu);
}

let client_id = self.common.client_id.clone().unwrap_or_else(|| {
let hash = rand::rng()
.sample_iter(&rand_distr::Alphanumeric)
Expand All @@ -133,6 +155,16 @@ impl MqttSourceConfig {
options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);

options.set_clean_session(false);

// With end-to-end acknowledgements enabled, defer the QoS-1 PUBACK until
// the event has been delivered to all sinks. rumqttc then requires every
// incoming publish to be acked explicitly via `client.ack(&publish)`.
// Combined with `clean_session(false)` and QoS `AtLeastOnce`, an unacked
// message is redelivered by the broker after a crash/reconnect.
if acknowledgements {
options.set_manual_acks(true);
Comment on lines +164 to +165

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Require stable client IDs when manual acks are enabled

When acknowledgements are enabled but common.client_id is omitted, the code above still creates a random vectorSource... ID and this block turns on manual ACKs. MQTT persistent sessions/redelivery are keyed by client ID, so after Vector crashes or restarts before a PUBACK, the next process generates a different ID and won't resume the session that owns the unacknowledged QoS 1 publish. That breaks the advertised at-least-once behavior for the default config; require an explicit stable client ID, or avoid enabling manual ACKs, when acknowledgements are active.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in e1ec93b. build_connector now returns a configuration error when acknowledgements is enabled and no client_id is set, since a generated ID would orphan the session's unacknowledged messages on restart. Added a unit test (acknowledgements_require_a_stable_client_id).

}

match (&self.common.user, &self.common.password) {
(Some(user), Some(password)) => {
options.set_credentials(user, password);
Expand Down Expand Up @@ -171,4 +203,25 @@ mod test {
fn generate_config() {
crate::test_util::test_generate_config::<MqttSourceConfig>();
}

#[test]
fn acknowledgements_require_a_stable_client_id() {
// Without acks, a client ID is auto-generated — fine.
let default_config = MqttSourceConfig::default();
assert!(default_config.build_connector(false).is_ok());

// With acks and no explicit client ID, building must fail (a generated ID
// would orphan the session's unacknowledged messages after a restart).
assert!(default_config.build_connector(true).is_err());

// With acks and an explicit client ID, building succeeds.
let with_client_id = MqttSourceConfig {
common: MqttCommonConfig {
client_id: Some("stable-id".to_owned()),
..Default::default()
},
..Default::default()
};
assert!(with_client_id.build_connector(true).is_ok());
}
}
109 changes: 107 additions & 2 deletions src/sources/mqtt/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
SourceSender,
common::mqtt::MqttCommonConfig,
config::{SourceConfig, SourceContext, log_schema},
event::Event,
event::{Event, EventStatus},
serde::OneOrMany,
sources::mqtt::MqttSourceConfig,
test_util::{
Expand Down Expand Up @@ -40,9 +40,21 @@ async fn send_test_events(client: &AsyncClient, topic: &str, messages: &Vec<Stri
}
}

fn message_body(event: &Event) -> String {
event
.as_log()
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy()
.into_owned()
}

async fn get_mqtt_client() -> AsyncClient {
// Unique client ID per producer: brokers that strictly enforce client-ID
// uniqueness (e.g. RabbitMQ) otherwise kick a previous connection when tests
// run concurrently, which manifests as spurious publish timeouts.
let mut mqtt_options = MqttOptions::new(
"integration-test-producer",
format!("integration-test-producer-{}", random_string(6)),
mqtt_broker_address(),
mqtt_broker_port(),
);
Expand Down Expand Up @@ -118,6 +130,99 @@ async fn mqtt_one_topic_happy() {
.await;
}

/// With end-to-end acknowledgements enabled, a message that is received but not
/// successfully delivered (the sink rejects it) must not be acknowledged to the
/// broker, so the broker redelivers it. This proves the at-least-once guarantee
/// added by the `acknowledgements` option: no data is lost when a downstream
/// failure or crash occurs before the write is confirmed.
#[tokio::test]
async fn mqtt_redelivers_unacknowledged_messages() {
trace_init();

let topic = "source-redelivery-test";
// A stable client ID so the second connection resumes the same persistent
// session (the source sets `clean_session = false`); the broker then
// redelivers any in-flight QoS 1 message that was never acknowledged.
let client_id = format!("sourceAckTest{}", random_string(6));
let message = random_string(32);

let make_config = || MqttSourceConfig {
common: MqttCommonConfig {
host: mqtt_broker_address(),
port: mqtt_broker_port(),
client_id: Some(client_id.clone()),
..Default::default()
},
topic: OneOrMany::One(topic.to_owned()),
acknowledgements: true.into(),
..MqttSourceConfig::default()
};

// Phase 1: the first instance subscribes (creating the persistent session)
// and receives the message, but its sink rejects every event, so the source
// never sends a PUBACK.
let (tx1, mut rx1) = SourceSender::new_test_finalize(EventStatus::Rejected);
let config1 = make_config();
let source1 = tokio::spawn(async move {
config1
.build(SourceContext::new_test(tx1, None))
.await
.unwrap()
.await
.unwrap()
});

// Wait for the subscription to be established before publishing.
tokio::time::sleep(Duration::from_millis(500)).await;

let producer = get_mqtt_client().await;
producer
.publish(topic, QoS::AtLeastOnce, false, message.as_bytes())
.await
.unwrap();

// The first instance must actually receive (and reject) the message so that
// it remains unacknowledged in the broker.
let first = timeout(Duration::from_secs(5), rx1.next())
.await
.expect("timed out waiting for first delivery")
.expect("source stream ended unexpectedly");
assert_eq!(message_body(&first), message);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Drop the first event before aborting

In this test, SourceSender::new_test_finalize marks the event as Rejected, but the batch status is not sent back to the source until all Event handles are dropped. The first event remains in scope here until after source1.abort(), so the source never observes the rejected finalization before the connection is killed; the test would still pass if the ack-stream branch incorrectly PUBACKed Rejected batches after finalization. Drop first before the sleep/abort to actually exercise the skip-ack path.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 3974323. first is now dropped immediately after the assertion (before the sleep/abort), so the source observes the Rejected finalization and exercises the skip-ack path before the connection is killed.

While here, the deferred-ack handling is extracted into a small PendingAcks type with unit tests covering: retries keep only the publishes whose try_ack failed, clear() drops stale packet ids (reconnect), and a batch finalized while disconnected is dropped rather than acked (the broker redelivers on resume).

drop(first);

// Give the source a moment to observe the rejected status (and therefore
// skip the ack), then drop the connection without acknowledging.
tokio::time::sleep(Duration::from_millis(200)).await;
source1.abort();
let _ = source1.await;

// Phase 2: a new instance resumes the same session; the broker must
// redeliver the unacknowledged message.
let (tx2, mut rx2) = SourceSender::new_test();
let config2 = make_config();
let source2 = tokio::spawn(async move {
config2
.build(SourceContext::new_test(tx2, None))
.await
.unwrap()
.await
.unwrap()
});

let redelivered = timeout(Duration::from_secs(10), rx2.next())
.await
.expect("timed out waiting for redelivery: the message was lost")
.expect("source stream ended unexpectedly");
assert_eq!(
message_body(&redelivered),
message,
"redelivered message did not match the original"
);

source2.abort();
let _ = source2.await;
}

#[tokio::test]
async fn mqtt_many_topics_happy() {
trace_init();
Expand Down
Loading
Loading