Skip to content

[flink] Decouple Committer.Context from operator-only assumptions#8221

Open
ifndef-SleePy wants to merge 5 commits into
apache:masterfrom
ifndef-SleePy:coordinator-commit-pr1-state-store-refactor
Open

[flink] Decouple Committer.Context from operator-only assumptions#8221
ifndef-SleePy wants to merge 5 commits into
apache:masterfrom
ifndef-SleePy:coordinator-commit-pr1-state-store-refactor

Conversation

@ifndef-SleePy

Copy link
Copy Markdown

Purpose

Tracked under: #8220

Remove the assumption that Committer always runs inside a Flink operator, so the same committer logic can be hosted by either an operator or a OperatorCoordinator (introduced in a follow-up PR under #8220 ).

This PR is a pure refactor — no behavior change.

Changes

  • Introduce a StateStore abstraction for committer state access, with operator-backed and memory-backed implementations.
  • Route Committer.Context state access through StateStore; decouple CommittableStateManager from Flink's state initialization context.
  • Move the listener-state snapshot into StoreCommitter.snapshotState, so the committer owns its state lifecycle end-to-end.
  • Relax the metric group exposed via Committer.Context so it is not tied to OperatorMetricGroup, and adapt CommitterMetrics accordingly.

Tests

Existing committer / sink unit tests pass unchanged.

liubiao.leo added 5 commits May 29, 2026 20:41
Adds the StateStore interface and an OperatorStateStore-backed implementation; no existing code path is touched yet.
Replaces direct OperatorStateStore usage in Committer.Context and the commit listeners with the new StateStore, and adds a no-op default Committer.snapshotState().
Interface now takes Committer.Context and committables instead of Flink state contexts; implementations acquire state via Committer.Context.stateStore().
CommitterOperator now invokes committer.snapshotState() at the snapshot boundary, and the listener-state persistence is relocated from groupByCheckpoint to the dedicated snapshotState override (forwarded by StoreMultiCommitter).
…text

Widens Committer.Context.metricGroup() to MetricGroup; CommitterMetrics adapts to either an OperatorMetricGroup (using its IO counters) or a plain MetricGroup (using local SimpleCounters).
@ifndef-SleePy ifndef-SleePy force-pushed the coordinator-commit-pr1-state-store-refactor branch from ba19a05 to 546a2dd Compare June 12, 2026 13:08
@ifndef-SleePy

Copy link
Copy Markdown
Author

Please take a look @JingsongLi .

BTW I couldn't reproduce the MySqlSyncTableActionITCase.testAllTypes failure locally — could you help re-run that job?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant