enhancement(mqtt source): support end-to-end acknowledgements #25666
antaresindustries wants to merge 1 commit
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6ab74d5f38
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
if acknowledgements {
    options.set_manual_acks(true);
Require stable client IDs when manual acks are enabled
When acknowledgements are enabled but common.client_id is omitted, the code above still creates a random vectorSource... ID and this block turns on manual ACKs. MQTT persistent sessions/redelivery are keyed by client ID, so after Vector crashes or restarts before a PUBACK, the next process generates a different ID and won't resume the session that owns the unacknowledged QoS 1 publish. That breaks the advertised at-least-once behavior for the default config; require an explicit stable client ID, or avoid enabling manual ACKs, when acknowledgements are active.
Good catch — fixed in e1ec93b. build_connector now returns a configuration error when acknowledgements is enabled and no client_id is set, since a generated ID would orphan the session's unacknowledged messages on restart. Added a unit test (acknowledgements_require_a_stable_client_id).
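A minimal sketch of the validation described in this reply, using only the standard library. `MqttSettings`, `ConfigError`, and `resolve_client_id` are hypothetical names for illustration and are not the PR's actual `build_connector` code:

```rust
/// Hypothetical, simplified stand-in for the source's connection settings.
#[derive(Debug, Default)]
pub struct MqttSettings {
    pub client_id: Option<String>,
    pub acknowledgements: bool,
}

/// Hypothetical configuration error for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum ConfigError {
    AcknowledgementsRequireClientId,
}

/// Picks the client ID to connect with.
/// A generated ID is only acceptable when acknowledgements are off, because
/// a persistent session (and its unacknowledged publishes) is keyed by client ID.
pub fn resolve_client_id(
    settings: &MqttSettings,
    generate_id: impl FnOnce() -> String,
) -> Result<String, ConfigError> {
    match (&settings.client_id, settings.acknowledgements) {
        // An explicit ID is always stable across restarts.
        (Some(id), _) => Ok(id.clone()),
        // Acks on, no ID: refuse to start rather than orphan the session.
        (None, true) => Err(ConfigError::AcknowledgementsRequireClientId),
        // Acks off: the previous behavior (random ID) is kept.
        (None, false) => Ok(generate_id()),
    }
}
```

The design choice here is to fail at configuration time instead of silently falling back to auto-acks, so the at-least-once claim cannot be weakened without the operator noticing.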
let (finalizer, mut ack_stream) =
    OrderedFinalizer::<FinalizerEntry>::new(Some(shutdown.clone()));
Use unordered finalization for independent MQTT PUBACKs
With acknowledgements enabled, this creates a FuturesOrdered finalizer. In MQTT each PUBACK is tied to its own packet id, so if one earlier batch is slow or never finalizes, for example while a sink retries indefinitely, ack_stream.next() cannot yield later delivered publishes and their PUBACKs are withheld. Once the broker's in-flight window fills, delivery stalls even though those later events already reached sinks; use an unordered finalizer for MQTT acks.
Agreed — fixed in e1ec93b. Switched to UnorderedFinalizer. MQTT PUBACKs are independent per packet id (unlike Kafka offsets), matching the aws_sqs/amqp sources, so a slow or stuck batch no longer holds back acks for already-delivered publishes.
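To illustrate the difference, here is a small standard-library model of the two release policies. It is not Vector's `OrderedFinalizer`/`UnorderedFinalizer` implementation; the function names and the `(packet_id, finalized)` tuple representation are assumptions made for this sketch:

```rust
/// Each entry is (packet_id, finalized), listed in arrival order.
/// Ordered release: acks can only be emitted up to the first entry that
/// has not finalized yet, so one stuck batch blocks everything behind it.
pub fn releasable_in_order(entries: &[(u16, bool)]) -> Vec<u16> {
    entries
        .iter()
        .take_while(|(_, finalized)| *finalized)
        .map(|(pkid, _)| *pkid)
        .collect()
}

/// Unordered release: every finalized entry can be acked independently,
/// which fits MQTT because each PUBACK carries its own packet id.
pub fn releasable_unordered(entries: &[(u16, bool)]) -> Vec<u16> {
    entries
        .iter()
        .filter(|(_, finalized)| *finalized)
        .map(|(pkid, _)| *pkid)
        .collect()
}
```

With packet 1 still pending and packets 2 and 3 finalized, the ordered policy releases nothing while the unordered policy releases 2 and 3.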
// clean_session=false), giving at-least-once delivery.
if let Some((status, entry)) = entry
    && status == BatchStatus::Delivered
    && let Err(error) = client.ack(&entry.publish).await
Keep polling the event loop while queuing PUBACKs
When acknowledgements are enabled and many batches finalize while the broker/socket is slow, this await can block on rumqttc's bounded request channel (the client was created with cap 1024). The same task is the only code polling connection.poll(), which is what drains that channel and writes PUBACKs, so once the channel fills the source can deadlock and stop receiving/acking until restarted. Queue PUBACKs without awaiting capacity in this branch, or keep polling the event loop while waiting.
Fixed in e1ec93b. Switched from client.ack(...).await to the non-blocking client.try_ack(...), so the ack branch never awaits on rumqttc's bounded request channel — the same task keeps polling the event loop that drains it. A dropped ack (full channel) just defers to broker redelivery instead of risking a deadlock.
if let Some((status, entry)) = entry
    && status == BatchStatus::Delivered
    && let Err(error) = client.ack(&entry.publish).await
Reconnect after failed batches to trigger redelivery
When a batch finalizes as Errored or Rejected, this condition just falls through: no PUBACK is sent, but the MQTT connection stays open. For QoS 1 deliveries, the broker keeps that packet in-flight until it gets a PUBACK (the comment relies on redelivery only after reconnect), so a transient sink failure can leave the message stuck indefinitely and eventually exhaust the broker's in-flight window while Vector keeps running. Force a reconnect or otherwise requeue/release the message on non-delivered statuses.
Thanks — this is a real consideration, but we'd like to defer it (and flag it as a known limitation) rather than fix it in this PR:
- It's a stall risk, not a loss risk: a batch that finalizes Errored/Rejected is never PUBACK'd, so the broker retains it and redelivers on the next reconnect (clean_session=false). No data is lost.
- With sink-level retries, Errored/Rejected only occurs after retries are exhausted, i.e. a sustained sink failure, a degenerate state where data can't be written regardless.
- Forcing a reconnect on every non-delivered batch is invasive: it drops the connection for all in-flight messages and triggers mass duplicate redelivery, trading a rare stall for routine duplication.
The narrow remaining concern is head-of-line stalling if a sink rejects sustainedly while the connection stays up and the broker's in-flight window fills. Happy to address that as a follow-up (e.g. an opt-in reconnect-on-persistent-failure) if maintainers prefer — open to guidance on the desired behavior.
struct FinalizerEntry {
    publish: Publish,
}
Drop MQTT payloads before retaining publish handles
With acknowledgements enabled, every pending finalizer owns the whole Publish, including its payload, until all downstream sinks finalize. Under slow sinks or backpressure, large MQTT payloads are therefore retained in memory in addition to the decoded events that were sent downstream, multiplying per-message memory for all in-flight publishes. Store only the fields needed for client.ack or clear the payload before adding the entry.
Fixed in e1ec93b. The payload is cleared (publish.payload = Default::default()) before the finalizer entry is retained — ack only needs the packet id (get_ack_req uses publish.pkid), so pending acks no longer pin payloads in memory under backpressure.
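A rough sketch of the payload-clearing idea, assuming a simplified `Publish` with a `Vec<u8>` payload (rumqttc's real `Publish` type differs, and `split_for_ack` is a hypothetical helper, not a function from the PR):

```rust
/// Hypothetical, simplified stand-in for an MQTT publish packet.
#[derive(Debug, Clone, PartialEq)]
pub struct Publish {
    pub pkid: u16,
    pub topic: String,
    pub payload: Vec<u8>,
}

/// What the finalizer keeps until downstream sinks report a status.
#[derive(Debug)]
pub struct FinalizerEntry {
    pub publish: Publish,
}

/// Moves the payload out for decoding and returns an entry that retains
/// only the metadata needed to send the PUBACK later.
pub fn split_for_ack(mut publish: Publish) -> (Vec<u8>, FinalizerEntry) {
    // `mem::take` leaves an empty Vec behind, so the retained entry no
    // longer pins the message body in memory.
    let payload = std::mem::take(&mut publish.payload);
    (payload, FinalizerEntry { publish })
}
```

Under backpressure the number of pending entries can grow, so keeping each one down to a packet id and a topic bounds the per-message overhead regardless of payload size.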
27f3385 to e1ec93b
💡 Codex Review
Reviewed commit: e1ec93b403
// (full channel) just means the message is redelivered later.
if let Some((status, entry)) = entry
    && status == BatchStatus::Delivered
    && let Err(error) = client.try_ack(&entry.publish)
Retry PUBACKs when the request queue is full
When many batches finalize while rumqttc's bounded request channel is full, this try_ack error path just logs and drops the PUBACK for an event that has already been delivered. Because the MQTT connection remains open, the broker keeps that packet in flight until it receives a PUBACK or the client reconnects, so enough dropped acks can fill the broker's in-flight window and stop delivery even though downstream succeeded. Fresh evidence: the current code now uses try_ack and does not retain the entry for retry after a full-channel error.
Fixed in fefefac. Failed try_acks are now retained in a pending_acks buffer and retried on subsequent event-loop iterations (which drain rumqttc's request channel), rather than dropped — so a delivered message is never left pinned in the broker's in-flight window. The buffer is bounded in practice by the broker's in-flight window, since the broker stops delivering once that fills.
e1ec93b to fefefac
💡 Codex Review
Reviewed commit: fefefacb71
// end-to-end. Anything left unacked when the connection drops is
// redelivered by the broker on reconnect (clean_session=false + QoS
// AtLeastOnce).
if let Ok(MqttEvent::Incoming(Incoming::Publish(publish))) = mqtt_event {
Clear deferred PUBACKs after reconnect
When the request queue is full and some delivered publishes are kept in pending_acks, a subsequent connection.poll() error/reconnect leaves those packet ids in memory because this branch ignores all errors. The next loop retries them before polling the reconnected event loop and before the broker retransmits the unacknowledged publish; rumqttc's own reconnect cleanup deliberately drops queued PubAck requests in this situation to wait for retransmission, so under a full queue followed by a disconnect this can send unexpected stale PUBACKs and stall or close the resumed session. Clear the deferred ack list when poll() reports a connection error instead of carrying it across reconnects.
Fixed in 4754a4d. The source now tracks connection state: pending_acks is cleared on both a poll() connection error and on ConnAck, and deferred PUBACKs are only retried while connected. After a reconnect the broker redelivers the unacknowledged publishes (re-acked through the normal path), so packet ids from the previous connection are never replayed into the resumed session.
fefefac to 4754a4d
💡 Codex Review
Reviewed commit: 4754a4d221
// (above) instead of dropping it.
if let Some((status, entry)) = entry
    && status == BatchStatus::Delivered
    && client.try_ack(&entry.publish).is_err()
Gate PUBACKs on the current connection
When connection.poll() has returned Err and set connected = false, downstream finalizers for publishes from that old connection can still arrive before the next ConnAck. This branch still calls try_ack unconditionally; because try_ack can succeed by only queuing a PUBACK into rumqttc's request channel, the ack is neither retained in pending_acks nor cleared by the reconnect handling, so a packet id from the dead connection can be replayed on the resumed session instead of waiting for the broker's redelivery. Fresh evidence: the retry path is gated on connected, but this new-finalization path is not.
Addressed in ecafae7. The new-finalization path is now gated on connection state (PendingAcks::try_ack(connected, ...)), and — more robustly — each FinalizerEntry is stamped with a connection generation that is bumped on every ConnAck. A PUBACK is only sent when the entry's generation still matches the current one (should_ack_finalized_publish), so a finalizer from a dead connection is never replayed onto the resumed session; the broker redelivers it and it is acked fresh.
.await
.expect("timed out waiting for first delivery")
.expect("source stream ended unexpectedly");
assert_eq!(message_body(&first), message);
Drop the first event before aborting
In this test, SourceSender::new_test_finalize marks the event as Rejected, but the batch status is not sent back to the source until all Event handles are dropped. The first event remains in scope here until after source1.abort(), so the source never observes the rejected finalization before the connection is killed; the test would still pass if the ack-stream branch incorrectly PUBACKed Rejected batches after finalization. Drop first before the sleep/abort to actually exercise the skip-ack path.
Fixed in 3974323. first is now dropped immediately after the assertion (before the sleep/abort), so the source observes the Rejected finalization and exercises the skip-ack path before the connection is killed.
While here, the deferred-ack handling is extracted into a small PendingAcks type with unit tests covering: retries keep only the publishes whose try_ack failed, clear() drops stale packet ids (reconnect), and a batch finalized while disconnected is dropped rather than acked (the broker redelivers on resume).
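The three behaviors listed above can be sketched with a bounded `std::sync::mpsc` channel standing in for rumqttc's request channel. This is an illustrative model, not the PR's `PendingAcks` implementation: the method signatures, the `u16` packet-id representation, and the use of `SyncSender` are all assumptions:

```rust
use std::sync::mpsc::SyncSender;

/// Hypothetical deferred-ack buffer keyed by MQTT packet id.
#[derive(Debug, Default)]
pub struct PendingAcks {
    pkids: Vec<u16>,
}

impl PendingAcks {
    /// Tries to queue a PUBACK without blocking.
    /// While disconnected the ack is dropped, so the broker redelivers on resume.
    /// If the queue rejects the ack (for example because it is full), the
    /// packet id is kept for a later retry.
    pub fn try_ack(&mut self, connected: bool, pkid: u16, queue: &SyncSender<u16>) {
        if !connected {
            return;
        }
        if queue.try_send(pkid).is_err() {
            self.pkids.push(pkid);
        }
    }

    /// Retries deferred acks, keeping only those that still fail to queue.
    pub fn retry(&mut self, queue: &SyncSender<u16>) {
        self.pkids.retain(|pkid| queue.try_send(*pkid).is_err());
    }

    /// Drops stale packet ids, e.g. after a connection error or a new ConnAck.
    pub fn clear(&mut self) {
        self.pkids.clear();
    }

    /// Packet ids currently waiting for a retry.
    pub fn pending(&self) -> &[u16] {
        &self.pkids
    }
}
```

Because `try_send` never waits for capacity, the task that calls it remains free to keep polling the event loop that drains the queue.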
4754a4d to 3974323
💡 Codex Review
Reviewed commit: 3974323d15
Ok(MqttEvent::Incoming(Incoming::ConnAck(_))) => {
    connected = true;
    pending_acks.clear();
Flush stale finalizers when reconnecting
When the broker replies with a fresh session after a disconnect (for example ConnAck.session_present == false after broker restart or session expiry), marking the source connected while old UnorderedFinalizer entries are still pending lets a batch from the previous connection finalize later and try_ack its old packet id on the new connection. If the broker has already reused that packet id for a new publish, this can acknowledge the new delivery before its events are delivered; clear/flush old finalizer entries on disconnect or ConnAck, or track a connection generation before enabling PUBACKs again.
Addressed in ecafae7 using the connection-generation approach you suggested. connection_generation is incremented on each ConnAck; each FinalizerEntry records the generation it was created on; and should_ack_finalized_publish only acks when entry_generation == connection_generation. So a batch from a previous connection finalizing after a (fresh-session) reconnect won't try_ack its old packet id on the new session — the broker redelivers it and it's acked fresh. Added a unit test (finalized_publish_must_match_current_connection_generation).
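A minimal sketch of the connection-generation check, assuming simplified types. The name `should_ack_finalized_publish` comes from the reply above, but its signature here, together with `ConnectionState` and this local `BatchStatus` enum, is hypothetical:

```rust
/// Simplified stand-in for the batch status reported by downstream sinks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    Delivered,
    Errored,
    Rejected,
}

/// Entry retained per publish; stamped with the generation it arrived on.
#[derive(Debug)]
pub struct FinalizerEntry {
    pub pkid: u16,
    pub generation: u64,
}

/// Hypothetical connection bookkeeping for this sketch.
#[derive(Debug, Default)]
pub struct ConnectionState {
    pub connected: bool,
    pub generation: u64,
}

impl ConnectionState {
    /// A ConnAck starts a new generation, invalidating older entries.
    pub fn on_conn_ack(&mut self) {
        self.connected = true;
        self.generation += 1;
    }

    /// A poll error marks the connection as down until the next ConnAck.
    pub fn on_poll_error(&mut self) {
        self.connected = false;
    }
}

/// A PUBACK is only sent for delivered batches whose publish arrived on
/// the connection that is currently live.
pub fn should_ack_finalized_publish(
    status: BatchStatus,
    entry: &FinalizerEntry,
    state: &ConnectionState,
) -> bool {
    status == BatchStatus::Delivered
        && state.connected
        && entry.generation == state.generation
}
```

Comparing generations rather than only checking a `connected` flag also covers the case where a reconnect has already completed by the time an old batch finalizes.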
3974323 to ecafae7
💡 Codex Review
Reviewed commit: ecafae7d5e
self.process_message(
    publish,
    &mut out,
    finalizer.as_ref(),
    connection_generation,
Reject QoS0 deliveries when acknowledgements are enabled
When acknowledgements are enabled but the broker delivers a QoS0 publish (for example because the publisher used QoS0, or the broker granted a lower subscription QoS), this path still forwards the event under the acknowledgement flow even though there is no PUBACK to defer; try_ack is a no-op for QoS0 publishes. In that scenario, a crash after receiving the publish but before sink delivery loses the message instead of causing broker redelivery, so Vector can appear to have e2e acks enabled while receiving at-most-once data. Check the incoming/granted QoS and fail or warn when it cannot support the advertised guarantee.
Addressed in 69e35c3. When acknowledgements are enabled, a publish is only registered for deferred ack if its QoS supports one (>= 1); a QoS 0 delivery is forwarded best-effort with a rate-limited warning rather than being given a no-op finalizer that would imply a guarantee it can't provide. (QoS 0 is inherently at-most-once, so this surfaces the limitation rather than making QoS 0 lossless.)
Two related warnings were added: on SubAck if the broker grants a subscription QoS below 1, and on ConnAck if the broker starts a fresh session (session_present == false) while acks are enabled (previous-session messages won't be redelivered). New unit tests cover the QoS gating and the session warning.
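The QoS gating could look roughly like the following. The `QoS` enum mirrors the three MQTT levels, while `AckPlan` and `plan_ack` are hypothetical names invented for this sketch rather than code from the PR:

```rust
/// The three MQTT quality-of-service levels, ordered from weakest to strongest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

/// Hypothetical description of how an incoming publish is handled.
#[derive(Debug, PartialEq, Eq)]
pub enum AckPlan {
    /// Acknowledgements are off: the client library acks on arrival.
    AutoAck,
    /// Register the publish with the finalizer and ack after delivery.
    Deferred,
    /// QoS 0 has nothing to ack: forward the event and warn.
    BestEffortWithWarning,
}

/// Decides whether a publish can take part in end-to-end acknowledgements.
pub fn plan_ack(acknowledgements: bool, qos: QoS) -> AckPlan {
    if !acknowledgements {
        AckPlan::AutoAck
    } else if qos >= QoS::AtLeastOnce {
        AckPlan::Deferred
    } else {
        AckPlan::BestEffortWithWarning
    }
}
```

Forwarding QoS 0 messages with a warning, instead of rejecting them, keeps data flowing while making it visible that those messages do not carry the at-least-once guarantee.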
ecafae7 to 69e35c3
The mqtt source now participates in Vector's end-to-end acknowledgement framework. When acknowledgements are enabled, the QoS 1 PUBACK is deferred via an OrderedFinalizer until the events are delivered to all sinks; with the source's existing clean_session=false + QoS AtLeastOnce subscription, unacknowledged messages are redelivered after a reconnect (at-least-once). Closes vectordotdev#21967
69e35c3 to eed9c59
Rebased to latest, ready for review!
Closes #21967
What
Makes the mqtt source participate in Vector's end-to-end acknowledgement framework, so it can provide at-least-once delivery instead of best-effort.
- Adds an acknowledgements option (SourceAcknowledgementsConfig, the standard bool_or_struct form used by other sources) and flips can_acknowledge() to true.
- When the option is enabled, calls MqttOptions::set_manual_acks(true) so rumqttc stops auto-acking incoming publishes.
- Events are sent with a BatchNotifier; the publish is registered with an OrderedFinalizer, and the QoS 1 PUBACK is sent (client.ack(&publish)) only once the batch reaches BatchStatus::Delivered. On Errored/Rejected the message is left unacked.
This mirrors the existing kafka source's offset-commit-on-delivered pattern.
Why
The source previously created a BatchNotifier but discarded the receiver and auto-acked every message on arrival, so events could be lost if Vector or a downstream sink failed before the write completed. Because the source already uses clean_session = false and subscribes at QoS AtLeastOnce, deferring the PUBACK lets the broker redeliver any unacknowledged message after a reconnect, giving genuine at-least-once delivery. This is the work tracked in #21967.
Behavior / compatibility
- Unchanged by default (acknowledgements off → manual acks off → rumqttc auto-acks exactly as before). Opt-in only.
- With acknowledgements enabled, delivery is at-least-once. This matches the guarantee of other ack-capable sources.
How it was tested
On Rust 1.92 (Vector's pinned toolchain):
- cargo test -p vector --no-default-features --features sources-mqtt --lib sources::mqtt: builds; existing generate_config test passes.
- cargo fmt -p vector --check: clean.
- cargo clippy -p vector --no-default-features --features sources-mqtt: clean.
- cargo build --release with the mqtt source + influxdb/prometheus/blackhole sinks + remap: produces a working binary; vector list shows the components.
- Added mqtt_redelivers_unacknowledged_messages to src/sources/mqtt/integration_tests.rs: with acks enabled, a first source instance rejects the event (no PUBACK) and disconnects; a second instance resuming the same persistent session asserts the broker redelivers the message.
- Ran the sources::mqtt integration suite against a real broker (RabbitMQ MQTT): all four tests pass, in parallel, stable across repeated runs.
- get_mqtt_client() now uses a unique client ID per producer. The previous fixed ID (integration-test-producer) caused spurious publish timeouts on brokers that strictly enforce client-ID uniqueness (RabbitMQ kicks the older connection) when tests run concurrently; emqx tolerated it. This makes the suite broker-agnostic.
- website/cue/reference/components/sources/generated/mqtt.cue gains the standard acknowledgements block.
- Changelog fragment: changelog.d/21967_mqtt_source_acknowledgements.enhancement.md.
Follow-ups / notes for reviewers
- Rebased onto master (parent b6d00bf); the branch is one commit ahead of master and re-validated on Rust 1.95 (fmt, check, clippy, integration-test compile all clean). The generated cue acknowledgements block was confirmed unchanged vs master, but a maintainer re-running make generate-component-docs to confirm verbatim output is still worthwhile.