Skip to content

coordinator: persist maintainer epochs before ownership changes#5434

Merged
ti-chi-bot[bot] merged 5 commits into
pingcap:masterfrom
hongyunyan:codex/pr-5182-epoch-persistence
Jun 23, 2026
Merged

coordinator: persist maintainer epochs before ownership changes#5434
ti-chi-bot[bot] merged 5 commits into
pingcap:masterfrom
hongyunyan:codex/pr-5182-epoch-persistence

Conversation

@hongyunyan

@hongyunyan hongyunyan commented Jun 18, 2026

Copy link
Copy Markdown
Collaborator

What problem does this PR solve?

Issue Number: ref #5083

This is PR 1 of 3 split from #5182.

Background:

During maintainer failover, the coordinator can schedule a new maintainer while
delayed requests from the previous maintainer are still in flight. Receiver-side
fencing only works if every new ownership attempt has a persisted, monotonic
maintainer epoch before any add or move request is sent.

Motivation:

The old code generated owner epochs from in-memory state and resumed changefeeds
with a normal metadata write. That made it hard to prove that a new maintainer
owner is always ordered after the previously persisted owner, especially across
coordinator failover, resume, retry, and move paths.

What is changed and how it works?

This PR adds the coordinator-side epoch persistence foundation:

  • Adds optional maintainer epoch fields to maintainer lifecycle, scheduling,
    heartbeat, and close messages.
  • Adds common maintainer epoch matching helpers and PD TSO based epoch generation.
  • Adds BumpChangefeedEpoch to atomically persist a strictly newer changefeed
    epoch, optionally with status updates under the same etcd transaction.
  • Uses the persisted epoch bump before add/move operators, resume, and warning
    retry scheduling can create a new maintainer owner.
  • Keeps add-maintainer config bytes synchronized with the latest persisted
    ChangeFeedInfo.
  • Makes coordinator bootstrap select the current-epoch maintainer and stop stale
    reported owners with their original epoch.
  • Carries the origin epoch through stop and move operators so old owners are
    fenced by the epoch they actually own.

Stack:

  • #5434: coordinator epoch persistence and owner scheduling foundation.
  • #5435: maintainer and dispatcher-manager receiver-side epoch fence.
  • #5436: table trigger takeover hardening and integration coverage.

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No expected performance regression. Epoch bumps happen on ownership-changing
control paths such as add, move, resume, and retry scheduling, not in the event
write path.

The protobuf fields are optional. Epoch 0 remains the compatibility value used
by old components until later PRs enforce strict receiver-side fences.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

None

Validation

  • make generate-protobuf
  • make fmt
  • go test ./coordinator/changefeed ./coordinator ./coordinator/operator ./pkg/pdutil

Summary by CodeRabbit

  • New Features
    • Added maintainer epoch propagation through heartbeat and maintainer lifecycle requests, including resume and stop flows.
  • Bug Fixes
    • Prevented stale maintainer reports from being applied by dropping mismatched heartbeat status.
    • Updated bootstrap/stop/remove behavior to fence stale owners while preserving the reported maintainer epoch.
  • Improvements
    • Tightened changefeed initialization/resume validation and improved warning-state handling by bumping ownership epoch before stopping.
  • Tests
    • Expanded coverage for epoch bumping, resume behavior, and maintainer-epoch propagation.

@ti-chi-bot

ti-chi-bot Bot commented Jun 18, 2026

Copy link
Copy Markdown

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot Bot added release-note-none Denotes a PR that doesn't merit a release note. do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. do-not-merge/needs-triage-completed labels Jun 18, 2026
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1654d702-57be-4aa1-829a-cd70b7f63b7a

📥 Commits

Reviewing files that changed from the base of the PR and between 06a6788 and 716e60f.

📒 Files selected for processing (1)
  • pkg/common/maintainer_epoch.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/common/maintainer_epoch.go

📝 Walkthrough

Walkthrough

This PR introduces a monotonically increasing maintainer_epoch field across all heartbeat protobuf messages, adds BumpChangefeedEpoch to the etcd backend with CAS-retry logic, replaces boolean flags in MoveMaintainerOperator with a state machine, adds epoch-bump coordination to OperatorController, and wires epoch fencing into coordinator bootstrap selection, heartbeat filtering, resume, and warning-state handling.

Changes

Maintainer Epoch Fencing

Layer / File(s) Summary
Epoch primitives and protobuf wire format
pkg/common/maintainer_epoch.go, pkg/common/maintainer_epoch_test.go, heartbeatpb/heartbeat.proto, pkg/common/format.go
Adds MaintainerEpochMatches (0-wildcard compatibility) and AdvanceChangefeedEpoch (with overflow guard). Adds maintainer_epoch field to all 12 protobuf message types. Extends FormatMaintainerStatus output header.
Backend interface: BumpChangefeedEpoch and updated ResumeChangefeed
coordinator/changefeed/changefeed_db_backend.go, coordinator/changefeed/etcd_backend.go, coordinator/changefeed/mock/changefeed_db_backend.go, coordinator/changefeed/etcd_backend_test.go
Defines EpochBumpOptions; adds BumpChangefeedEpoch and updates ResumeChangefeed to accept candidateEpoch in Backend interface. Implements atomic CAS-retry epoch bump in EtcdBackend. Updates mock and adds tests for basic bump, UpdateStatus mode, and CAS-conflict retry.
Changefeed message epoch propagation
coordinator/changefeed/changefeed.go, coordinator/changefeed/changefeed_test.go
Removes stored configBytes; NewChangefeed/SetInfo now panic on nil. NewAddMaintainerMessage marshals config on-demand with MaintainerEpoch. GetStatusForResume clone preserves MaintainerEpoch. RemoveMaintainerMessage gains maintainerEpoch parameter.
Stop and Add operator epoch enforcement
coordinator/operator/operator_stop.go, coordinator/operator/operator_stop_test.go, coordinator/operator/operator_add.go, coordinator/operator/operator_add_test.go
StopChangefeedOperator gains maintainerEpoch and gates Check on epoch match. AddMaintainerOperator.Check additionally requires epoch match. Constructors and tests updated; rolling-upgrade compatibility test added.
MoveMaintainerOperator state machine refactor
coordinator/operator/operator_move.go, coordinator/operator/operator_move_test.go
Replaces boolean flags with moveMaintainerState enum. Captures originMaintainerEpoch. Rewrites Check/Schedule/OnNodeRemove via state transitions and epoch gating. Introduces finishAsAbsentLocked to prevent dual live ownership.
OperatorController epoch bump coordination
coordinator/operator/operator_controller.go, coordinator/operator/operator_controller_test.go
Adds pdClient and epochBumping map to Controller; NewOperatorController gains pdClient parameter. AddOperator performs async BumpChangefeedEpoch outside mutex with pre/post slot checks. Stop methods route through stopChangefeed with maintainerEpoch including move origin epoch.
Coordinator bootstrap and heartbeat epoch filtering
coordinator/controller.go
handleBootstrapResponses aggregates multi-maintainer reports per changefeed. finishBootstrap uses selectBootstrapMaintainer to pick epoch-matched owner and stopStaleBootstrapMaintainers to fence stale owners. handleSingleMaintainerStatus drops mismatched epochs.
Coordinator resume and warning-state epoch bump
coordinator/coordinator.go
ResumeChangefeed generates a new owner epoch and applies backend-returned info. updateChangefeedEpoch accepts EpochBumpOptions and returns error. StateWarning in handleStateChange bumps epoch with UpdateError then stops with prior epoch.
Tests: controller, coordinator, and scheduler wiring
coordinator/controller_test.go, coordinator/coordinator_test.go, coordinator/controller_drain_test.go, coordinator/create_changefeed_gc_test.go, coordinator/scheduler/balance_test.go, coordinator/scheduler/basic_test.go, coordinator/scheduler/drain_test.go
Adds epoch-correctness tests for stale drop, bootstrap stale fencing, resume, and warning-state bump. Wires mockBumpChangefeedEpoch into scheduling tests. Updates all NewOperatorController call sites with the new nil pdClient argument.

Sequence Diagram(s)

sequenceDiagram
  participant Coordinator
  participant OperatorController
  participant Backend as EtcdBackend
  participant etcd

  rect rgba(70, 130, 180, 0.5)
    Note over Coordinator,etcd: AddOperator with epoch bump
    Coordinator->>OperatorController: AddOperator(addMaintainerOp)
    OperatorController->>OperatorController: precheck slot free, reserve epochBumping[id]
    OperatorController->>Backend: BumpChangefeedEpoch(id, candidateEpoch, opts)
    loop CAS retry
      Backend->>etcd: GET info + status
      Backend->>Backend: AdvanceChangefeedEpoch(candidate, current)
      Backend->>etcd: CAS PUT info [+ status]
    end
    Backend-->>OperatorController: updated ChangeFeedInfo (bumped epoch)
    OperatorController->>OperatorController: recheck slot, push operator, clear epochBumping[id]
  end

  rect rgba(60, 179, 113, 0.5)
    Note over Coordinator,OperatorController: Heartbeat epoch admission
    Coordinator->>Coordinator: handleSingleMaintainerStatus(status)
    Coordinator->>Coordinator: MaintainerEpochMatches(status.MaintainerEpoch, cf.Epoch)?
    alt mismatch
      Coordinator->>Coordinator: drop status with warning
    else match
      Coordinator->>OperatorController: UpdateOperatorStatus(status)
    end
  end
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • pingcap/ticdc#4997: Modifies coordinator/coordinator.go's handleStateChange flow around warning-state handling and persistence, directly adjacent to the warning-state epoch-bump path added in this PR.

Suggested reviewers

  • wk989898
  • lidezhu

Poem

🐇 Hoppity-hop through the epoch lane,
Each maintainer sealed with a number's reign,
Stale owners fenced with a CAS-retry spin,
No ghost maintainers shall sneak back in!
The state machine ticks — origin, then target's done,
New epochs bloom and old epochs run. 🌸

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 10.75% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main change: persisting maintainer epochs before ownership changes, which is the core objective of this PR.
Description check ✅ Passed The PR description comprehensively addresses all required template sections: issue reference (ref #5083), detailed problem statement, comprehensive list of changes, test coverage (unit tests), compatibility considerations, and release notes.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.12.2)

Command failed


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jun 18, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a fencing mechanism using maintainer epochs to prevent stale maintainers from making decisions or blocking actions during coordinator failover or rolling upgrades. It replaces the resume changefeed operation with an atomic epoch-bumping mechanism in the backend, updates the coordinator and operators to validate epochs, and adds extensive tests. The review feedback highlights several critical reliability issues, including potential nil pointer dereference panics in NewAddMaintainerMessage and the Check methods of the stop and add operators when handling nil statuses, as well as an out-of-sync state issue in AddOperator if the operator is rejected after an epoch has already been bumped in etcd.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +153 to +158
if !oc.recheckAddOperatorLocked(op, cf) {
return false
}
cf.SetInfo(info)
oc.pushOperator(op)
return true

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If recheckAddOperatorLocked fails (e.g., because another operator was added concurrently), AddOperator returns false without updating the in-memory changefeed info. However, the epoch has already been successfully bumped in etcd via bumpChangefeedEpoch. This leaves the in-memory changefeed epoch out of sync with etcd, which can cause subsequent coordinator actions or messages to use a stale epoch and get rejected. We should update the in-memory info if the changefeed still exists and is the same instance.

	current := oc.changefeedDB.GetByID(op.ID())
	if current == cf {
		cf.SetInfo(info)
	}

	if !oc.recheckAddOperatorLocked(op, cf) {
		return false
	}
	oc.pushOperator(op)
	return true

Comment on lines 275 to 284
return messaging.NewSingleTargetMessage(server,
messaging.MaintainerManagerTopic,
&heartbeatpb.AddMaintainerRequest{
Id: c.ID.ToPB(),
CheckpointTs: c.GetStatus().CheckpointTs,
Config: c.configBytes,
Config: []byte(configData),
IsNewChangefeed: c.isNew,
KeyspaceId: c.GetKeyspaceID(),
KeyspaceId: info.KeyspaceID,
MaintainerEpoch: info.Epoch,
})

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

c.GetStatus() can return nil (as explicitly handled in GetStatusForResume). Calling c.GetStatus().CheckpointTs directly without a nil check can cause a nil pointer dereference panic. We should safely fall back to c.GetLastSavedCheckPointTs() if the status is nil.

	checkpointTs := c.GetLastSavedCheckPointTs()
	if status := c.GetStatus(); status != nil {
		checkpointTs = status.CheckpointTs
	}
	return messaging.NewSingleTargetMessage(server,
		messaging.MaintainerManagerTopic,
		&heartbeatpb.AddMaintainerRequest{
			Id:              c.ID.ToPB(),
			CheckpointTs:    checkpointTs,
			Config:          []byte(configData),
			IsNewChangefeed: c.isNew,
			KeyspaceId:      info.KeyspaceID,
			MaintainerEpoch: info.Epoch,
		})

Comment on lines +63 to 72
func (m *StopChangefeedOperator) Check(from node.ID, status *heartbeatpb.MaintainerStatus) {
if !m.finished.Load() &&
from == m.nodeID &&
common.MaintainerEpochMatches(status.MaintainerEpoch, m.maintainerEpoch) &&
status.State != heartbeatpb.ComponentState_Working {
log.Info("maintainer report non-working status",
zap.Stringer("maintainer", m.cfID))
m.finished.Store(true)
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Check method does not check if status is nil before accessing status.MaintainerEpoch and status.State. This can lead to a nil pointer dereference panic. We should add a nil check at the beginning of the method.

func (m *StopChangefeedOperator) Check(from node.ID, status *heartbeatpb.MaintainerStatus) {
	if status == nil {
		return
	}
	if !m.finished.Load() &&
		from == m.nodeID &&
		common.MaintainerEpochMatches(status.MaintainerEpoch, m.maintainerEpoch) &&
		status.State != heartbeatpb.ComponentState_Working {
		log.Info("maintainer report non-working status",
			zap.Stringer("maintainer", m.cfID))
		m.finished.Store(true)
	}
}

Comment on lines 74 to 76
if !m.finished.Load() && from == m.dest &&
common.MaintainerEpochMatches(status.MaintainerEpoch, m.cf.GetInfo().Epoch) &&
status.State == heartbeatpb.ComponentState_Working &&

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Check method does not check if status is nil before accessing status.MaintainerEpoch and status.State. This can lead to a nil pointer dereference panic. We should add a nil check at the beginning of the method.

	if status == nil {
		return
	}
	if !m.finished.Load() && from == m.dest &&
		common.MaintainerEpochMatches(status.MaintainerEpoch, m.cf.GetInfo().Epoch) &&
		status.State == heartbeatpb.ComponentState_Working &&

@hongyunyan hongyunyan marked this pull request as ready for review June 18, 2026 02:51
@ti-chi-bot ti-chi-bot Bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jun 18, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (5)
coordinator/operator/operator_stop_test.go (1)

41-82: ⚡ Quick win

Add a focused test for stop-operator epoch gating.

These updates only adapt constructor callsites. Please add a direct Check test that confirms mismatched MaintainerEpoch does not finish, and matched epoch does.

As per coding guidelines, "Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/operator/operator_stop_test.go` around lines 41 - 82, Add a new
focused test function (e.g., TestStopChangefeedOperator_Check) that directly
tests the epoch gating behavior of the StopChangefeedOperator. This test should
create instances of StopChangefeedOperator with different MaintainerEpoch
values, invoke the Check method, and verify two scenarios: first, that when the
operator's MaintainerEpoch does not match the changefeed's epoch, the finished
flag remains false, and second, that when the epochs match, the Check method
properly completes the operation and sets the finished flag to true. This
ensures deterministic testing of the core epoch gating logic as referenced in
the testing guidelines.

Source: Coding guidelines

coordinator/operator/operator_controller_test.go (1)

91-91: ⚡ Quick win

Rename the new tests to avoid underscores.

These new function names conflict with the repository’s Go naming rule; e.g. use TestControllerStopChangefeedWithMaintainerEpoch.

Proposed rename diff
-func TestController_StopChangefeedWithMaintainerEpoch(t *testing.T) {
+func TestControllerStopChangefeedWithMaintainerEpoch(t *testing.T) {
@@
-func TestController_StopRemoteMaintainerWithMaintainerEpoch(t *testing.T) {
+func TestControllerStopRemoteMaintainerWithMaintainerEpoch(t *testing.T) {
@@
-func TestController_AddOperatorBumpsAndPersistsOwnershipEpoch(t *testing.T) {
+func TestControllerAddOperatorBumpsAndPersistsOwnershipEpoch(t *testing.T) {
@@
-func TestController_AddOperatorRejectsConcurrentEpochBump(t *testing.T) {
+func TestControllerAddOperatorRejectsConcurrentEpochBump(t *testing.T) {
@@
-func TestController_StopChangefeedDuringMoveUsesOriginEpoch(t *testing.T) {
+func TestControllerStopChangefeedDuringMoveUsesOriginEpoch(t *testing.T) {
@@
-func TestController_AddOperatorEpochBumpDoesNotBlockStatusAndStop(t *testing.T) {
+func TestControllerAddOperatorEpochBumpDoesNotBlockStatusAndStop(t *testing.T) {

As per coding guidelines, "Functions should use camelCase naming and do not include underscores (e.g., getPartitionNum, not get_partition_num)."

Also applies to: 111-111, 226-226, 309-309, 367-367, 445-445

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/operator/operator_controller_test.go` at line 91, The test
function names in the file use underscores which violates the repository's Go
naming convention requiring camelCase without underscores. Rename all test
functions to remove underscores: change
TestController_StopChangefeedWithMaintainerEpoch to
TestControllerStopChangefeedWithMaintainerEpoch, and apply the same camelCase
formatting to all other affected test functions at the specified locations.
Replace all underscores between words with direct concatenation while
maintaining proper camelCase capitalization.

Source: Coding guidelines

coordinator/controller_test.go (2)

744-751: ⚡ Quick win

Assert resume clears the persisted runtime error.

ResumeChangefeed sets UpdateError: true with a nil error; add that assertion to the shared helper so the resume tests catch regressions that leave stale errors persisted.

Proposed test assertion
 			require.NotNil(t, options.State)
 			require.Equal(t, config.StateNormal, *options.State)
 			require.True(t, options.UpdateStatus)
+			require.True(t, options.UpdateError)
+			require.Nil(t, options.Error)
 			require.Equal(t, checkpointTs, options.CheckpointTs)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/controller_test.go` around lines 744 - 751, The test for
ResumeChangefeed is missing an assertion to verify that resuming a changefeed
clears any persisted runtime errors. In the DoAndReturn closure of the
BumpChangefeedEpoch mock expectation, add two assertions after the existing
require statements: one to verify that options.UpdateError is true, and another
to verify that options.Error is nil. This ensures that the resume operation
properly clears stale errors that may have been persisted from previous
failures.

36-36: ⚡ Quick win

Use the default scheduler import name.

There is no conflicting scheduler import in this file, so the alias is unnecessary.

As per coding guidelines, **/*.go: “Do not rename imports unless required to resolve a package name conflict or to follow an existing local convention.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/controller_test.go` at line 36, The import statement for
github.com/pingcap/ticdc/pkg/scheduler is using an unnecessary alias
pkgscheduler when there are no conflicting scheduler imports in this file.
Remove the pkgscheduler alias prefix from the import and use the default
scheduler import name instead, following the coding guideline that aliases
should only be used to resolve package name conflicts or follow existing local
conventions.

Source: Coding guidelines

coordinator/coordinator_test.go (1)

291-298: ⚡ Quick win

Propagate the add-request epoch in the mock maintainer.

The mock reports epoch 0, so coordinator scheduling tests pass via compatibility matching and would miss a regression where AddMaintainerRequest.MaintainerEpoch is wrong. Copy req.MaintainerEpoch into the created status.

Proposed mock update
 			cf = &heartbeatpb.MaintainerStatus{
 				ChangefeedID: req.GetId(),
 				FeedState:    "normal",
 				State:        heartbeatpb.ComponentState_Working,
 				CheckpointTs: req.CheckpointTs,
+				MaintainerEpoch: req.MaintainerEpoch,
 				// In these coordinator tests, the mock maintainer is considered immediately bootstrapped.
 				BootstrapDone: true,
 			}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/coordinator_test.go` around lines 291 - 298, The mock maintainer
status in the heartbeatpb.MaintainerStatus struct creation is not propagating
the epoch from the request, which allows tests to pass even if MaintainerEpoch
is incorrectly set. Add a field assignment to the MaintainerStatus struct that
copies the epoch value from req.MaintainerEpoch to ensure the mock properly
reflects the request epoch and prevents regressions in epoch handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@coordinator/controller.go`:
- Around line 1190-1194: In the code block where BumpChangefeedEpoch is called,
after checking for the error returned by c.backend.BumpChangefeedEpoch, add a
nil check for the info result before calling cf.SetInfo(info). If info is nil,
return an error using errors.Trace or errors.New to prevent nil values from
being set, as this could cause nil dereferences later when cf.GetInfo() is
called in heartbeat or state operations. This guards against the case where
BumpChangefeedEpoch returns a nil result even though no error was reported,
mirroring the same validation pattern that ResumeChangefeed already implements.
- Around line 616-623: The RemoveMaintainerMessage command sent via
c.messageCenter.SendCommand is currently discarding any error returned (using
underscore assignment), which means stale maintainers will not be cleaned up if
the command fails. Replace the underscore assignment with proper error handling
by capturing the error returned from SendCommand, logging the failure with
appropriate context, and routing the failure into a retry mechanism for cleanup
instead of silently ignoring it. Apply this same fix to all occurrences at the
mentioned line ranges (760-767 and 847-854) where similar
RemoveMaintainerMessage commands are being sent.

In `@coordinator/operator/operator_stop.go`:
- Around line 63-67: The Check method in the StopChangefeedOperator class
dereferences the status parameter at line 66 when accessing
status.MaintainerEpoch without first verifying that status is not nil. Add a nil
check for the status parameter at the beginning of the Check method before any
dereferencing occurs, and early return if status is nil, following the same
pattern used in the Check methods of operator_add.go and operator_move.go to
maintain consistency across similar operator implementations.

---

Nitpick comments:
In `@coordinator/controller_test.go`:
- Around line 744-751: The test for ResumeChangefeed is missing an assertion to
verify that resuming a changefeed clears any persisted runtime errors. In the
DoAndReturn closure of the BumpChangefeedEpoch mock expectation, add two
assertions after the existing require statements: one to verify that
options.UpdateError is true, and another to verify that options.Error is nil.
This ensures that the resume operation properly clears stale errors that may
have been persisted from previous failures.
- Line 36: The import statement for github.com/pingcap/ticdc/pkg/scheduler is
using an unnecessary alias pkgscheduler when there are no conflicting scheduler
imports in this file. Remove the pkgscheduler alias prefix from the import and
use the default scheduler import name instead, following the coding guideline
that aliases should only be used to resolve package name conflicts or follow
existing local conventions.

In `@coordinator/coordinator_test.go`:
- Around line 291-298: The mock maintainer status in the
heartbeatpb.MaintainerStatus struct creation is not propagating the epoch from
the request, which allows tests to pass even if MaintainerEpoch is incorrectly
set. Add a field assignment to the MaintainerStatus struct that copies the epoch
value from req.MaintainerEpoch to ensure the mock properly reflects the request
epoch and prevents regressions in epoch handling.

In `@coordinator/operator/operator_controller_test.go`:
- Line 91: The test function names in the file use underscores which violates
the repository's Go naming convention requiring camelCase without underscores.
Rename all test functions to remove underscores: change
TestController_StopChangefeedWithMaintainerEpoch to
TestControllerStopChangefeedWithMaintainerEpoch, and apply the same camelCase
formatting to all other affected test functions at the specified locations.
Replace all underscores between words with direct concatenation while
maintaining proper camelCase capitalization.

In `@coordinator/operator/operator_stop_test.go`:
- Around line 41-82: Add a new focused test function (e.g.,
TestStopChangefeedOperator_Check) that directly tests the epoch gating behavior
of the StopChangefeedOperator. This test should create instances of
StopChangefeedOperator with different MaintainerEpoch values, invoke the Check
method, and verify two scenarios: first, that when the operator's
MaintainerEpoch does not match the changefeed's epoch, the finished flag remains
false, and second, that when the epochs match, the Check method properly
completes the operation and sets the finished flag to true. This ensures
deterministic testing of the core epoch gating logic as referenced in the
testing guidelines.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: aab53221-9828-4d4f-b61b-96ea9539967f

📥 Commits

Reviewing files that changed from the base of the PR and between 05aa985 and 57df930.

⛔ Files ignored due to path filters (1)
  • heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (28)
  • coordinator/changefeed/changefeed.go
  • coordinator/changefeed/changefeed_db_backend.go
  • coordinator/changefeed/changefeed_test.go
  • coordinator/changefeed/etcd_backend.go
  • coordinator/changefeed/etcd_backend_test.go
  • coordinator/changefeed/mock/changefeed_db_backend.go
  • coordinator/controller.go
  • coordinator/controller_drain_test.go
  • coordinator/controller_test.go
  • coordinator/coordinator.go
  • coordinator/coordinator_test.go
  • coordinator/create_changefeed_gc_test.go
  • coordinator/operator/operator_add.go
  • coordinator/operator/operator_add_test.go
  • coordinator/operator/operator_controller.go
  • coordinator/operator/operator_controller_test.go
  • coordinator/operator/operator_move.go
  • coordinator/operator/operator_move_test.go
  • coordinator/operator/operator_stop.go
  • coordinator/operator/operator_stop_test.go
  • coordinator/scheduler/balance_test.go
  • coordinator/scheduler/basic_test.go
  • coordinator/scheduler/drain_test.go
  • heartbeatpb/heartbeat.proto
  • pkg/common/format.go
  • pkg/common/maintainer_epoch.go
  • pkg/pdutil/utils.go
  • pkg/pdutil/utils_test.go

Comment thread coordinator/controller.go
Comment on lines +616 to +623
_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(
common.DefaultKeyspaceID,
cfID,
from,
true,
true,
status.MaintainerEpoch,
))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Retry or at least surface failed stale-maintainer removals.

These direct remove commands are the only cleanup path in the no-local-changefeed and operator-slot-occupied branches. If SendCommand fails, the stale maintainer can keep heartbeating while future reports are only ignored; log the failure and route it into a retried cleanup path instead of discarding the error.

Also applies to: 760-767, 847-854

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/controller.go` around lines 616 - 623, The
RemoveMaintainerMessage command sent via c.messageCenter.SendCommand is
currently discarding any error returned (using underscore assignment), which
means stale maintainers will not be cleaned up if the command fails. Replace the
underscore assignment with proper error handling by capturing the error returned
from SendCommand, logging the failure with appropriate context, and routing the
failure into a retry mechanism for cleanup instead of silently ignoring it.
Apply this same fix to all occurrences at the mentioned line ranges (760-767 and
847-854) where similar RemoveMaintainerMessage commands are being sent.

Comment thread coordinator/controller.go
Comment on lines +63 to +67
func (m *StopChangefeedOperator) Check(from node.ID, status *heartbeatpb.MaintainerStatus) {
if !m.finished.Load() &&
from == m.nodeID &&
common.MaintainerEpochMatches(status.MaintainerEpoch, m.maintainerEpoch) &&
status.State != heartbeatpb.ComponentState_Working {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard against nil maintainer status in Check.

Line 66 dereferences status without a nil check. coordinator/operator/operator_add.go (Line 68) and coordinator/operator/operator_move.go (Line 76) both early-return on nil, so this path can panic if the same callback contract passes a nil status.

💡 Suggested fix
 func (m *StopChangefeedOperator) Check(from node.ID, status *heartbeatpb.MaintainerStatus) {
+	if status == nil {
+		return
+	}
 	if !m.finished.Load() &&
 		from == m.nodeID &&
 		common.MaintainerEpochMatches(status.MaintainerEpoch, m.maintainerEpoch) &&
 		status.State != heartbeatpb.ComponentState_Working {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/operator/operator_stop.go` around lines 63 - 67, The Check method
in the StopChangefeedOperator class dereferences the status parameter at line 66
when accessing status.MaintainerEpoch without first verifying that status is not
nil. Add a nil check for the status parameter at the beginning of the Check
method before any dereferencing occurs, and early return if status is nil,
following the same pattern used in the Check methods of operator_add.go and
operator_move.go to maintain consistency across similar operator
implementations.

Comment thread pkg/pdutil/utils.go Outdated
}

// AdvanceChangefeedEpoch returns max(candidate, current+1).
func AdvanceChangefeedEpoch(candidate, current uint64) (uint64, error) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changefeed epoch should not be a part of the pd util?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I agree that the non-PD part should not live in pdutil. I moved AdvanceChangefeedEpoch to pkg/common because it only compares persisted/candidate epochs and has no PD dependency. GenerateChangefeedEpoch stays in pdutil since it is specifically backed by PD TSO generation.

Comment thread pkg/pdutil/utils.go Outdated
if candidate > current {
return candidate, nil
}
if current == ^uint64(0) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this really happen ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not happen in normal operation because the candidate epoch is based on PD TSO/local timestamp. I kept the guard as a defensive invariant check because wrapping MaxUint64 back to 0 would make an old owner look newer or compatible again. The updated comment now calls this out as defensive rather than expected behavior.

Comment thread coordinator/changefeed/changefeed.go Outdated
FeedState: status.FeedState,
State: status.State,
MaintainerEpoch: status.MaintainerEpoch,
// Old errors are meaningless for resume and can only block the resumed task.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment lack of context and misleading.

Comment thread coordinator/changefeed/changefeed.go Outdated

func (c *Changefeed) NewAddMaintainerMessage(server node.ID) *messaging.TargetMessage {
info := c.GetInfo()
if info == nil {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

info is nil should be checked when new the changefeed ?

Comment thread coordinator/changefeed/changefeed.go
// SetChangefeedProgress persists the operation progress status to db for a changefeed
SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error
// ResumeChangefeed persists the resumed status to db for a changefeed and returns the resumed info.
ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this method ?

return nil
}

// BumpChangefeedEpoch atomically persists a strictly newer ownership epoch.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks the resume chanegefeed method is replaced by this one? Looks not ideal, bump changefeed epoch should be a internal operation of the maintainer.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that the resume path should not be expressed as a raw bump at the controller call site. I restored Backend.ResumeChangefeed and made it own the resume-specific transition: StateNormal, ProgressNone, checkpoint update, error clear, and epoch bump in one backend operation. I kept BumpChangefeedEpoch as a lower-level backend boundary because other coordinator-driven ownership changes, such as add/move/retry, also must persist a fresh epoch before creating or moving a maintainer owner. So resume is semantic again, while the bump helper remains the shared metadata fence for non-resume owner-change paths.

@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Jun 22, 2026
@ti-chi-bot ti-chi-bot Bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Jun 22, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 22, 2026

Copy link
Copy Markdown

[LGTM Timeline notifier]

Timeline:

  • 2026-06-22 07:14:48.477122031 +0000 UTC m=+1980989.547439411: ☑️ agreed by 3AceShowHand.
  • 2026-06-22 13:07:54.105723255 +0000 UTC m=+2002175.176040635: ☑️ agreed by asddongmen.

@ti-chi-bot

ti-chi-bot Bot commented Jun 23, 2026

Copy link
Copy Markdown

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, asddongmen, lidezhu

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:
  • OWNERS [3AceShowHand,asddongmen,lidezhu]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@hongyunyan

Copy link
Copy Markdown
Collaborator Author

/retest

@hongyunyan

Copy link
Copy Markdown
Collaborator Author

/test all

@ti-chi-bot ti-chi-bot Bot merged commit 0eec971 into pingcap:master Jun 23, 2026
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved lgtm release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants