Replies: 1 comment
-
|
I like the idea, and for starters, 2 modes could be enough, e.g., Actually, why not just make use of the existing |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
The sink runtime currently configures consumers with
AutoCommit::When(AutoCommitWhen::PollingMessages)(runtime/src/sink.rs:421), meaning offsets are committed when messages are polled — before they are processed. A failed batch is silently lost.Sink must have an option to decide exactly when it is safe to advance the offset, as only the sink knows when a write is truly durable from the target's perspective.
Proposed Solution
Offset information is already available to the sink.
MessagesMetadata(passed toSink::consume) containspartition_idandcurrent_offset. EachConsumedMessagecarries its ownoffset. What is missing is a mechanism to actually call commit.The runtime passes a
CommitCallbackfunction pointer into the sink atopentime, analogous to howLogCallbackis already passed. The sink stores it and callscommit_callback(partition_id, offset)when its write to the target is durable. The runtime switches toAutoCommit::Disabledand trusts the sink to drive commits.Note: The above mentioned
CommitCallbackmechanism is a breaking change but a non-breaking alternative is proposed below.Implementation
Three pieces of work are required.
1. Configurable Commit Mode (
OffsetCommitMode)Add a new field to
SinkConfig(runtime/src/configs/connectors.rs:106):With the following variants:
Auto— default; current behaviour,AutoCommit::When(AutoCommitWhen::PollingMessages); existing sinks work unchangedSinkControlled—AutoCommit::Disabled; runtime passes aCommitCallbackto the sink atopentime, sink drives all offset commitssetup_sink_consumers(runtime/src/sink.rs:379) reads this field and sets the consumer'sAutoCommitaccordingly:2.
CommitCallbackin the ABIAdd to the SDK:
The SDK's
SinkContainerstores the callback and exposes it to the sink instance via aCommitHandlewrapper. Two options for how the callback is delivered:Option A: Extend
iggy_sink_open(breaking)iggy_sink_opengains acommit_callback: CommitCallbackparameter. All existing plugins break at the ABI level and must be recompiled. The runtime always passes either the real callback (whenSinkControlled) or a no-op stub (whenAuto).Option B: Separate
iggy_sink_set_commit_callbacksymbol (non-breaking)A new optional symbol is added:
The runtime calls it after
iggy_sink_open, only whenSinkControlled. Existing plugins that do not export this symbol continue to load and run unchanged — the runtime detects the symbol's absence viadlopen2and treats it asAutomode. Only plugins that opt intoSinkControlledin config need to export the new symbol.Option B is preferred as it preserves backwards compatibility with all existing plugins.
3. Global Offset Sender Registry
The
CommitCallbackisextern "C"— a bare function pointer that cannot capture state. It needs to route a commit call from the plugin back to the correctIggyConsumerinstance in the runtime. The solution follows the same pattern asSOURCE_SENDERS(runtime/src/source.rs:52), which solves the identical problem for sources:The registry is keyed by
(plugin_id, stream_id, topic_id, partition_id). Each part is necessary: a plugin can have multiple consumers across streams and topics, and partition IDs are only unique within a topic. The sink has all four values at hand —stream_idandtopic_idfromTopicMetadata,partition_idfromMessagesMetadata, andplugin_idbaked into theCommitHandle.The runtime, for each consumer created under
SinkControlled:store_offset_senderintoSINK_COMMIT_SENDERSkeyed by(plugin_id, stream_id, topic_id, partition_id)commit_callbackintoiggy_sink_openConsiderations:
SOURCE_SENDERS::cleanup_sender; stale entries do not cause correctness issues (the channel will be closed and sends will fail) but should be cleaned up to avoid unbounded growth-1so the sink can detect and handle the failureBeta Was this translation helpful? Give feedback.
All reactions