Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type DispatcherService interface {
GetFilterConfig() *eventpb.FilterConfig
EnableSyncPoint() bool
GetSyncPointInterval() time.Duration
GetEnableScanWindow() bool
GetSkipSyncpointAtStartTs() bool
GetTxnAtomicity() config.AtomicityLevel
GetResolvedTs() uint64
Expand Down Expand Up @@ -526,8 +527,8 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
log.Info("dispatcher receive ddl event",
zap.Stringer("dispatcher", d.id),
zap.String("query", ddl.Query),
zap.Any("tableSpan", d.GetTableSpan()),
zap.Int64("table", ddl.GetTableID()),
zap.Int64("oldTableID", d.tableSpan.GetTableID()),
zap.Int64("currentTableID", ddl.GetTableID()),
Comment on lines +530 to +531

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Accessing d.tableSpan directly may bypass concurrency synchronization or nil-safety checks that are normally provided by d.GetTableSpan(). To prevent potential data races or nil-pointer panics, it is safer to use d.GetTableSpan().GetTableID().

Suggested change
zap.Int64("oldTableID", d.tableSpan.GetTableID()),
zap.Int64("currentTableID", ddl.GetTableID()),
zap.Int64("oldTableID", d.GetTableSpan().GetTableID()),
zap.Int64("currentTableID", ddl.GetTableID()),

zap.Uint64("commitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()))
now := time.Now()
Expand Down
10 changes: 10 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type SharedInfo struct {
// will break the splittability of this table.
enableSplittableCheck bool

// enableScanWindow controls whether the event service applies the adaptive scan
// window (memory control + adaptive scan interval) for this changefeed.
enableScanWindow bool

// router is used to route source schema/table names to target schema/table names.
// It is used to apply routing to TableInfo before storing it.
router routing.Router
Expand Down Expand Up @@ -90,6 +94,7 @@ func NewSharedInfo(
syncPointConfig *syncpoint.SyncPointConfig,
txnAtomicity *config.AtomicityLevel,
enableSplittableCheck bool,
enableScanWindow bool,
router routing.Router,
statusesChan chan TableSpanStatusWithSeq,
blockStatusBufferSize int,
Expand All @@ -104,6 +109,7 @@ func NewSharedInfo(
filterConfig: filterConfig,
syncPointConfig: syncPointConfig,
enableSplittableCheck: enableSplittableCheck,
enableScanWindow: enableScanWindow,
router: router,
statusesChan: statusesChan,
blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize),
Expand Down Expand Up @@ -215,6 +221,10 @@ func (d *BasicDispatcher) GetSyncPointInterval() time.Duration {
return time.Duration(0)
}

// GetEnableScanWindow returns the enableScanWindow flag stored in the
// dispatcher's shared info. The value is read as-is on every call; this
// method performs no caching or validation of its own.
func (d *BasicDispatcher) GetEnableScanWindow() bool {
return d.sharedInfo.enableScanWindow
}

// GetTableSpan returns the dispatcher's tableSpan pointer unchanged.
// It is a plain field read: no copy is made and no nil check or locking
// is performed here, so callers share the same TableSpan value.
func (d *BasicDispatcher) GetTableSpan() *heartbeatpb.TableSpan {
return d.tableSpan
}
Expand Down
7 changes: 6 additions & 1 deletion downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func newTestSharedInfo(
syncPointConfig,
&defaultAtomicity,
enableSplittableCheck,
false, // enableScanWindow
routing.Router{},
make(chan TableSpanStatusWithSeq, 128),
128,
Expand Down Expand Up @@ -128,6 +129,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Eve
}, // syncPointConfig
&defaultAtomicity,
false, // enableSplittableCheck
false, // enableScanWindow
routing.Router{},
make(chan TableSpanStatusWithSeq, 128),
128,
Expand Down Expand Up @@ -1090,7 +1092,8 @@ func TestDispatcherSplittableCheck(t *testing.T) {
SyncPointRetention: time.Duration(10 * time.Minute),
},
&defaultAtomicity,
true, // enableSplittableCheck = true
true, // enableSplittableCheck = true
false, // enableScanWindow
routing.Router{},
make(chan TableSpanStatusWithSeq, 128),
128,
Expand Down Expand Up @@ -1201,6 +1204,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) {
},
&defaultAtomicity,
false,
false, // enableScanWindow
routing.Router{},
make(chan TableSpanStatusWithSeq, 128),
128,
Expand Down Expand Up @@ -1281,6 +1285,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) {
},
&defaultAtomicity,
false,
false, // enableScanWindow
routing.Router{},
make(chan TableSpanStatusWithSeq, 128),
128,
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/redo_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan)
nil, // redo dispatcher doesn't need syncPointConfig
&defaultAtomicity,
false, // enableSplittableCheck
false, // enableScanWindow
routing.Router{},
make(chan TableSpanStatusWithSeq, 128),
128,
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func NewDispatcherManager(
syncPointConfig,
manager.config.SinkConfig.TxnAtomicity,
manager.config.EnableSplittableCheck,
manager.config.EnableScanWindow,
router,
make(chan dispatcher.TableSpanStatusWithSeq, 8192),
blockStatusBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func createTestManager(t *testing.T) *DispatcherManager {
nil, // syncPointConfig
&defaultAtomicity,
false,
false, // enableScanWindow
routing.Router{},
make(chan dispatcher.TableSpanStatusWithSeq, 8192),
blockStatusBufferSize,
Expand Down
2 changes: 2 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func (s *dispatcherSession) newDispatcherRegisterRequest(serverID string, onlyRe
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
EnableScanWindow: s.target.GetEnableScanWindow(),
},
}
}
Expand Down Expand Up @@ -568,6 +569,7 @@ func (s *dispatcherSession) newDispatcherResetRequest(serverID string, resetTs u
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
EnableScanWindow: s.target.GetEnableScanWindow(),
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (m *mockDispatcher) GetSyncPointInterval() time.Duration {
return time.Second * 10
}

// GetEnableScanWindow always returns false for the mock dispatcher.
func (m *mockDispatcher) GetEnableScanWindow() bool {
return false
}

// GetSkipSyncpointAtStartTs returns the mock's skipSyncpointAtStartTs field,
// letting a test choose the value by setting that field.
func (m *mockDispatcher) GetSkipSyncpointAtStartTs() bool {
return m.skipSyncpointAtStartTs
}
Expand Down
84 changes: 75 additions & 9 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
receiveChanSize = 1024 * 8
commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests.
eventServiceHeartbeatInterval = 10 * time.Second

// scanWindowReleaseMemoryRatio is the fraction of pending memory the event collector
// releases per cycle for changefeeds with the adaptive scan window enabled. It is
// more aggressive than the dynstream default to keep up with the larger scan windows.
scanWindowReleaseMemoryRatio = 0.6
)

// DispatcherMessage is the message send to EventService.
Expand Down Expand Up @@ -84,6 +89,7 @@ type changefeedStat struct {
metricMemoryUsageMaxRedo prometheus.Gauge
metricMemoryUsageUsedRedo prometheus.Gauge
dispatcherCount atomic.Int32
memoryReleaseCount atomic.Uint32
}

func newChangefeedStat(changefeedID common.ChangeFeedID) *changefeedStat {
Expand Down Expand Up @@ -273,6 +279,12 @@ func (c *EventCollector) PrepareAddDispatcher(

ds := c.getDynamicStream(target.GetMode())
areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(memoryQuota, dynstream.MemoryControlForEventCollector, "eventCollector")
// The adaptive scan window releases memory more aggressively to keep up with the
// larger scan windows. Only apply it when the changefeed enables the scan window,
// so a changefeed with the feature off keeps the pre-scan-window release ratio.
if target.GetEnableScanWindow() {
areaSetting.SetReleaseMemoryRatio(scanWindowReleaseMemoryRatio)
}
err := ds.AddPath(target.GetId(), stat, areaSetting)
if err != nil {
log.Warn("add dispatcher to dynamic stream failed", zap.Error(err))
Expand Down Expand Up @@ -440,11 +452,17 @@ func (c *EventCollector) processDSFeedback(ctx context.Context) error {
return context.Cause(ctx)
case feedback := <-c.ds.Feedback():
if feedback.FeedbackType == dynstream.ReleasePath {
if v, ok := c.changefeedMap.Load(feedback.Area); ok {
v.(*changefeedStat).memoryReleaseCount.Add(1)
}
log.Info("release dispatcher memory in DS", zap.Any("dispatcherID", feedback.Path))
c.ds.Release(feedback.Path)
}
case feedback := <-c.redoDs.Feedback():
if feedback.FeedbackType == dynstream.ReleasePath {
if v, ok := c.changefeedMap.Load(feedback.Area); ok {
v.(*changefeedStat).memoryReleaseCount.Add(1)
}
log.Info("release dispatcher memory in redo DS", zap.Any("dispatcherID", feedback.Path))
c.redoDs.Release(feedback.Path)
}
Expand Down Expand Up @@ -617,9 +635,24 @@ func (c *EventCollector) controlCongestion(ctx context.Context) error {
}

func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.CongestionControl {
changefeedMemoryReleaseCount := make(map[common.ChangeFeedID]uint32)
getAndResetMemoryReleaseCount := func(changefeedID common.ChangeFeedID) uint32 {
if count, ok := changefeedMemoryReleaseCount[changefeedID]; ok {
return count
}
v, ok := c.changefeedMap.Load(changefeedID.ID())
if !ok {
return 0
}
count := v.(*changefeedStat).memoryReleaseCount.Swap(0)
changefeedMemoryReleaseCount[changefeedID] = count
return count
}

// collect path-level available memory and total available memory for each changefeed
changefeedPathMemory := make(map[common.ChangeFeedID]map[common.DispatcherID]uint64)
changefeedTotalMemory := make(map[common.ChangeFeedID]uint64)
changefeedUsageRatio := make(map[common.ChangeFeedID]float64)

// collect from main dynamic stream
for _, quota := range c.ds.GetMetrics().MemoryControl.AreaMemoryMetrics {
Expand All @@ -637,6 +670,7 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
}
// store total available memory from AreaMemoryMetric
changefeedTotalMemory[cfID] = uint64(quota.AvailableMemory())
changefeedUsageRatio[cfID] = calcUsageRatio(quota.MemoryUsage(), quota.MaxMemory())
}

// collect from redo dynamic stream and take minimum
Expand All @@ -658,11 +692,9 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
}
}
// take minimum total available memory between main and redo streams
if existing, exists := changefeedTotalMemory[cfID]; exists {
changefeedTotalMemory[cfID] = min(existing, uint64(quota.AvailableMemory()))
} else {
changefeedTotalMemory[cfID] = uint64(quota.AvailableMemory())
}
updateMinUint64MapValue(changefeedTotalMemory, cfID, uint64(quota.AvailableMemory()))
// take maximum usage ratio between main and redo streams
changefeedUsageRatio[cfID] = max(changefeedUsageRatio[cfID], calcUsageRatio(quota.MemoryUsage(), quota.MaxMemory()))
}

if len(changefeedPathMemory) == 0 {
Expand Down Expand Up @@ -699,30 +731,64 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
// build congestion control messages for each node
result := make(map[node.ID]*event.CongestionControl)
for nodeID, changefeedDispatchers := range nodeDispatcherMemory {
congestionControl := event.NewCongestionControl()
congestionControl := event.NewCongestionControlWithVersion(event.CongestionControlVersion2)

for changefeedID, dispatcherMemory := range changefeedDispatchers {
if len(dispatcherMemory) == 0 {
continue
}

// get total available memory directly from AreaMemoryMetric
totalAvailable := uint64(changefeedTotalMemory[changefeedID])
congestionControl.AddAvailableMemoryWithDispatchers(
totalAvailable, ok := changefeedTotalMemory[changefeedID]
if !ok {
continue
}
congestionControl.AddAvailableMemoryWithDispatchersAndUsageAndReleaseCount(
changefeedID.ID(),
totalAvailable,
changefeedUsageRatio[changefeedID],
dispatcherMemory,
getAndResetMemoryReleaseCount(changefeedID),
)
}

if len(congestionControl.GetAvailables()) > 0 {
result[nodeID] = congestionControl
}
}

return result
}

// updateMinUint64MapValue stores under key the smaller of value and the entry
// already present in m. When key has no entry yet, value is stored directly.
// The map is always written, so m must be non-nil.
func updateMinUint64MapValue(m map[common.ChangeFeedID]uint64, key common.ChangeFeedID, value uint64) {
	next := value
	// Keep the existing entry only when it is strictly smaller than the candidate.
	if current, found := m[key]; found && current < next {
		next = current
	}
	m[key] = next
}

// updateMaxUint64MapValue stores under key the larger of value and the entry
// already present in m. When key has no entry yet, value is stored directly.
// The map is always written, so m must be non-nil.
func updateMaxUint64MapValue(m map[common.ChangeFeedID]uint64, key common.ChangeFeedID, value uint64) {
	next := value
	// Keep the existing entry only when it is strictly larger than the candidate.
	if current, found := m[key]; found && current > next {
		next = current
	}
	m[key] = next
}

// calcUsageRatio converts a used/max memory pair into a usage fraction that is
// clamped to the closed interval [0, 1].
//
// A non-positive maxMemory yields 0, which also avoids dividing by zero.
// A negative usedMemory clamps to 0 and usage above maxMemory clamps to 1.
func calcUsageRatio(usedMemory int64, maxMemory int64) float64 {
	if maxMemory <= 0 {
		return 0
	}
	switch fraction := float64(usedMemory) / float64(maxMemory); {
	case fraction < 0:
		return 0
	case fraction > 1:
		return 1
	default:
		return fraction
	}
}

func (c *EventCollector) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/eventcollector/event_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (m *mockEventDispatcher) GetSyncPointInterval() time.Duration {
return time.Second
}

// GetEnableScanWindow always returns false for the mock event dispatcher.
func (m *mockEventDispatcher) GetEnableScanWindow() bool {
return false
}

// GetSkipSyncpointAtStartTs always returns false for the mock event dispatcher.
func (m *mockEventDispatcher) GetSkipSyncpointAtStartTs() bool {
return false
}
Expand Down
Loading
Loading