diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 6cfcd9aac6..9ecc16037a 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -52,6 +52,7 @@ type DispatcherService interface { GetFilterConfig() *eventpb.FilterConfig EnableSyncPoint() bool GetSyncPointInterval() time.Duration + GetEnableScanWindow() bool GetSkipSyncpointAtStartTs() bool GetTxnAtomicity() config.AtomicityLevel GetResolvedTs() uint64 diff --git a/downstreamadapter/dispatcher/basic_dispatcher_info.go b/downstreamadapter/dispatcher/basic_dispatcher_info.go index dd9eab1a0a..7c4b8951c4 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_info.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_info.go @@ -56,6 +56,10 @@ type SharedInfo struct { // will break the splittability of this table. enableSplittableCheck bool + // enableScanWindow controls whether the event service applies the adaptive scan + // window (memory control + adaptive scan interval) for this changefeed. + enableScanWindow bool + // router is used to route source schema/table names to target schema/table names. // It is used to apply routing to TableInfo before storing it. router routing.Router @@ -96,6 +100,7 @@ func NewSharedInfo( syncPointConfig *syncpoint.SyncPointConfig, txnAtomicity *config.AtomicityLevel, enableSplittableCheck bool, + enableScanWindow bool, router routing.Router, eventCollectorBatchCount int, eventCollectorBatchBytes int, @@ -113,6 +118,7 @@ func NewSharedInfo( filterConfig: filterConfig, syncPointConfig: syncPointConfig, enableSplittableCheck: enableSplittableCheck, + enableScanWindow: enableScanWindow, router: router, eventCollectorBatchCount: eventCollectorBatchCount, eventCollectorBatchBytes: eventCollectorBatchBytes, @@ -234,6 +240,10 @@ func (d *BasicDispatcher) GetSyncPointInterval() time.Duration { return time.Duration(0) } +func (d *BasicDispatcher) GetEnableScanWindow() bool { + return d.sharedInfo.enableScanWindow +} + func (d *BasicDispatcher) GetTableSpan() *heartbeatpb.TableSpan { return d.tableSpan } diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 0b81ca0c1a..693cf4879d 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -76,6 +76,7 @@ func newTestSharedInfo( syncPointConfig, &defaultAtomicity, enableSplittableCheck, + false, // enableScanWindow routing.Router{}, 0, 0, diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 52a36c9e43..c6c65c0980 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -324,6 +324,7 @@ func NewDispatcherManager( syncPointConfig, manager.config.SinkConfig.TxnAtomicity, manager.config.EnableSplittableCheck, + manager.config.EnableScanWindow, router, batchCounts, batchBytes, diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go index 0d0897ab0e..53f1bd50db 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go @@ -133,6 +133,7 @@ func createTestManager(t *testing.T) *DispatcherManager { nil, // syncPointConfig &defaultAtomicity, false, + false, // enableScanWindow routing.Router{}, 0, 0, diff --git a/downstreamadapter/eventcollector/dispatcher_session.go b/downstreamadapter/eventcollector/dispatcher_session.go index e3ec7de114..8d32753c98 100644 --- a/downstreamadapter/eventcollector/dispatcher_session.go +++ b/downstreamadapter/eventcollector/dispatcher_session.go @@ -533,6 +533,7 @@ func (s *dispatcherSession) newDispatcherRegisterRequest(serverID string, onlyRe Integrity: s.target.GetIntegrityConfig(), OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(), TxnAtomicity: string(s.target.GetTxnAtomicity()), + EnableScanWindow: s.target.GetEnableScanWindow(), }, } } @@ -568,6 +569,7 @@ func (s *dispatcherSession) newDispatcherResetRequest(serverID string, resetTs u Integrity: s.target.GetIntegrityConfig(), OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(), TxnAtomicity: string(s.target.GetTxnAtomicity()), + EnableScanWindow: s.target.GetEnableScanWindow(), }, } } diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index 9c3088a6eb..1aad935ab1 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -118,6 +118,10 @@ func (m *mockDispatcher) GetSyncPointInterval() time.Duration { return time.Second * 10 } +func (m *mockDispatcher) GetEnableScanWindow() bool { + return false +} + func (m *mockDispatcher) GetSkipSyncpointAtStartTs() bool { return m.skipSyncpointAtStartTs } diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6456f1ada2..7e930a5261 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -40,6 +40,11 @@ const ( receiveChanSize = 1024 * 8 commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests. eventServiceHeartbeatInterval = 10 * time.Second + + // scanWindowReleaseMemoryRatio is the fraction of pending memory the event collector + // releases per cycle for changefeeds with the adaptive scan window enabled. It is + // more aggressive than the dynstream default to keep up with the larger scan windows. + scanWindowReleaseMemoryRatio = 0.6 ) // DispatcherMessage is the message send to EventService. @@ -281,6 +286,12 @@ func (c *EventCollector) PrepareAddDispatcher( batchCount, batchBytes, ) + // The adaptive scan window releases memory more aggressively to keep up with the + // larger scan windows. Only apply it when the changefeed enables the scan window, + // so a changefeed with the feature off keeps the pre-scan-window release ratio. + if target.GetEnableScanWindow() { + areaSetting.SetReleaseMemoryRatio(scanWindowReleaseMemoryRatio) + } err := ds.AddPath(target.GetId(), stat, areaSetting) if err != nil { log.Warn("add dispatcher to dynamic stream failed", zap.Error(err)) diff --git a/downstreamadapter/eventcollector/event_collector_test.go b/downstreamadapter/eventcollector/event_collector_test.go index eceea06518..d53fe8179f 100644 --- a/downstreamadapter/eventcollector/event_collector_test.go +++ b/downstreamadapter/eventcollector/event_collector_test.go @@ -105,6 +105,10 @@ func (m *mockEventDispatcher) GetSyncPointInterval() time.Duration { return time.Second } +func (m *mockEventDispatcher) GetEnableScanWindow() bool { + return false +} + func (m *mockEventDispatcher) GetSkipSyncpointAtStartTs() bool { return false } diff --git a/eventpb/event.pb.go b/eventpb/event.pb.go index 485ed22007..d0986d6313 100644 --- a/eventpb/event.pb.go +++ b/eventpb/event.pb.go @@ -646,6 +646,10 @@ type DispatcherRequest struct { OutputRawChangeEvent bool `protobuf:"varint,17,opt,name=output_raw_change_event,json=outputRawChangeEvent,proto3" json:"output_raw_change_event,omitempty"` Mode int64 `protobuf:"varint,18,opt,name=mode,proto3" json:"mode,omitempty"` TxnAtomicity string `protobuf:"bytes,19,opt,name=txn_atomicity,json=txnAtomicity,proto3" json:"txn_atomicity,omitempty"` + // enable_scan_window controls whether the event service applies the adaptive + // scan window (memory control + scan interval) for this changefeed. + // It defaults to false so the feature behaves as if it was never introduced. + EnableScanWindow bool `protobuf:"varint,20,opt,name=enable_scan_window,json=enableScanWindow,proto3" json:"enable_scan_window,omitempty"` } func (m *DispatcherRequest) Reset() { *m = DispatcherRequest{} } @@ -814,6 +818,13 @@ func (m *DispatcherRequest) GetTxnAtomicity() string { return "" } +func (m *DispatcherRequest) GetEnableScanWindow() bool { + if m != nil { + return m.EnableScanWindow + } + return false +} + func init() { proto.RegisterEnum("eventpb.OpType", OpType_name, OpType_value) proto.RegisterEnum("eventpb.ActionType", ActionType_name, ActionType_value) @@ -832,80 +843,81 @@ func init() { func init() { proto.RegisterFile("eventpb/event.proto", fileDescriptor_d7fb2554dfcf7f7d) } var fileDescriptor_d7fb2554dfcf7f7d = []byte{ - // 1157 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4f, 0x73, 0xdb, 0x44, - 0x14, 0x8f, 0x9c, 0x3f, 0xb6, 0x9e, 0xed, 0xc4, 0xde, 0x34, 0xad, 0xda, 0x42, 0x08, 0x86, 0xe9, - 0x98, 0xce, 0xe0, 0x94, 0xd0, 0xc2, 0x4c, 0x87, 0xe9, 0x4c, 0x49, 0x5c, 0xd0, 0x0c, 0x6d, 0x32, - 0x6b, 0xb5, 0x33, 0x70, 0xd1, 0xc8, 0xd2, 0x4b, 0x22, 0x2a, 0xef, 0xaa, 0xab, 0x95, 0x13, 0xf3, - 0x29, 0x38, 0x71, 0xe2, 0xce, 0x57, 0xe1, 0xd8, 0x23, 0x37, 0x98, 0xf6, 0xc0, 0xd7, 0x60, 0x76, - 0x57, 0x96, 0xad, 0xb6, 0x70, 0xe1, 0xe4, 0xdd, 0xf7, 0xfb, 0xbd, 0xdd, 0xb7, 0xbf, 0xf7, 0x47, - 0x86, 0x6d, 0x9c, 0x22, 0x93, 0xe9, 0x78, 0x5f, 0xff, 0x0e, 0x52, 0xc1, 0x25, 0x27, 0xf5, 0xc2, - 0x78, 0xe3, 0xe6, 0x39, 0x06, 0x42, 0x8e, 0x31, 0x50, 0x8c, 0x72, 0x6d, 0x58, 0xbd, 0x3f, 0x6b, - 0xb0, 0x35, 0x54, 0xc4, 0x47, 0x71, 0x22, 0x51, 0xd0, 0x3c, 0x41, 0xe2, 0x40, 0x7d, 0x12, 0xc8, - 0xf0, 0x1c, 0x85, 0x63, 0xed, 0xad, 0xf6, 0x6d, 0x3a, 0xdf, 0x92, 0x0f, 0xa1, 0x15, 0x9f, 0x31, - 0x2e, 0xd0, 0xd7, 0x87, 0x3b, 0x35, 0x0d, 0x37, 0x8d, 0x4d, 0x1f, 0x43, 0xde, 0x07, 0x28, 0x28, - 0xd9, 0x8b, 0xc4, 0x59, 0xd5, 0x04, 0xdb, 0x58, 0x46, 0x2f, 0x12, 0xf2, 0x25, 0x38, 0x05, 0x1c, - 0xb3, 0x0c, 0x85, 0xf4, 0xa7, 0x41, 0x92, 0xa3, 0x8f, 0x97, 0xa9, 0x70, 0xd6, 0xf6, 0xac, 0xbe, - 0x4d, 0x77, 0x0c, 0xee, 0x6a, 0xf8, 0x99, 0x42, 0x87, 0x97, 0xa9, 0x20, 0x0f, 0xe0, 0xbd, 0xc2, - 0x31, 0x4f, 0xa3, 0x40, 0xa2, 0xcf, 0xf0, 0x62, 0xd9, 0x79, 0x5d, 0x3b, 0x17, 0x87, 0x3f, 0xd5, - 0x94, 0x27, 0x78, 0xf1, 0x1f, 0xfe, 0x3c, 0x89, 0x96, 0xfd, 0x37, 0xde, 0xf6, 0x3f, 0x4e, 0xa2, - 0x85, 0xff, 0x22, 0xf0, 0x08, 0x13, 0x94, 0xb8, 0xec, 0x5b, 0x5f, 0x0e, 0xfc, 0x48, 0xc3, 0xa5, - 0x63, 0xef, 0x17, 0x0b, 0xba, 0x2e, 0x63, 0x28, 0x8c, 0xc2, 0x87, 0x9c, 0x9d, 0xc6, 0x67, 0xe4, - 0x0a, 0xac, 0x8b, 0x3c, 0xc1, 0xac, 0x50, 0xd8, 0x6c, 0xc8, 0xa7, 0xb0, 0x5d, 0x5c, 0x22, 0x2f, - 0x99, 0x9f, 0xc9, 0x40, 0x48, 0x5f, 0x66, 0x5a, 0xe6, 0x35, 0xda, 0x31, 0x90, 0x77, 0xc9, 0x46, - 0x0a, 0xf0, 0x32, 0xf2, 0x15, 0xb4, 0x96, 0x72, 0x97, 0x69, 0xb5, 0x9b, 0x07, 0xce, 0xa0, 0xc8, - 0xfc, 0xe0, 0x8d, 0xc4, 0xd2, 0x0a, 0xbb, 0xf7, 0xab, 0x05, 0xad, 0x4a, 0x4c, 0x1f, 0x43, 0x3b, - 0x0c, 0x32, 0x1c, 0x21, 0xcb, 0x62, 0x19, 0x4f, 0xd1, 0xb1, 0xf6, 0xac, 0x7e, 0x83, 0x56, 0x8d, - 0xe4, 0x16, 0x6c, 0x9e, 0x72, 0x11, 0x22, 0xc5, 0x34, 0x89, 0xc3, 0x40, 0xa2, 0x53, 0xd3, 0xb4, - 0x37, 0xac, 0xe4, 0x01, 0xb4, 0x4e, 0x97, 0x4e, 0x77, 0x56, 0xf7, 0xac, 0x7e, 0xf3, 0xe0, 0x46, - 0x19, 0xdc, 0x5b, 0x9a, 0xd0, 0x0a, 0xbf, 0xd7, 0x02, 0xa0, 0x98, 0xf1, 0x64, 0x8a, 0x91, 0x97, - 0xf5, 0x72, 0x58, 0x37, 0xf5, 0xd5, 0x81, 0xd5, 0xe7, 0x38, 0xd3, 0xa1, 0xb5, 0xa8, 0x5a, 0x2a, - 0x29, 0x75, 0x2e, 0x74, 0x1c, 0x2d, 0x6a, 0x36, 0xe4, 0x06, 0x34, 0xe6, 0xf9, 0xd3, 0x57, 0xb7, - 0x68, 0xb9, 0x27, 0x7d, 0xa8, 0xf3, 0xd4, 0x97, 0xb3, 0x14, 0x75, 0xcd, 0x6d, 0x1e, 0x6c, 0x95, - 0x51, 0x1d, 0xa7, 0xde, 0x2c, 0x45, 0xba, 0xc1, 0xf5, 0x6f, 0xef, 0x47, 0x68, 0x78, 0x97, 0xcc, - 0xdc, 0x7c, 0x0b, 0x36, 0x34, 0xcb, 0xe4, 0xac, 0x79, 0xb0, 0x59, 0xd5, 0x99, 0x16, 0x28, 0xb9, - 0x09, 0x76, 0xc8, 0x27, 0x93, 0xb8, 0x48, 0x9d, 0xd5, 0x5f, 0xa3, 0x0d, 0x63, 0xf0, 0x32, 0x72, - 0x1d, 0x1a, 0x65, 0x5a, 0x57, 0x35, 0x56, 0xcf, 0x4c, 0x36, 0x7b, 0x4d, 0xb0, 0xbd, 0x60, 0x9c, - 0xa0, 0xcb, 0x4e, 0x79, 0xef, 0x6f, 0x0b, 0x6c, 0x93, 0x2d, 0xc4, 0x88, 0xdc, 0x01, 0x50, 0x05, - 0x51, 0xb9, 0xbe, 0x5b, 0x5e, 0x3f, 0x8f, 0x90, 0xda, 0xb2, 0x58, 0x65, 0xe4, 0x03, 0x68, 0x8a, - 0x42, 0xbd, 0x45, 0x18, 0x20, 0x4a, 0x41, 0xc9, 0x03, 0x68, 0x47, 0x71, 0x96, 0x9a, 0xc6, 0xf6, - 0xe3, 0xa8, 0xc8, 0xcf, 0xf5, 0xc1, 0xd2, 0xb4, 0x18, 0x1c, 0x95, 0x0c, 0xf7, 0x88, 0xb6, 0x16, - 0x7c, 0x37, 0xd2, 0x05, 0x1c, 0xc8, 0x98, 0x6b, 0x05, 0x6b, 0xd4, 0x6c, 0xc8, 0x67, 0x00, 0x52, - 0xbd, 0xc1, 0x8f, 0xd9, 0x29, 0xd7, 0x3d, 0xd9, 0x3c, 0x20, 0x8b, 0x40, 0xe7, 0xcf, 0xa3, 0xb6, - 0x2c, 0x5f, 0x3a, 0x83, 0x2d, 0x97, 0x49, 0x3c, 0x13, 0xb1, 0x9c, 0x15, 0x85, 0x78, 0x07, 0xb6, - 0x17, 0xa6, 0x73, 0x0c, 0x9f, 0x7f, 0x87, 0x53, 0x4c, 0x74, 0xce, 0x6d, 0xfa, 0x2e, 0x88, 0xdc, - 0x85, 0x9d, 0x43, 0x2e, 0x44, 0x9e, 0xca, 0x98, 0xb3, 0x6f, 0x03, 0x16, 0x25, 0x68, 0x7c, 0x6a, - 0xa6, 0x35, 0xdf, 0x09, 0xf6, 0x7e, 0xdb, 0x80, 0xee, 0xe2, 0x89, 0x14, 0x5f, 0xe4, 0x98, 0xe9, - 0x09, 0x16, 0x26, 0x79, 0x26, 0x8d, 0x2c, 0x96, 0x56, 0xce, 0x2e, 0x2c, 0x6e, 0xa4, 0x84, 0x0b, - 0xcf, 0x03, 0x76, 0x86, 0xa7, 0x88, 0x91, 0x62, 0xd4, 0xde, 0x21, 0xdc, 0x61, 0xc9, 0x50, 0xc2, - 0x2d, 0xf8, 0xc6, 0xff, 0x7f, 0x09, 0x7f, 0x6f, 0x2e, 0x71, 0x96, 0x06, 0x4c, 0xab, 0xdf, 0x3c, - 0xb8, 0x5a, 0x71, 0xd6, 0x32, 0x8f, 0xd2, 0x80, 0x15, 0x32, 0xab, 0x65, 0xa5, 0xf0, 0xd6, 0x2b, - 0x85, 0xa7, 0x0a, 0x36, 0x43, 0x31, 0x35, 0xd1, 0x98, 0x39, 0xd8, 0x30, 0x06, 0x37, 0x22, 0x77, - 0xa1, 0x19, 0x84, 0x4a, 0x38, 0xd3, 0x2f, 0x75, 0xdd, 0x2f, 0xdb, 0x65, 0x4a, 0x1f, 0x6a, 0x4c, - 0xf7, 0x0c, 0x04, 0xe5, 0x9a, 0xdc, 0x87, 0xb6, 0x69, 0x66, 0x3f, 0x34, 0xdd, 0xdf, 0xd0, 0x71, - 0xee, 0x94, 0x7e, 0xff, 0xde, 0xf8, 0xe4, 0x36, 0x74, 0x91, 0x99, 0x17, 0xce, 0x58, 0xe8, 0xa7, - 0x3c, 0x66, 0xd2, 0xb1, 0xf5, 0x8c, 0xd9, 0x32, 0xc0, 0x68, 0xc6, 0xc2, 0x13, 0x65, 0x26, 0x3d, - 0x68, 0x2f, 0x48, 0xea, 0x69, 0xa0, 0x9f, 0xd6, 0xcc, 0xe6, 0x0c, 0x2f, 0x23, 0x03, 0xd8, 0x5e, - 0xe2, 0xc4, 0x4c, 0xa2, 0x98, 0x06, 0x89, 0xd3, 0xd4, 0xcc, 0x6e, 0xc9, 0x74, 0x0b, 0x40, 0xe5, - 0x9f, 0xb3, 0x64, 0xe6, 0x0b, 0xcc, 0x33, 0x74, 0x5a, 0xfa, 0x62, 0x5b, 0x59, 0xa8, 0x32, 0x28, - 0x21, 0xc7, 0x91, 0xf0, 0x27, 0x3c, 0x42, 0xa7, 0xad, 0xc1, 0xfa, 0x38, 0x12, 0x8f, 0x79, 0x84, - 0xe4, 0x0b, 0xb0, 0xe3, 0x79, 0x71, 0x3a, 0x9b, 0xfa, 0xc5, 0xce, 0xd2, 0xbc, 0xab, 0x14, 0x39, - 0x5d, 0x50, 0xd5, 0xac, 0x92, 0xf1, 0x04, 0x7f, 0xe2, 0x0c, 0x9d, 0x2d, 0xa3, 0xff, 0x7c, 0xaf, - 0xfa, 0x0c, 0x53, 0x1e, 0x9e, 0x3b, 0x1d, 0x1d, 0xaf, 0xd9, 0x90, 0x7b, 0x70, 0x8d, 0xe7, 0x32, - 0xcd, 0xa5, 0x2f, 0x82, 0x0b, 0xdf, 0xd4, 0x57, 0xf1, 0x4d, 0xee, 0xea, 0x98, 0xae, 0x18, 0x98, - 0x06, 0x17, 0xa6, 0x14, 0xcd, 0x08, 0x23, 0xb0, 0xa6, 0xe3, 0x26, 0x7b, 0x56, 0x7f, 0x95, 0xea, - 0x35, 0xf9, 0x08, 0xda, 0x6a, 0xb6, 0x04, 0x92, 0x4f, 0xe2, 0x50, 0x05, 0xbe, 0xad, 0x23, 0x68, - 0xc9, 0x4b, 0xf6, 0x70, 0x6e, 0xbb, 0xfd, 0x09, 0x6c, 0x98, 0xc9, 0x48, 0xda, 0x60, 0x9b, 0xd5, - 0x49, 0x2e, 0x3b, 0x2b, 0xa4, 0x03, 0x2d, 0xb3, 0x35, 0x9f, 0xbd, 0x8e, 0x75, 0x5b, 0x00, 0x2c, - 0x8a, 0x82, 0xdc, 0x84, 0x6b, 0x0f, 0x0f, 0x3d, 0xf7, 0xf8, 0x89, 0xef, 0x7d, 0x7f, 0x32, 0xf4, - 0x9f, 0x3e, 0x19, 0x9d, 0x0c, 0x0f, 0xdd, 0x47, 0xee, 0xf0, 0xa8, 0xb3, 0x42, 0x1c, 0xb8, 0xb2, - 0x0c, 0xd2, 0xe1, 0x37, 0xee, 0xc8, 0x1b, 0xd2, 0x8e, 0x45, 0xae, 0x02, 0xa9, 0x22, 0x8f, 0x8f, - 0x9f, 0x0d, 0x3b, 0x35, 0xb2, 0x03, 0xdd, 0xaa, 0x7d, 0x34, 0xf4, 0x3a, 0xeb, 0x5f, 0xdf, 0xff, - 0xfd, 0xd5, 0xae, 0xf5, 0xf2, 0xd5, 0xae, 0xf5, 0xd7, 0xab, 0x5d, 0xeb, 0xe7, 0xd7, 0xbb, 0x2b, - 0x2f, 0x5f, 0xef, 0xae, 0xfc, 0xf1, 0x7a, 0x77, 0xe5, 0x87, 0xbd, 0xb3, 0x58, 0x9e, 0xe7, 0xe3, - 0x41, 0xc8, 0x27, 0xfb, 0x69, 0xcc, 0xce, 0xc2, 0x20, 0xdd, 0x97, 0x71, 0x18, 0x85, 0xfb, 0x45, - 0x5e, 0xc6, 0x1b, 0xfa, 0x8f, 0xd0, 0xe7, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0xce, 0xc1, 0x69, - 0x79, 0x45, 0x09, 0x00, 0x00, + // 1181 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4f, 0x6f, 0x1b, 0x45, + 0x14, 0xcf, 0x3a, 0x7f, 0xec, 0x7d, 0xb6, 0x13, 0x7b, 0x92, 0xb4, 0xdb, 0x16, 0x42, 0x30, 0xa8, + 0x32, 0x15, 0x38, 0x25, 0xb4, 0x20, 0x55, 0xa8, 0x52, 0x49, 0x5c, 0x58, 0x89, 0x36, 0xd1, 0xd8, + 0x2d, 0x82, 0xcb, 0x6a, 0xbd, 0xfb, 0x92, 0x2c, 0x5d, 0xcf, 0x6c, 0x67, 0x67, 0x1d, 0x9b, 0x4f, + 0xc1, 0x89, 0x13, 0x1f, 0x88, 0x63, 0x2f, 0x48, 0xdc, 0x40, 0xed, 0x81, 0xaf, 0x81, 0x66, 0x66, + 0xbd, 0xf6, 0xb6, 0x85, 0x0b, 0x27, 0xcf, 0xbc, 0xdf, 0xef, 0xcd, 0xbc, 0xf9, 0xbd, 0x3f, 0x6b, + 0xd8, 0xc6, 0x09, 0x32, 0x99, 0x8c, 0x0e, 0xf4, 0x6f, 0x2f, 0x11, 0x5c, 0x72, 0x52, 0xcd, 0x8d, + 0xd7, 0x6f, 0x5c, 0xa0, 0x2f, 0xe4, 0x08, 0x7d, 0xc5, 0x28, 0xd6, 0x86, 0xd5, 0xf9, 0xb3, 0x02, + 0x5b, 0x7d, 0x45, 0x7c, 0x18, 0xc5, 0x12, 0x05, 0xcd, 0x62, 0x24, 0x0e, 0x54, 0xc7, 0xbe, 0x0c, + 0x2e, 0x50, 0x38, 0xd6, 0xfe, 0x6a, 0xd7, 0xa6, 0xf3, 0x2d, 0x79, 0x1f, 0x1a, 0xd1, 0x39, 0xe3, + 0x02, 0x3d, 0x7d, 0xb8, 0x53, 0xd1, 0x70, 0xdd, 0xd8, 0xf4, 0x31, 0xe4, 0x5d, 0x80, 0x9c, 0x92, + 0x3e, 0x8f, 0x9d, 0x55, 0x4d, 0xb0, 0x8d, 0x65, 0xf0, 0x3c, 0x26, 0x5f, 0x80, 0x93, 0xc3, 0x11, + 0x4b, 0x51, 0x48, 0x6f, 0xe2, 0xc7, 0x19, 0x7a, 0x38, 0x4d, 0x84, 0xb3, 0xb6, 0x6f, 0x75, 0x6d, + 0xba, 0x6b, 0x70, 0x57, 0xc3, 0x4f, 0x15, 0xda, 0x9f, 0x26, 0x82, 0xdc, 0x87, 0x77, 0x72, 0xc7, + 0x2c, 0x09, 0x7d, 0x89, 0x1e, 0xc3, 0xcb, 0x65, 0xe7, 0x75, 0xed, 0x9c, 0x1f, 0xfe, 0x44, 0x53, + 0x1e, 0xe3, 0xe5, 0x7f, 0xf8, 0xf3, 0x38, 0x5c, 0xf6, 0xdf, 0x78, 0xd3, 0xff, 0x24, 0x0e, 0x17, + 0xfe, 0x8b, 0xc0, 0x43, 0x8c, 0x51, 0xe2, 0xb2, 0x6f, 0x75, 0x39, 0xf0, 0x63, 0x0d, 0x17, 0x8e, + 0x9d, 0x5f, 0x2c, 0x68, 0xbb, 0x8c, 0xa1, 0x30, 0x0a, 0x1f, 0x71, 0x76, 0x16, 0x9d, 0x93, 0x1d, + 0x58, 0x17, 0x59, 0x8c, 0x69, 0xae, 0xb0, 0xd9, 0x90, 0x4f, 0x60, 0x3b, 0xbf, 0x44, 0x4e, 0x99, + 0x97, 0x4a, 0x5f, 0x48, 0x4f, 0xa6, 0x5a, 0xe6, 0x35, 0xda, 0x32, 0xd0, 0x70, 0xca, 0x06, 0x0a, + 0x18, 0xa6, 0xe4, 0x4b, 0x68, 0x2c, 0xe5, 0x2e, 0xd5, 0x6a, 0xd7, 0x0f, 0x9d, 0x5e, 0x9e, 0xf9, + 0xde, 0x6b, 0x89, 0xa5, 0x25, 0x76, 0xe7, 0x57, 0x0b, 0x1a, 0xa5, 0x98, 0x3e, 0x84, 0x66, 0xe0, + 0xa7, 0x38, 0x40, 0x96, 0x46, 0x32, 0x9a, 0xa0, 0x63, 0xed, 0x5b, 0xdd, 0x1a, 0x2d, 0x1b, 0xc9, + 0x4d, 0xd8, 0x3c, 0xe3, 0x22, 0x40, 0x8a, 0x49, 0x1c, 0x05, 0xbe, 0x44, 0xa7, 0xa2, 0x69, 0xaf, + 0x59, 0xc9, 0x7d, 0x68, 0x9c, 0x2d, 0x9d, 0xee, 0xac, 0xee, 0x5b, 0xdd, 0xfa, 0xe1, 0xf5, 0x22, + 0xb8, 0x37, 0x34, 0xa1, 0x25, 0x7e, 0xa7, 0x01, 0x40, 0x31, 0xe5, 0xf1, 0x04, 0xc3, 0x61, 0xda, + 0xc9, 0x60, 0xdd, 0xd4, 0x57, 0x0b, 0x56, 0x9f, 0xe1, 0x4c, 0x87, 0xd6, 0xa0, 0x6a, 0xa9, 0xa4, + 0xd4, 0xb9, 0xd0, 0x71, 0x34, 0xa8, 0xd9, 0x90, 0xeb, 0x50, 0x9b, 0xe7, 0x4f, 0x5f, 0xdd, 0xa0, + 0xc5, 0x9e, 0x74, 0xa1, 0xca, 0x13, 0x4f, 0xce, 0x12, 0xd4, 0x35, 0xb7, 0x79, 0xb8, 0x55, 0x44, + 0x75, 0x92, 0x0c, 0x67, 0x09, 0xd2, 0x0d, 0xae, 0x7f, 0x3b, 0x3f, 0x42, 0x6d, 0x38, 0x65, 0xe6, + 0xe6, 0x9b, 0xb0, 0xa1, 0x59, 0x26, 0x67, 0xf5, 0xc3, 0xcd, 0xb2, 0xce, 0x34, 0x47, 0xc9, 0x0d, + 0xb0, 0x03, 0x3e, 0x1e, 0x47, 0x79, 0xea, 0xac, 0xee, 0x1a, 0xad, 0x19, 0xc3, 0x30, 0x25, 0xd7, + 0xa0, 0x56, 0xa4, 0x75, 0x55, 0x63, 0xd5, 0xd4, 0x64, 0xb3, 0x53, 0x07, 0x7b, 0xe8, 0x8f, 0x62, + 0x74, 0xd9, 0x19, 0xef, 0xfc, 0x6d, 0x81, 0x6d, 0xb2, 0x85, 0x18, 0x92, 0xdb, 0x00, 0xaa, 0x20, + 0x4a, 0xd7, 0xb7, 0x8b, 0xeb, 0xe7, 0x11, 0x52, 0x5b, 0xe6, 0xab, 0x94, 0xbc, 0x07, 0x75, 0x91, + 0xab, 0xb7, 0x08, 0x03, 0x44, 0x21, 0x28, 0xb9, 0x0f, 0xcd, 0x30, 0x4a, 0x13, 0xd3, 0xd8, 0x5e, + 0x14, 0xe6, 0xf9, 0xb9, 0xd6, 0x5b, 0x9a, 0x16, 0xbd, 0xe3, 0x82, 0xe1, 0x1e, 0xd3, 0xc6, 0x82, + 0xef, 0x86, 0xba, 0x80, 0x7d, 0x19, 0x71, 0xad, 0x60, 0x85, 0x9a, 0x0d, 0xf9, 0x14, 0x40, 0xaa, + 0x37, 0x78, 0x11, 0x3b, 0xe3, 0xba, 0x27, 0xeb, 0x87, 0x64, 0x11, 0xe8, 0xfc, 0x79, 0xd4, 0x96, + 0xc5, 0x4b, 0x67, 0xb0, 0xe5, 0x32, 0x89, 0xe7, 0x22, 0x92, 0xb3, 0xbc, 0x10, 0x6f, 0xc3, 0xf6, + 0xc2, 0x74, 0x81, 0xc1, 0xb3, 0x6f, 0x71, 0x82, 0xb1, 0xce, 0xb9, 0x4d, 0xdf, 0x06, 0x91, 0x3b, + 0xb0, 0x7b, 0xc4, 0x85, 0xc8, 0x12, 0x19, 0x71, 0xf6, 0x8d, 0xcf, 0xc2, 0x18, 0x8d, 0x4f, 0xc5, + 0xb4, 0xe6, 0x5b, 0xc1, 0xce, 0xef, 0x1b, 0xd0, 0x5e, 0x3c, 0x91, 0xe2, 0xf3, 0x0c, 0x53, 0x3d, + 0xc1, 0x82, 0x38, 0x4b, 0xa5, 0x91, 0xc5, 0xd2, 0xca, 0xd9, 0xb9, 0xc5, 0x0d, 0x95, 0x70, 0xc1, + 0x85, 0xcf, 0xce, 0xf1, 0x0c, 0x31, 0x54, 0x8c, 0xca, 0x5b, 0x84, 0x3b, 0x2a, 0x18, 0x4a, 0xb8, + 0x05, 0xdf, 0xf8, 0xff, 0x2f, 0xe1, 0xef, 0xce, 0x25, 0x4e, 0x13, 0x9f, 0x69, 0xf5, 0xeb, 0x87, + 0x57, 0x4a, 0xce, 0x5a, 0xe6, 0x41, 0xe2, 0xb3, 0x5c, 0x66, 0xb5, 0x2c, 0x15, 0xde, 0x7a, 0xa9, + 0xf0, 0x54, 0xc1, 0xa6, 0x28, 0x26, 0x26, 0x1a, 0x33, 0x07, 0x6b, 0xc6, 0xe0, 0x86, 0xe4, 0x0e, + 0xd4, 0xfd, 0x40, 0x09, 0x67, 0xfa, 0xa5, 0xaa, 0xfb, 0x65, 0xbb, 0x48, 0xe9, 0x03, 0x8d, 0xe9, + 0x9e, 0x01, 0xbf, 0x58, 0x93, 0x7b, 0xd0, 0x34, 0xcd, 0xec, 0x05, 0xa6, 0xfb, 0x6b, 0x3a, 0xce, + 0xdd, 0xc2, 0xef, 0xdf, 0x1b, 0x9f, 0xdc, 0x82, 0x36, 0x32, 0xf3, 0xc2, 0x19, 0x0b, 0xbc, 0x84, + 0x47, 0x4c, 0x3a, 0xb6, 0x9e, 0x31, 0x5b, 0x06, 0x18, 0xcc, 0x58, 0x70, 0xaa, 0xcc, 0xa4, 0x03, + 0xcd, 0x05, 0x49, 0x3d, 0x0d, 0xf4, 0xd3, 0xea, 0xe9, 0x9c, 0x31, 0x4c, 0x49, 0x0f, 0xb6, 0x97, + 0x38, 0x11, 0x93, 0x28, 0x26, 0x7e, 0xec, 0xd4, 0x35, 0xb3, 0x5d, 0x30, 0xdd, 0x1c, 0x50, 0xf9, + 0xe7, 0x2c, 0x9e, 0x79, 0x02, 0xb3, 0x14, 0x9d, 0x86, 0xbe, 0xd8, 0x56, 0x16, 0xaa, 0x0c, 0x4a, + 0xc8, 0x51, 0x28, 0xbc, 0x31, 0x0f, 0xd1, 0x69, 0x6a, 0xb0, 0x3a, 0x0a, 0xc5, 0x23, 0x1e, 0x22, + 0xf9, 0x1c, 0xec, 0x68, 0x5e, 0x9c, 0xce, 0xa6, 0x7e, 0xb1, 0xb3, 0x34, 0xef, 0x4a, 0x45, 0x4e, + 0x17, 0x54, 0x35, 0xab, 0x64, 0x34, 0xc6, 0x9f, 0x38, 0x43, 0x67, 0xcb, 0xe8, 0x3f, 0xdf, 0xab, + 0x3e, 0xc3, 0x84, 0x07, 0x17, 0x4e, 0x4b, 0xc7, 0x6b, 0x36, 0xe4, 0x2e, 0x5c, 0xe5, 0x99, 0x4c, + 0x32, 0xe9, 0x09, 0xff, 0xd2, 0x33, 0xf5, 0x95, 0x7f, 0x93, 0xdb, 0x3a, 0xa6, 0x1d, 0x03, 0x53, + 0xff, 0xd2, 0x94, 0xa2, 0x19, 0x61, 0x04, 0xd6, 0x74, 0xdc, 0x64, 0xdf, 0xea, 0xae, 0x52, 0xbd, + 0x26, 0x1f, 0x40, 0x53, 0xcd, 0x16, 0x5f, 0xf2, 0x71, 0x14, 0xa8, 0xc0, 0xb7, 0x75, 0x04, 0x0d, + 0x39, 0x65, 0x0f, 0xe6, 0x36, 0xf2, 0x31, 0x90, 0x79, 0x4e, 0x02, 0x9f, 0x79, 0x97, 0x11, 0x0b, + 0xf9, 0xa5, 0xb3, 0xa3, 0xaf, 0x6a, 0xe5, 0x49, 0x09, 0x7c, 0xf6, 0x9d, 0xb6, 0xdf, 0xfa, 0x08, + 0x36, 0xcc, 0x1c, 0x25, 0x4d, 0xb0, 0xcd, 0xea, 0x34, 0x93, 0xad, 0x15, 0xd2, 0x82, 0x86, 0xd9, + 0x9a, 0x8f, 0x64, 0xcb, 0xba, 0x25, 0x00, 0x16, 0x25, 0x44, 0x6e, 0xc0, 0xd5, 0x07, 0x47, 0x43, + 0xf7, 0xe4, 0xb1, 0x37, 0xfc, 0xfe, 0xb4, 0xef, 0x3d, 0x79, 0x3c, 0x38, 0xed, 0x1f, 0xb9, 0x0f, + 0xdd, 0xfe, 0x71, 0x6b, 0x85, 0x38, 0xb0, 0xb3, 0x0c, 0xd2, 0xfe, 0xd7, 0xee, 0x60, 0xd8, 0xa7, + 0x2d, 0x8b, 0x5c, 0x01, 0x52, 0x46, 0x1e, 0x9d, 0x3c, 0xed, 0xb7, 0x2a, 0x64, 0x17, 0xda, 0x65, + 0xfb, 0xa0, 0x3f, 0x6c, 0xad, 0x7f, 0x75, 0xef, 0xb7, 0x97, 0x7b, 0xd6, 0x8b, 0x97, 0x7b, 0xd6, + 0x5f, 0x2f, 0xf7, 0xac, 0x9f, 0x5f, 0xed, 0xad, 0xbc, 0x78, 0xb5, 0xb7, 0xf2, 0xc7, 0xab, 0xbd, + 0x95, 0x1f, 0xf6, 0xcf, 0x23, 0x79, 0x91, 0x8d, 0x7a, 0x01, 0x1f, 0x1f, 0x24, 0x11, 0x3b, 0x0f, + 0xfc, 0xe4, 0x40, 0x46, 0x41, 0x18, 0x1c, 0xe4, 0x59, 0x1c, 0x6d, 0xe8, 0xbf, 0x4d, 0x9f, 0xfd, + 0x13, 0x00, 0x00, 0xff, 0xff, 0x5a, 0xb3, 0xd4, 0x66, 0x73, 0x09, 0x00, 0x00, } func (m *EventFilterRule) Marshal() (dAtA []byte, err error) { @@ -1376,6 +1388,18 @@ func (m *DispatcherRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.EnableScanWindow { + i-- + if m.EnableScanWindow { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xa0 + } if len(m.TxnAtomicity) > 0 { i -= len(m.TxnAtomicity) copy(dAtA[i:], m.TxnAtomicity) @@ -1824,6 +1848,9 @@ func (m *DispatcherRequest) Size() (n int) { if l > 0 { n += 2 + l + sovEvent(uint64(l)) } + if m.EnableScanWindow { + n += 3 + } return n } @@ -3636,6 +3663,26 @@ func (m *DispatcherRequest) Unmarshal(dAtA []byte) error { } m.TxnAtomicity = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 20: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnableScanWindow", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnableScanWindow = bool(v != 0) default: iNdEx = preIndex skippy, err := skipEvent(dAtA[iNdEx:]) diff --git a/eventpb/event.proto b/eventpb/event.proto index ff9b155aef..6e836992d1 100644 --- a/eventpb/event.proto +++ b/eventpb/event.proto @@ -100,4 +100,8 @@ message DispatcherRequest { bool output_raw_change_event = 17; int64 mode = 18; string txn_atomicity = 19; + // enable_scan_window controls whether the event service applies the adaptive + // scan window (memory control + scan interval) for this changefeed. + // It defaults to false so the feature behaves as if it was never introduced. + bool enable_scan_window = 20; } diff --git a/pkg/config/changefeed.go b/pkg/config/changefeed.go index 40d8e27366..2909f2d1d4 100644 --- a/pkg/config/changefeed.go +++ b/pkg/config/changefeed.go @@ -219,6 +219,9 @@ type ChangefeedConfig struct { // redo releated Consistent *ConsistentConfig `toml:"consistent" json:"consistent,omitempty"` EnableTableAcrossNodes bool `toml:"enable-table-across-nodes" json:"enable-table-across-nodes,omitempty"` + // EnableScanWindow controls whether the event service applies the adaptive scan + // window (memory control + adaptive scan interval) for this changefeed. + EnableScanWindow bool `json:"enable_scan_window" default:"false"` } // String implements fmt.Stringer interface, but hide some sensitive information @@ -300,6 +303,7 @@ func (info *ChangeFeedInfo) ToChangefeedConfig() *ChangefeedConfig { TimeZone: GetGlobalServerConfig().TZ, Consistent: info.Config.Consistent, EnableTableAcrossNodes: util.GetOrZero(info.Config.Scheduler.EnableTableAcrossNodes), + EnableScanWindow: util.GetOrZero(info.Config.EnableScanWindow), // other fields are not necessary for dispatcherManager } } diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index c2c54c766c..d4e09a0b06 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -55,6 +55,7 @@ var defaultReplicaConfig = &ReplicaConfig{ SyncPointInterval: util.AddressOf(10 * time.Minute), SyncPointRetention: util.AddressOf(24 * time.Hour), BDRMode: util.AddressOf(false), + EnableScanWindow: util.AddressOf(false), Filter: NewDefaultFilterConfig(), EnableActiveActive: util.AddressOf(false), Mounter: &MounterConfig{ @@ -197,6 +198,11 @@ type replicaConfig struct { EventCollectorBatchCount *int `toml:"event-collector-batch-count" json:"event-collector-batch-count,omitempty"` EventCollectorBatchBytes *int `toml:"event-collector-batch-bytes" json:"event-collector-batch-bytes,omitempty"` + // EnableScanWindow controls whether the event service applies the adaptive scan + // window (memory control + adaptive scan interval) for this changefeed. + // It defaults to false so the feature behaves as if it was never introduced. + EnableScanWindow *bool `toml:"enable-scan-window" json:"enable-scan-window,omitempty"` + // Deprecated: we don't use this field since v8.0.0. SQLMode string `toml:"sql-mode" json:"sql-mode"` } diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 45ceb0fc8c..a4411e9be8 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -428,13 +428,19 @@ type changefeedStatus struct { scanWindowController *adaptiveScanWindowController syncPointInterval time.Duration + + // enableScanWindow controls whether the adaptive scan window (memory control + + // adaptive scan interval) takes effect for this changefeed. When false, the + // feature behaves as if it was never introduced. + enableScanWindow bool } -func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { +func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration, enableScanWindow bool) *changefeedStatus { status := &changefeedStatus{ changefeedID: changefeedID, scanWindowController: newAdaptiveScanWindowController(time.Now()), syncPointInterval: syncPointInterval, + enableScanWindow: enableScanWindow, } status.scanInterval.Store(int64(defaultScanInterval)) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 713ea26e3c..f9c41eac3c 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -448,7 +448,7 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang } } - if dataRange.CommitTsEnd <= dataRange.CommitTsStart && hasPendingDDLEventInCurrentRange { + if task.changefeedStat.enableScanWindow && dataRange.CommitTsEnd <= dataRange.CommitTsStart && hasPendingDDLEventInCurrentRange { // Global scan window base can be pinned by other lagging dispatchers. // For a table with pending ddl in current range, use a local bounded step to keep // this dispatcher making forward progress, so barrier coverage can eventually complete. @@ -472,10 +472,15 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang if dataRange.CommitTsEnd <= dataRange.CommitTsStart { updateMetricEventServiceSkipResolvedTsCount(task.info.GetMode()) - // Scan range can become empty after applying capping (for example, scan window). - // Send a signal resolved-ts event (rate limited) to keep downstream responsive, - // but do not advance the watermark here. - c.sendSignalResolvedTs(task) + // Scan range can become empty after applying scan-window capping. In that case send a + // signal resolved-ts event (rate limited) to keep downstream responsive, but do not + // advance the watermark here. + // When the scan window is disabled, an empty range can only come from the pre-existing + // DDL-state capping, where the baseline behavior is to just return without sending a + // signal. Gate the signal on enableScanWindow to keep that parity. + if task.changefeedStat.enableScanWindow { + c.sendSignalResolvedTs(task) + } return false, common.DataRange{} } @@ -1251,14 +1256,18 @@ func (c *eventBroker) getOrSetChangefeedStatus(info DispatcherInfo) *changefeedS zap.Error(err)) } - status := newChangefeedStatus(changefeedID, info.GetSyncPointInterval()) + status := newChangefeedStatus(changefeedID, info.GetSyncPointInterval(), info.GetEnableScanWindow()) status.filter = changefeedFilter actual, loaded := c.changefeedMap.LoadOrStore(changefeedID, status) if loaded { return actual.(*changefeedStatus) } log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID)) - initializeScanWindowMetrics(changefeedID.String()) + // Only emit scan-window metrics when the feature is enabled, so a changefeed + // running with the feature off stays silent on the scan-window dashboards. + if status.enableScanWindow { + initializeScanWindowMetrics(changefeedID.String()) + } return status } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index da6a03b9cf..0abdecb5e1 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -951,3 +951,60 @@ func TestAddDispatcherFailure(t *testing.T) { _, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID()) require.False(t, ok, "changefeedStatus should be removed after failed registration") } + +// TestGetScanTaskDataRangeEmptySignalGatedByScanWindow verifies gate 6 of the +// enable-scan-window switch (see ai_context/switch_4030_4460_4950.md): when the scan +// range becomes empty due to the pre-existing DDL-state capping, the rate-limited +// signal resolved-ts is emitted only when the scan window is enabled. With the +// feature off the behavior matches the baseline (no signal is sent). +func TestGetScanTaskDataRangeEmptySignalGatedByScanWindow(t *testing.T) { + cases := []struct { + name string + enableScanWindow bool + wantSignal bool + }{ + {"enabled sends signal", true, true}, + {"disabled stays silent", false, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + broker, _, ss, _ := newEventBrokerForTest() + // Close the broker so the send-message workers stop; emitted messages then + // stay buffered in messageCh and can be observed by length. + broker.close() + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + // syncPointInterval=0 keeps emitSyncPointEventIfNeeded a no-op. + status := newChangefeedStatus(info.GetChangefeedID(), 0, tc.enableScanWindow) + disp := newDispatcherStat(info, 1, 1, nil, status) + disp.seq.Store(1) + + baseTime := time.Now() + commitStart := oracle.GoTimeToTS(baseTime.Add(20 * time.Second)) + disp.sentResolvedTs.Store(oracle.GoTimeToTS(baseTime)) + disp.receivedResolvedTs.Store(oracle.GoTimeToTS(baseTime.Add(40 * time.Second))) + disp.eventStoreCommitTs.Store(commitStart) + disp.lastScannedCommitTs.Store(commitStart) // CommitTsStart + disp.lastScannedStartTs.Store(0) // allow the signal resolved-ts + // Make sure the rate limiter does not suppress the signal. + disp.lastSentResolvedTsTime.Store(baseTime.Add(-time.Hour)) + + // DDL-state ResolvedTs caps CommitTsEnd down to CommitTsStart, producing an empty + // range regardless of the scan window. maxDDLCommitTs <= CommitTsStart avoids the + // pending-ddl local advance branch. + ss.resolvedTs = commitStart + ss.maxDDLCommitTs = commitStart + + needScan, _ := broker.getScanTaskDataRange(disp) + require.False(t, needScan) + + got := len(broker.messageCh[disp.messageWorkerIndex]) + if tc.wantSignal { + require.Equal(t, 1, got, "expected a signal resolved-ts message when scan window is enabled") + } else { + require.Equal(t, 0, got, "expected no message when scan window is disabled") + } + }) + } +} diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 7a76adeae9..77b0a4e9d9 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -60,7 +60,7 @@ func (m *mockMounter) DecodeToChunk(rawKV *common.RawKVEntry, tableInfo *common. func TestEventScannerReturnsIteratorErrors(t *testing.T) { disInfo := newMockDispatcherInfoForTest(t) - changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0, true) disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) makeDispatcherReady(disp) diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index efea08ea24..9532ad598d 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -53,6 +53,10 @@ type DispatcherInfo interface { GetSyncPointTs() uint64 GetSyncPointInterval() time.Duration + // GetEnableScanWindow reports whether the adaptive scan window feature is + // enabled for this changefeed. + GetEnableScanWindow() bool + IsOnlyReuse() bool GetBdrMode() bool GetIntegrity() *integrity.Config diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index a64a00d5d6..86c2ffb553 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -398,6 +398,7 @@ type mockDispatcherInfo struct { enableSyncPoint bool nextSyncPoint uint64 syncPointInterval time.Duration + enableScanWindow bool } func newMockDispatcherInfo(t *testing.T, startTs uint64, dispatcherID common.DispatcherID, tableID int64, actionType eventpb.ActionType) *mockDispatcherInfo { @@ -421,6 +422,8 @@ func newMockDispatcherInfo(t *testing.T, startTs uint64, dispatcherID common.Dis }, bdrMode: false, integrity: config.GetDefaultReplicaConfig().Integrity, + // Tests exercise the scan window behavior, so enable it by default here. + enableScanWindow: true, } } @@ -472,6 +475,10 @@ func (m *mockDispatcherInfo) GetSyncPointInterval() time.Duration { return m.syncPointInterval } +func (m *mockDispatcherInfo) GetEnableScanWindow() bool { + return m.enableScanWindow +} + func (m *mockDispatcherInfo) IsOnlyReuse() bool { return false } @@ -503,7 +510,7 @@ func (m *mockDispatcherInfo) GetTxnAtomicity() config.AtomicityLevel { func newChangefeedStatusForTest(t testing.TB, info DispatcherInfo) *changefeedStatus { t.Helper() - status := newChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + status := newChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval(), info.GetEnableScanWindow()) status.filter = newChangefeedFilterForTest(t, info, time.UTC.String()) return status } @@ -516,7 +523,7 @@ func addChangefeedStatusToBrokerForTest( ) *changefeedStatus { t.Helper() - status := newChangefeedStatus(changefeedID, syncPointInterval) + status := newChangefeedStatus(changefeedID, syncPointInterval, true) broker.changefeedMap.Store(changefeedID, status) return status } diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 5bcb8e16c9..8c629475a5 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -296,6 +296,11 @@ func (w *memoryUsageWindow) pruneLocked(now time.Time) { } func (c *changefeedStatus) updateMemoryUsage(now time.Time, usageRatio float64, memoryReleaseCount uint32) { + // When the scan window feature is disabled, keep the scan interval untouched so + // the changefeed behaves as if the adaptive scan window was never introduced. + if !c.enableScanWindow { + return + } if c.scanWindowController == nil { return } @@ -811,6 +816,13 @@ func (c *changefeedStatus) maxScanInterval() time.Duration { } func (c *changefeedStatus) refreshMinSentResolvedTs() { + // When the scan window feature is disabled, minSentTs is never consumed + // (getScanMaxTs returns 0 unconditionally), so skip the per-changefeed scan and + // the base-ts gauge update entirely. This keeps the feature fully inert and + // avoids emitting scan-window metrics when it is off. + if !c.enableScanWindow { + return + } now := time.Now() minSentResolvedTs := ^uint64(0) minSentResolvedTsWithStale := ^uint64(0) @@ -854,6 +866,11 @@ func (c *changefeedStatus) refreshMinSentResolvedTs() { } func (c *changefeedStatus) getScanMaxTs() uint64 { + // When the scan window feature is disabled, return 0 so that the scan range is + // never capped, behaving as if the scan window was never introduced. + if !c.enableScanWindow { + return 0 + } baseTs := c.minSentTs.Load() if baseTs == 0 { return 0 diff --git a/pkg/eventservice/scan_window_test.go b/pkg/eventservice/scan_window_test.go index 712fe9d789..56f050088b 100644 --- a/pkg/eventservice/scan_window_test.go +++ b/pkg/eventservice/scan_window_test.go @@ -37,7 +37,7 @@ func markScanWindowReadyForDecrease(status *changefeedStatus, now time.Time) { func TestAdjustScanIntervalLowPressureSlowsRecoveryForLargeWindow(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) now := time.Now() markScanWindowReadyForIncrease(status, now) @@ -52,7 +52,7 @@ func TestAdjustScanIntervalLowPressureSlowsRecoveryForLargeWindow(t *testing.T) func TestAdjustScanIntervalVeryLowPressureSlowsRecoveryForVeryLargeWindow(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) now := time.Now() markScanWindowReadyForIncrease(status, now) @@ -67,7 +67,7 @@ func TestAdjustScanIntervalVeryLowPressureSlowsRecoveryForVeryLargeWindow(t *tes func TestAdjustScanIntervalHighPressureUsesBoundedReduction(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) now := time.Now() markScanWindowReadyForDecrease(status, now) @@ -79,7 +79,7 @@ func TestAdjustScanIntervalHighPressureUsesBoundedReduction(t *testing.T) { func TestAdjustScanIntervalDoesNotKeepReducingAfterTransientHighPressure(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute, true) changefeed := status.changefeedID.String() t.Cleanup(func() { deleteScanWindowMetrics(changefeed) @@ -101,7 +101,7 @@ func TestAdjustScanIntervalDoesNotKeepReducingAfterTransientHighPressure(t *test func TestAdjustScanIntervalCriticalPressure(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) status.scanInterval.Store(int64(40 * time.Second)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(scanWindowEmergencyBrakePlateauInterval), status.scanInterval.Load()) @@ -110,7 +110,7 @@ func TestAdjustScanIntervalCriticalPressure(t *testing.T) { func TestAdjustScanIntervalCriticalPressureIgnoresLowPressureHistory(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 10*time.Minute, true) changefeed := status.changefeedID.String() t.Cleanup(func() { deleteScanWindowMetrics(changefeed) @@ -130,7 +130,7 @@ func TestAdjustScanIntervalCriticalPressureIgnoresLowPressureHistory(t *testing. func TestAdjustScanIntervalCriticalPressureUsesDefaultFloor(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(8 * time.Second)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 0.95, 0) require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) @@ -139,7 +139,7 @@ func TestAdjustScanIntervalCriticalPressureUsesDefaultFloor(t *testing.T) { func TestAdjustScanIntervalHighPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) now := time.Now() markScanWindowReadyForDecrease(status, now) @@ -151,7 +151,7 @@ func TestAdjustScanIntervalHighPressureDoesNotIncreaseBelowDefaultFloor(t *testi func TestAdjustScanIntervalCriticalPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(2 * time.Second)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 0.95, 0) require.Equal(t, int64(2*time.Second), status.scanInterval.Load()) @@ -160,7 +160,7 @@ func TestAdjustScanIntervalCriticalPressureDoesNotIncreaseBelowDefaultFloor(t *t func TestAdjustScanIntervalEmergencyPressureUsesModerateBrakeForSmallWindow(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(20 * time.Second)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(10*time.Second), status.scanInterval.Load()) @@ -183,7 +183,7 @@ func TestScanWindowEmergencyBrakeIntervalUsesStrongBrakeForLargeWindow(t *testin func TestAdjustScanIntervalEmergencyPressureUsesDefaultFloorForVerySmallWindow(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(8 * time.Second)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) @@ -192,7 +192,7 @@ func TestAdjustScanIntervalEmergencyPressureUsesDefaultFloorForVerySmallWindow(t func TestAdjustScanIntervalEmergencyPressureDoesNotImmediatelyDropBelowDefaultFloor(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(defaultScanInterval)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) @@ -201,7 +201,7 @@ func TestAdjustScanIntervalEmergencyPressureDoesNotImmediatelyDropBelowDefaultFl func TestAdjustScanIntervalEmergencyPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(2 * time.Second)) status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(2*time.Second), status.scanInterval.Load()) @@ -210,7 +210,7 @@ func TestAdjustScanIntervalEmergencyPressureDoesNotIncreaseBelowDefaultFloor(t * func TestAdjustScanIntervalEmergencyPressureCanReachMinFloorWhenSustained(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) status.scanInterval.Store(int64(defaultScanInterval)) start := time.Now() @@ -224,7 +224,7 @@ func TestAdjustScanIntervalEmergencyPressureCanReachMinFloorWhenSustained(t *tes func TestAdjustScanIntervalRecoversFromFloorBeforeNormalIncreaseCooldown(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute, true) now := time.Now() status.scanInterval.Store(int64(defaultScanInterval)) status.scanWindowController.setLastAdjustTimeForTest(now.Add(-scanWindowFloorRecoveryCooldown - time.Second)) @@ -239,7 +239,7 @@ func TestAdjustScanIntervalRecoversFromFloorBeforeNormalIncreaseCooldown(t *test func TestUpdateMemoryUsageDoesNotResetScanIntervalOnMemoryRelease(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) now := time.Now() status.scanInterval.Store(int64(40 * time.Second)) @@ -248,7 +248,7 @@ func TestUpdateMemoryUsageDoesNotResetScanIntervalOnMemoryRelease(t *testing.T) } func TestUpdateMemoryUsageRecordsScanWindowObservationMetrics(t *testing.T) { - status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute, true) changefeed := status.changefeedID.String() t.Cleanup(func() { deleteScanWindowMetrics(changefeed) @@ -272,7 +272,7 @@ func TestUpdateMemoryUsageRecordsScanWindowObservationMetrics(t *testing.T) { } func TestUpdateMemoryUsageRecordsScanWindowAdjustCount(t *testing.T) { - status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute, true) changefeed := status.changefeedID.String() t.Cleanup(func() { deleteScanWindowMetrics(changefeed) @@ -289,7 +289,7 @@ func TestUpdateMemoryUsageRecordsScanWindowAdjustCount(t *testing.T) { } func TestUpdateMemoryUsageRecordsScanWindowTargetBandMetrics(t *testing.T) { - status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 10*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 10*time.Minute, true) changefeed := status.changefeedID.String() t.Cleanup(func() { deleteScanWindowMetrics(changefeed) @@ -311,7 +311,7 @@ func TestUpdateMemoryUsageRecordsScanWindowTargetBandMetrics(t *testing.T) { func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) start := time.Now() markScanWindowReadyForIncrease(status, start) @@ -330,7 +330,7 @@ func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { func TestAdjustScanIntervalReducesOnSustainedPressure(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) now := time.Now() markScanWindowReadyForDecrease(status, now) @@ -345,7 +345,7 @@ func TestAdjustScanIntervalReducesOnSustainedPressure(t *testing.T) { func TestAdjustScanIntervalSustainedPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) now := time.Now() markScanWindowReadyForDecrease(status, now) @@ -360,7 +360,7 @@ func TestAdjustScanIntervalSustainedPressureDoesNotIncreaseBelowDefaultFloor(t * func TestAdjustScanIntervalDoesNotIncreaseBeforeCooldown(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) now := time.Now() status.scanInterval.Store(int64(40 * time.Second)) @@ -373,7 +373,7 @@ func TestAdjustScanIntervalDoesNotIncreaseBeforeCooldown(t *testing.T) { func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) stale := &dispatcherStat{} stale.seq.Store(1) @@ -433,7 +433,7 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) stale := &dispatcherStat{} stale.seq.Store(1) @@ -451,7 +451,7 @@ func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { func TestGetScanMaxTsFallbackInterval(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute, true) baseTime := time.Unix(1234, 0) baseTs := oracle.GoTimeToTS(baseTime) @@ -466,3 +466,31 @@ func TestGetScanMaxTsFallbackInterval(t *testing.T) { status.minSentTs.Store(0) require.Equal(t, uint64(0), status.getScanMaxTs()) } + +// TestScanWindowDisabledIsNoOp verifies that when the scan window feature is +// disabled (the default), getScanMaxTs never caps the range and updateMemoryUsage +// keeps the scan interval untouched, so the changefeed behaves as if the feature +// was never introduced. +func TestScanWindowDisabledIsNoOp(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute, false) + now := time.Now() + + // Even with a non-zero base ts, the scan range must not be capped. + status.minSentTs.Store(oracle.GoTimeToTS(now)) + require.Equal(t, uint64(0), status.getScanMaxTs()) + + // Memory usage reports must not adjust the scan interval. + status.scanInterval.Store(int64(40 * time.Second)) + markScanWindowReadyForDecrease(status, now) + status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 1.0, 0) + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) + + // refreshMinSentResolvedTs must be a no-op: it leaves minSentTs untouched + // (an enabled status would recompute it to 0 here since there is no dispatcher). + sentinel := oracle.GoTimeToTS(now) + status.minSentTs.Store(sentinel) + status.refreshMinSentResolvedTs() + require.Equal(t, sentinel, status.minSentTs.Load()) +} diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index eb891fa13d..64d981b1fa 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -259,6 +259,17 @@ type AreaSettings struct { // control how to batch events batchConfig batchConfig + + // releaseMemoryRatio overrides the fraction of pending memory released on each + // release cycle. A value <= 0 falls back to defaultReleaseMemoryRatio, keeping the + // pre-scan-window behavior. + releaseMemoryRatio float64 +} + +// SetReleaseMemoryRatio overrides the fraction of pending memory the area releases on +// each release cycle. A value <= 0 keeps the default (defaultReleaseMemoryRatio). +func (s *AreaSettings) SetReleaseMemoryRatio(ratio float64) { + s.releaseMemoryRatio = ratio } func (s *AreaSettings) fix() { diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go index 93af18b8ed..5282ea1e4c 100644 --- a/utils/dynstream/memory_control.go +++ b/utils/dynstream/memory_control.go @@ -33,7 +33,14 @@ const ( // For now, we only use it in event collector. MemoryControlForEventCollector = 1 - defaultReleaseMemoryRatio = 0.6 + // defaultReleaseMemoryRatio is the fraction of an area's pending memory released on + // each release cycle when the area does not override it (i.e. the adaptive scan + // window is disabled). It matches the pre-scan-window behavior. + defaultReleaseMemoryRatio = 0.4 + // deadlockMemoryHighWaterMark is the memory usage ratio above which a stalled area + // is treated as deadlocked. It is independent of the release ratio and stays the + // same regardless of whether the scan window is enabled. + deadlockMemoryHighWaterMark = 0.6 defaultDeadlockDuration = 5 * time.Second defaultReleaseMemoryThreshold = 256 ) @@ -171,7 +178,7 @@ func (as *areaMemStat[A, P, T, D, H]) checkDeadlock() bool { hasEventComeButNotOut := time.Since(as.lastAppendEventTime.Load().(time.Time)) < defaultDeadlockDuration && time.Since(as.lastSizeDecreaseTime.Load().(time.Time)) > defaultDeadlockDuration - memoryHighWaterMark := as.memoryUsageRatio() > defaultReleaseMemoryRatio + memoryHighWaterMark := as.memoryUsageRatio() > deadlockMemoryHighWaterMark return hasEventComeButNotOut && memoryHighWaterMark } @@ -193,11 +200,12 @@ func (as *areaMemStat[A, P, T, D, H]) releaseMemory() { return paths[i].lastHandleEventTs.Load() > paths[j].lastHandleEventTs.Load() }) - sizeToRelease := int64(float64(as.totalPendingSize.Load()) * defaultReleaseMemoryRatio) + releaseRatio := as.releaseMemoryRatio() + sizeToRelease := int64(float64(as.totalPendingSize.Load()) * releaseRatio) releasedSize := int64(0) releasedPaths := make([]*pathInfo[A, P, T, D, H], 0) - log.Info("release memory", zap.Any("area", as.area), zap.Int64("sizeToRelease", sizeToRelease), zap.Int64("totalPendingSize", as.totalPendingSize.Load()), zap.Float64("releaseMemoryRatio", defaultReleaseMemoryRatio)) + log.Info("release memory", zap.Any("area", as.area), zap.Int64("sizeToRelease", sizeToRelease), zap.Int64("totalPendingSize", as.totalPendingSize.Load()), zap.Float64("releaseMemoryRatio", releaseRatio)) for _, path := range paths { // Only release path that is blocking and has pending size larger than the threshold. @@ -227,6 +235,17 @@ func (as *areaMemStat[A, P, T, D, H]) memoryUsageRatio() float64 { return float64(as.totalPendingSize.Load()) / float64(as.settings.Load().maxPendingSize) } +// releaseMemoryRatio returns the fraction of pending memory to release on each cycle. +// It honors the per-area override (set when the scan window is enabled) and falls back +// to defaultReleaseMemoryRatio when unset, so a non-overriding area keeps the +// pre-scan-window behavior. +func (as *areaMemStat[A, P, T, D, H]) releaseMemoryRatio() float64 { + if ratio := as.settings.Load().releaseMemoryRatio; ratio > 0 { + return ratio + } + return defaultReleaseMemoryRatio +} + func (as *areaMemStat[A, P, T, D, H]) updateAreaPauseState(path *pathInfo[A, P, T, D, H]) { pause, resume, memoryUsageRatio := as.algorithm.ShouldPauseArea( as.paused.Load(),