Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type DispatcherService interface {
GetFilterConfig() *eventpb.FilterConfig
EnableSyncPoint() bool
GetSyncPointInterval() time.Duration
GetEnableScanWindow() bool
GetSkipSyncpointAtStartTs() bool
GetTxnAtomicity() config.AtomicityLevel
GetResolvedTs() uint64
Expand Down
10 changes: 10 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type SharedInfo struct {
// will break the splittability of this table.
enableSplittableCheck bool

// enableScanWindow controls whether the event service applies the adaptive scan
// window (memory control + adaptive scan interval) for this changefeed.
enableScanWindow bool

// router is used to route source schema/table names to target schema/table names.
// It is used to apply routing to TableInfo before storing it.
router routing.Router
Expand Down Expand Up @@ -96,6 +100,7 @@ func NewSharedInfo(
syncPointConfig *syncpoint.SyncPointConfig,
txnAtomicity *config.AtomicityLevel,
enableSplittableCheck bool,
enableScanWindow bool,
router routing.Router,
eventCollectorBatchCount int,
eventCollectorBatchBytes int,
Expand All @@ -113,6 +118,7 @@ func NewSharedInfo(
filterConfig: filterConfig,
syncPointConfig: syncPointConfig,
enableSplittableCheck: enableSplittableCheck,
enableScanWindow: enableScanWindow,
router: router,
eventCollectorBatchCount: eventCollectorBatchCount,
eventCollectorBatchBytes: eventCollectorBatchBytes,
Expand Down Expand Up @@ -234,6 +240,10 @@ func (d *BasicDispatcher) GetSyncPointInterval() time.Duration {
return time.Duration(0)
}

func (d *BasicDispatcher) GetEnableScanWindow() bool {
return d.sharedInfo.enableScanWindow
}

func (d *BasicDispatcher) GetTableSpan() *heartbeatpb.TableSpan {
return d.tableSpan
}
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func newTestSharedInfo(
syncPointConfig,
&defaultAtomicity,
enableSplittableCheck,
false, // enableScanWindow
routing.Router{},
0,
0,
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func NewDispatcherManager(
syncPointConfig,
manager.config.SinkConfig.TxnAtomicity,
manager.config.EnableSplittableCheck,
manager.config.EnableScanWindow,
router,
batchCounts,
batchBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func createTestManager(t *testing.T) *DispatcherManager {
nil, // syncPointConfig
&defaultAtomicity,
false,
false, // enableScanWindow
routing.Router{},
0,
0,
Expand Down
2 changes: 2 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func (s *dispatcherSession) newDispatcherRegisterRequest(serverID string, onlyRe
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
EnableScanWindow: s.target.GetEnableScanWindow(),
},
}
}
Expand Down Expand Up @@ -568,6 +569,7 @@ func (s *dispatcherSession) newDispatcherResetRequest(serverID string, resetTs u
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
EnableScanWindow: s.target.GetEnableScanWindow(),
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (m *mockDispatcher) GetSyncPointInterval() time.Duration {
return time.Second * 10
}

func (m *mockDispatcher) GetEnableScanWindow() bool {
return false
}

func (m *mockDispatcher) GetSkipSyncpointAtStartTs() bool {
return m.skipSyncpointAtStartTs
}
Expand Down
11 changes: 11 additions & 0 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
receiveChanSize = 1024 * 8
commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests.
eventServiceHeartbeatInterval = 10 * time.Second

// scanWindowReleaseMemoryRatio is the fraction of pending memory the event collector
// releases per cycle for changefeeds with the adaptive scan window enabled. It is
// more aggressive than the dynstream default to keep up with the larger scan windows.
scanWindowReleaseMemoryRatio = 0.6
)

// DispatcherMessage is the message send to EventService.
Expand Down Expand Up @@ -281,6 +286,12 @@ func (c *EventCollector) PrepareAddDispatcher(
batchCount,
batchBytes,
)
// The adaptive scan window releases memory more aggressively to keep up with the
// larger scan windows. Only apply it when the changefeed enables the scan window,
// so a changefeed with the feature off keeps the pre-scan-window release ratio.
if target.GetEnableScanWindow() {
areaSetting.SetReleaseMemoryRatio(scanWindowReleaseMemoryRatio)
}
err := ds.AddPath(target.GetId(), stat, areaSetting)
if err != nil {
log.Warn("add dispatcher to dynamic stream failed", zap.Error(err))
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/eventcollector/event_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (m *mockEventDispatcher) GetSyncPointInterval() time.Duration {
return time.Second
}

func (m *mockEventDispatcher) GetEnableScanWindow() bool {
return false
}

func (m *mockEventDispatcher) GetSkipSyncpointAtStartTs() bool {
return false
}
Expand Down
Loading
Loading