Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \
prepare_test_binaries \
unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar \
integration_test_weekly_rand_ddl_mysql \
generate-next-gen-grafana check-next-gen-grafana


Expand Down Expand Up @@ -258,6 +259,9 @@ integration_test_storage: check_third_party_binary
integration_test_pulsar: check_third_party_binary
tests/integration_tests/run.sh pulsar "$(CASE)" "$(START_AT)"

integration_test_weekly_rand_ddl_mysql: check_third_party_binary
tests/integration_tests/run_weekly_rand_ddl_it_in_ci.sh mysql

unit_test: check_failpoint_ctl generate-protobuf
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
Expand Down
37 changes: 35 additions & 2 deletions maintainer/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/maintainer/operator"
"github.com/pingcap/ticdc/maintainer/replica"
"github.com/pingcap/ticdc/maintainer/span"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/messaging"
Expand Down Expand Up @@ -252,7 +253,7 @@ func (b *Barrier) Resend() []*messaging.TargetMessage {
eventList := make([]*BarrierEvent, 0)
b.blockedEvents.Range(func(key eventKey, barrierEvent *BarrierEvent) bool {
// todo: we can limit the number of messages to send in one round here
msgs = append(msgs, barrierEvent.resend(b.mode)...)
msgs = append(msgs, barrierEvent.resendWithSchedule(b.mode, b.tryScheduleEvent)...)

eventList = append(eventList, barrierEvent)
return true
Expand Down Expand Up @@ -308,7 +309,7 @@ func (b *Barrier) handleOneStatus(changefeedID *heartbeatpb.ChangefeedID, status
Mode: status.Mode,
})
if status.State != nil {
span.UpdateBlockState(*status.State)
updateSpanBlockState(span, status.State)
}
}
if status.State.Stage == heartbeatpb.BlockStage_DONE {
Expand All @@ -317,6 +318,38 @@ func (b *Barrier) handleOneStatus(changefeedID *heartbeatpb.ChangefeedID, status
return b.handleBlockState(cfID, dispatcherID, status)
}

// updateSpanBlockState records newState on span unless the span already holds
// a block state that orders strictly after it according to compareBlockState.
//
// A span with no recorded block state always accepts newState. When the
// recorded state is newer, newState is treated as stale: it is dropped and a
// debug log describing both states is emitted.
//
// newState must be non-nil; it is dereferenced unconditionally.
func updateSpanBlockState(span *replica.SpanReplication, newState *heartbeatpb.State) {
	current := span.GetBlockState()
	if current == nil || compareBlockState(current, newState) <= 0 {
		// Nothing recorded yet, or the incoming state is not older: accept it.
		span.UpdateBlockState(*newState)
		return
	}

	log.Debug("ignore stale block state",
		zap.String("dispatcher", span.ID.String()),
		zap.Uint64("oldBlockTs", current.BlockTs),
		zap.Bool("oldIsSyncPoint", current.IsSyncPoint),
		zap.String("oldStage", current.Stage.String()),
		zap.Uint64("newBlockTs", newState.BlockTs),
		zap.Bool("newIsSyncPoint", newState.IsSyncPoint),
		zap.String("newStage", newState.Stage.String()))
}

// compareBlockState orders two block states and returns a negative value when
// a sorts before b, zero when they are equal, and a positive value when a
// sorts after b.
//
// The comparison is lexicographic over three keys:
//  1. BlockTs, ascending;
//  2. IsSyncPoint, where a non-syncpoint state sorts before a syncpoint state
//     at the same BlockTs;
//  3. Stage, compared by its integer value.
//
// Both arguments must be non-nil.
func compareBlockState(a, b *heartbeatpb.State) int {
	switch {
	case a.BlockTs < b.BlockTs:
		return -1
	case a.BlockTs > b.BlockTs:
		return 1
	case a.IsSyncPoint == b.IsSyncPoint:
		// Same timestamp and same kind: the stage decides.
		return int(a.Stage) - int(b.Stage)
	case b.IsSyncPoint:
		// The flags differ and b is the syncpoint, so a is not.
		return -1
	default:
		return 1
	}
}

func (b *Barrier) handleEventDone(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID, status *heartbeatpb.TableSpanBlockStatus) *BarrierEvent {
key := getEventKey(status.State.BlockTs, status.State.IsSyncPoint)
event, ok := b.blockedEvents.Get(key)
Expand Down
207 changes: 187 additions & 20 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,8 @@ func (be *BarrierEvent) onAllDispatcherReportedBlockEvent(dispatcherID common.Di
}

// Once the event enters selected state, we start a new reporting phase that
// tracks completion after write/pass rather than the initial WAITING
// coverage. Reset both structures so DONE reports are measured from scratch.
be.rangeChecker.Reset()
be.reportedDispatchers = make(map[common.DispatcherID]struct{})

// tracks completion after write/pass rather than the initial WAITING coverage.
be.resetProgressAfterSelection()
be.selected.Store(true)
be.writerDispatcher = dispatcher
be.lastResendTime = time.Now()
Expand Down Expand Up @@ -325,6 +322,152 @@ func (be *BarrierEvent) addDispatchersToRangeChecker() {
}
}

// ensureRangeChecker lazily creates be.rangeChecker for the event's blocked
// dispatchers. It does nothing when a checker already exists or when
// be.blockedDispatchers is nil.
//
// For InfluenceType_Normal the checker is built here: a table-span range
// checker when dynamic split is enabled, otherwise a table-count checker.
// For InfluenceType_DB and InfluenceType_All the work is delegated to
// createRangeCheckerForTypeDB / createRangeCheckerForTypeAll (defined
// elsewhere; presumably they assign be.rangeChecker).
func (be *BarrierEvent) ensureRangeChecker() {
	if be.rangeChecker != nil {
		return
	}
	if be.blockedDispatchers == nil {
		return
	}

	switch be.blockedDispatchers.InfluenceType {
	case heartbeatpb.InfluenceType_All:
		be.createRangeCheckerForTypeAll()
	case heartbeatpb.InfluenceType_DB:
		be.createRangeCheckerForTypeDB()
	case heartbeatpb.InfluenceType_Normal:
		if !be.dynamicSplitEnabled {
			be.rangeChecker = range_checker.NewTableCountChecker(be.blockedDispatchers.TableIDs)
			return
		}
		be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), be.blockedDispatchers.TableIDs)
	}
}

// getTasksByBlockedTableID returns the replications that correspond to a
// table ID listed in a block event.
//
// The pseudo table ID common.DDLSpanTableID is resolved to the DDL dispatcher's
// replication: a one-element slice when that task exists, nil otherwise. Every
// other table ID is looked up through the span controller by table ID.
func (be *BarrierEvent) getTasksByBlockedTableID(tableID int64) []*replica.SpanReplication {
	if tableID == common.DDLSpanTableID {
		ddlTask := be.spanController.GetTaskByID(be.spanController.GetDDLDispatcherID())
		if ddlTask != nil {
			return []*replica.SpanReplication{ddlTask}
		}
		return nil
	}
	return be.spanController.GetTasksByTableID(tableID)
}

// relatedReplications returns the replications covered by this event's
// blocked dispatchers, or nil when be.blockedDispatchers is nil or has an
// influence type not handled below.
//
//   - InfluenceType_Normal: the tasks of every listed table ID, resolved via
//     getTasksByBlockedTableID.
//   - InfluenceType_DB: the tasks of the blocked schema, plus the DDL
//     dispatcher's replication when it exists.
//   - InfluenceType_All: every task known to the span controller.
func (be *BarrierEvent) relatedReplications() []*replica.SpanReplication {
	blocked := be.blockedDispatchers
	if blocked == nil {
		return nil
	}

	switch blocked.InfluenceType {
	case heartbeatpb.InfluenceType_All:
		return be.spanController.GetAllTasks()
	case heartbeatpb.InfluenceType_DB:
		related := be.spanController.GetTasksBySchemaID(blocked.SchemaID)
		ddlTask := be.spanController.GetTaskByID(be.spanController.GetDDLDispatcherID())
		if ddlTask == nil {
			return related
		}
		return append(related, ddlTask)
	case heartbeatpb.InfluenceType_Normal:
		related := make([]*replica.SpanReplication, 0, len(blocked.TableIDs))
		for _, id := range blocked.TableIDs {
			related = append(related, be.getTasksByBlockedTableID(id)...)
		}
		return related
	}
	return nil
}

// addAdvancedReplicationsToRangeChecker marks every related replication that
// forwardBarrierEvent reports as already past this event as "reported": its
// dispatcher ID is added to be.reportedDispatchers and its span is added to
// be.rangeChecker.
//
// It is a no-op when be.rangeChecker is nil. Nil replications are skipped.
// be.reportedDispatchers must already be a non-nil map.
func (be *BarrierEvent) addAdvancedReplicationsToRangeChecker() {
	if be.rangeChecker == nil {
		return
	}

	for _, task := range be.relatedReplications() {
		if task != nil && forwardBarrierEvent(task, be) {
			be.reportedDispatchers[task.ID] = struct{}{}
			be.rangeChecker.AddSubRange(task.Span.TableID, task.Span.StartKey, task.Span.EndKey)
		}
	}
}

// refreshSelectedProgress re-derives progress for an event from the current
// replication state and reports whether the writer dispatcher has just been
// observed past the event.
//
// It first makes sure a range checker exists and folds every already-forwarded
// replication into it. It then returns:
//   - false if the writer was already marked as advanced, if the writer task
//     cannot be found, or if the writer has not passed the event yet;
//   - true if the writer has passed the event. When the event does not need
//     scheduling, the writer is additionally marked as advanced and
//     lastResendTime is moved 20 seconds into the past. When it does need
//     scheduling, no state is changed and the caller decides what to do.
//
// NOTE(review): backdating lastResendTime presumably lets the next resend
// round act immediately instead of being throttled — confirm against the
// resend logic.
func (be *BarrierEvent) refreshSelectedProgress() bool {
	be.ensureRangeChecker()
	be.addAdvancedReplicationsToRangeChecker()

	if be.writerDispatcherAdvanced {
		return false
	}

	writerTask := be.spanController.GetTaskByID(be.writerDispatcher)
	writerForwarded := writerTask != nil && forwardBarrierEvent(writerTask, be)
	if !writerForwarded {
		return false
	}

	if !be.needSchedule {
		be.writerDispatcherAdvanced = true
		be.lastResendTime = time.Now().Add(-20 * time.Second)
	}
	return true
}

// resetProgressAfterSelection starts a fresh reporting phase for the event.
//
// It makes sure a range checker exists, resets it if there is one, replaces
// be.reportedDispatchers with an empty map, and then re-adds every replication
// that has already moved past the event so those do not have to report again.
func (be *BarrierEvent) resetProgressAfterSelection() {
	be.ensureRangeChecker()
	if checker := be.rangeChecker; checker != nil {
		checker.Reset()
	}
	be.reportedDispatchers = map[common.DispatcherID]struct{}{}
	be.addAdvancedReplicationsToRangeChecker()
}

// selectByForwardedDispatcher moves the event into the selected state without
// waiting for a writer dispatcher to report, for use when the caller has
// already established that the event can advance.
//
// It resets the reporting progress, marks the event as selected, marks the
// writer as already advanced, and clears passActionSent.
func (be *BarrierEvent) selectByForwardedDispatcher() {
// Progress is rebuilt before the selected flag is published.
be.resetProgressAfterSelection()
be.selected.Store(true)
be.writerDispatcherAdvanced = true
// NOTE(review): presumably cleared so the pass action is (re)sent for this
// selection — confirm against sendPassAction's callers.
be.passActionSent = false
}

// markMissingDroppedTablesDone marks as done every table that this event both
// blocks and drops but that no longer has any active dispatcher, and reports
// whether at least one table was marked.
//
// It only applies when both the blocked and the dropped dispatcher sets are
// present and of InfluenceType_Normal, and when a range checker is available;
// otherwise it returns false without side effects beyond creating the range
// checker.
//
// A dropped table is skipped when it is the DDL span pseudo table, when it is
// not among the blocked tables, when it still has tasks, or when the DDL
// dispatcher's task cannot be found.
func (be *BarrierEvent) markMissingDroppedTablesDone() bool {
	if be.blockedDispatchers == nil || be.blockedDispatchers.InfluenceType != heartbeatpb.InfluenceType_Normal {
		return false
	}
	if be.dropDispatchers == nil || be.dropDispatchers.InfluenceType != heartbeatpb.InfluenceType_Normal {
		return false
	}

	be.ensureRangeChecker()
	if be.rangeChecker == nil {
		return false
	}

	anyMarked := false
	for _, tableID := range be.dropDispatchers.TableIDs {
		// Cases are evaluated top to bottom and stop at the first match, which
		// keeps the same short-circuit order as a chain of guard clauses.
		switch {
		case tableID == common.DDLSpanTableID:
			continue
		case !containsTableID(be.blockedDispatchers.TableIDs, tableID):
			continue
		case len(be.spanController.GetTasksByTableID(tableID)) != 0:
			continue
		case be.spanController.GetTaskByID(be.spanController.GetDDLDispatcherID()) == nil:
			continue
		}

		be.markTableDone(tableID)
		anyMarked = true
		log.Info("blocked table has no active dispatcher, mark it done",
			zap.String("changefeed", be.cfID.Name()),
			zap.Uint64("commitTs", be.commitTs),
			zap.Int64("tableID", tableID),
			zap.Int64("mode", be.mode))
	}
	return anyMarked
}

// containsTableID reports whether target occurs in tableIDs.
// A nil or empty slice never contains anything.
func containsTableID(tableIDs []int64, target int64) bool {
	for i := 0; i < len(tableIDs); i++ {
		if tableIDs[i] == target {
			return true
		}
	}
	return false
}

func (be *BarrierEvent) markDispatcherEventDone(dispatcherID common.DispatcherID) {
if be.selected.Load() {
// After selection, every accepted status means the chosen write/pass path
Expand Down Expand Up @@ -480,7 +623,7 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage {
}
case heartbeatpb.InfluenceType_Normal:
for _, tableID := range be.blockedDispatchers.TableIDs {
spans := be.spanController.GetTasksByTableID(tableID)
spans := be.getTasksByBlockedTableID(tableID)
if len(spans) == 0 {
be.markTableDone(tableID)
} else {
Expand Down Expand Up @@ -521,17 +664,30 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage {
func (be *BarrierEvent) checkBlockedDispatchers() {
switch be.blockedDispatchers.InfluenceType {
case heartbeatpb.InfluenceType_Normal:
for _, tableId := range be.blockedDispatchers.TableIDs {
replications := be.spanController.GetTasksByTableID(tableId)
if be.markMissingDroppedTablesDone() && be.allDispatcherReported() {
// A normal DDL barrier can be recreated by a late WAITING status after the
// original barrier has already scheduled the drop and removed the table
// dispatcher. The removed table cannot report again, so advance the
// recreated barrier and let sendPassAction notify the remaining DDL span.
be.selectByForwardedDispatcher()
log.Info("all missing dropped blocked tables are removed, advance block event",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Any("blocker", be.blockedDispatchers),
zap.Int64("mode", be.mode))
return
}

for _, tableID := range be.blockedDispatchers.TableIDs {
replications := be.getTasksByBlockedTableID(tableID)
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
// one related table has forward checkpointTs, means the block event can be advanced
be.selected.Store(true)
be.writerDispatcherAdvanced = true
be.selectByForwardedDispatcher()
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("tableId", tableId),
zap.Int64("tableID", tableID),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
Expand All @@ -546,8 +702,7 @@ func (be *BarrierEvent) checkBlockedDispatchers() {
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
// One related dispatcher has moved past the barrier, so the block event can advance.
be.selected.Store(true)
be.writerDispatcherAdvanced = true
be.selectByForwardedDispatcher()
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
Expand All @@ -564,8 +719,7 @@ func (be *BarrierEvent) checkBlockedDispatchers() {
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
// One related dispatcher has moved past the barrier, so the block event can advance.
be.selected.Store(true)
be.writerDispatcherAdvanced = true
be.selectByForwardedDispatcher()
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
Expand All @@ -582,10 +736,10 @@ func (be *BarrierEvent) checkBlockedDispatchers() {
// forwardBarrierEvent returns true if `replication` is known to have passed `event`.
//
// We intentionally avoid `checkpointTs >= commitTs`: a dispatcher may be recreated with
// `startTs == commitTs` and not skip the syncpoint at that ts, so it may report
// `checkpointTs == commitTs` before the syncpoint is actually flushed. We only forward when the
// replication is strictly beyond the barrier, or when ordering guarantees it (replication is in a
// syncpoint barrier at the same ts while `event` is a DDL barrier).
// `startTs == commitTs` and still need to flush the syncpoint at that ts. We only forward when the
// replication is strictly beyond the barrier, when it already reported DONE for this exact barrier,
// or when ordering guarantees it (replication is in a syncpoint barrier at the same ts while `event`
// is a DDL barrier).
func forwardBarrierEvent(replication *replica.SpanReplication, event *BarrierEvent) bool {
if replication.GetStatus().CheckpointTs > event.commitTs {
return true
Expand All @@ -596,6 +750,9 @@ func forwardBarrierEvent(replication *replica.SpanReplication, event *BarrierEve
if blockState.BlockTs > event.commitTs {
return true
} else if blockState.BlockTs == event.commitTs {
if blockState.Stage == heartbeatpb.BlockStage_DONE && blockState.IsSyncPoint == event.isSyncPoint {
return true
}
// If the replication is already blocked by a syncpoint at the same ts, it must have
// processed the DDL barrier at that ts already (barrier events are ordered by (commitTs, isSyncPoint)).
if blockState.IsSyncPoint && !event.isSyncPoint {
Expand All @@ -607,6 +764,10 @@ func forwardBarrierEvent(replication *replica.SpanReplication, event *BarrierEve
}

// resend is resendWithSchedule without a scheduling callback: it forwards mode
// and passes nil for trySchedule, returning whatever messages
// resendWithSchedule produces.
func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage {
return be.resendWithSchedule(mode, nil)
}

func (be *BarrierEvent) resendWithSchedule(mode int64, trySchedule func(*BarrierEvent) bool) []*messaging.TargetMessage {
now := time.Now()
if now.Sub(be.lastResendTime) < time.Second {
return nil
Expand Down Expand Up @@ -657,6 +818,12 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage {
be.checkBlockedDispatchers()
return nil
}
writerForwarded := be.refreshSelectedProgress()
if writerForwarded && be.needSchedule && !be.writerDispatcherAdvanced {
if trySchedule == nil || !trySchedule(be) {
return nil
}
}
// we select a dispatcher as the writer, still waiting for that dispatcher advance its checkpoint ts
if !be.writerDispatcherAdvanced {
be.lastResendTime = now
Expand Down Expand Up @@ -684,7 +851,7 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage {
}

tableID := be.blockedDispatchers.TableIDs[0]
replications := be.spanController.GetTasksByTableID(tableID)
replications := be.getTasksByBlockedTableID(tableID)

if len(replications) == 0 {
log.Panic("replications for this block event should not be empty",
Expand Down
Loading