diff --git a/go.mod b/go.mod index 3264e2b11b..ca663b9a21 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.21 github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18 + github.com/pingcap/kvproto v0.0.0-20260622153037-f3bb7de680cd github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb v1.1.0-beta.0.20251121075944-8f2630e53d5d diff --git a/go.sum b/go.sum index 1cf2cfe1ce..c49192a9e5 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18 h1:ZgebBNwgma8INCfexAX8dfqZo7TWQrvMXHGABEmAY2Y= -github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20260622153037-f3bb7de680cd h1:PYIS5Hp5df4fSrSJ+76yjJnQGdt4ODmE9EHMNX2K+R8= +github.com/pingcap/kvproto v0.0.0-20260622153037-f3bb7de680cd/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/logservice/logpuller/priority_task.go b/logservice/logpuller/priority_task.go index 68ef9e885d..5dcbd02faf 100644 --- a/logservice/logpuller/priority_task.go +++ b/logservice/logpuller/priority_task.go @@ -17,6 +17,7 @@ import ( "fmt" "time" + "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/tikv/client-go/v2/oracle" ) @@ -42,6 +43,31 @@ func (t TaskType) String() string { return fmt.Sprintf("%d", t) } +func (t TaskType) scanPriority() cdcpb.ScanPriority { + switch t { + case TaskHighPrior: + return cdcpb.ScanPriority_SCAN_PRIORITY_HIGH + case TaskLowPrior: + return cdcpb.ScanPriority_SCAN_PRIORITY_LOW + default: + return cdcpb.ScanPriority_SCAN_PRIORITY_LOW + } +} + +func taskTypeFromScanPriority(priority cdcpb.ScanPriority) TaskType { + if priority == cdcpb.ScanPriority_SCAN_PRIORITY_HIGH { + return TaskHighPrior + } + return TaskLowPrior +} + +func normalizeScanPriority(priority cdcpb.ScanPriority) cdcpb.ScanPriority { + if priority == cdcpb.ScanPriority_SCAN_PRIORITY_HIGH { + return cdcpb.ScanPriority_SCAN_PRIORITY_HIGH + } + return cdcpb.ScanPriority_SCAN_PRIORITY_LOW +} + // PriorityTask is the interface for priority-based tasks // It implements heap.Item interface type PriorityTask interface { diff --git a/logservice/logpuller/priority_task_test.go b/logservice/logpuller/priority_task_test.go index 5b1b373afb..2b8e3cde11 100644 --- a/logservice/logpuller/priority_task_test.go +++ b/logservice/logpuller/priority_task_test.go @@ -17,10 +17,20 @@ import ( "testing" "time" + "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) +func TestTaskTypeScanPriorityMapping(t *testing.T) { + require.Equal(t, cdcpb.ScanPriority_SCAN_PRIORITY_HIGH, TaskHighPrior.scanPriority()) + require.Equal(t, cdcpb.ScanPriority_SCAN_PRIORITY_LOW, TaskLowPrior.scanPriority()) + require.Equal(t, TaskHighPrior, taskTypeFromScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_HIGH)) + require.Equal(t, TaskLowPrior, taskTypeFromScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_LOW)) + require.Equal(t, TaskLowPrior, taskTypeFromScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_UNKNOWN)) + require.Equal(t, cdcpb.ScanPriority_SCAN_PRIORITY_LOW, normalizeScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_UNKNOWN)) +} + // TestPriorityCalculationLogic tests the priority calculation logic in isolation func TestPriorityCalculationLogic(t *testing.T) { currentTime := time.Now() diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 2049335446..309a732522 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -458,6 +458,7 @@ func (s *regionRequestWorker) createRegionRequest(region regionInfo) *cdcpb.Chan EndKey: region.span.EndKey, ExtraOp: kvrpcpb.ExtraOp_ReadOldValue, FilterLoop: region.filterLoop, + ScanPriority: normalizeScanPriority(region.scanPriority), } } diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index f9752a0eb3..a7231da794 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -57,6 +57,42 @@ func prepareRegionForSendTest(region regionInfo) regionInfo { return region } +func TestCreateRegionRequestScanPriority(t *testing.T) { + worker := ®ionRequestWorker{ + client: &subscriptionClient{clusterID: 1}, + } + + for _, tc := range []struct { + name string + priority cdcpb.ScanPriority + expected cdcpb.ScanPriority + }{ + { + name: "high", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH, + expected: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH, + }, + { + name: "low", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_LOW, + expected: cdcpb.ScanPriority_SCAN_PRIORITY_LOW, + }, + { + name: "unknown defaults to low", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_UNKNOWN, + expected: cdcpb.ScanPriority_SCAN_PRIORITY_LOW, + }, + } { + t.Run(tc.name, func(t *testing.T) { + region := prepareRegionForSendTest(createTestRegionInfo(1, 1)) + region.scanPriority = tc.priority + + req := worker.createRegionRequest(region) + require.Equal(t, tc.expected, req.GetScanPriority()) + }) + } +} + func TestRegionStatesOperation(t *testing.T) { worker := ®ionRequestWorker{} worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index e9c21a7aad..40e98bdb26 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -16,6 +16,7 @@ package logpuller import ( "sync" + "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/tikv/client-go/v2/tikv" @@ -46,6 +47,9 @@ type regionInfo struct { // Whether to filter out the value write by cdc itself. // It should be `true` in BDR mode filterLoop bool + // scanPriority is sent to TiKV/CSE so remote incremental scan admission can + // preserve TiCDC's business priority across retries. + scanPriority cdcpb.ScanPriority } func (s *regionInfo) isStopped() bool { @@ -66,6 +70,7 @@ func newRegionInfo( rpcCtx: rpcCtx, subscribedSpan: subscribedSpan, filterLoop: filterLoop, + scanPriority: TaskLowPrior.scanPriority(), } } diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 1aa199bc9a..ad7bea6a5c 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -796,6 +796,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( // scheduleRegionRequest locks the region's range and send the region to regionTaskQueue, // which will be handled by handleRegions. func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) { + region.scanPriority = priority.scanPriority() lockRangeResult := region.subscribedSpan.rangeLock.LockRange( ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer()) @@ -872,12 +873,12 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr } if innerErr.GetCongested() != nil { metricKvCongestedCounter.Inc() - s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, taskTypeFromScanPriority(errInfo.scanPriority)) return nil } if innerErr.GetServerIsBusy() != nil { metricKvIsBusyCounter.Inc() - s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, taskTypeFromScanPriority(errInfo.scanPriority)) return nil } if duplicated := innerErr.GetDuplicateRequest(); duplicated != nil { diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index be5ff52675..495a50dbc1 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/pingcap/ticdc/pkg/common" @@ -226,6 +227,69 @@ func TestOnRegionFailQueuesCanceledErrorCache(t *testing.T) { require.NotContains(t, client.totalSpans.spanMap, span.subID) } +func TestBusyRetryPreservesScanPriority(t *testing.T) { + for _, tc := range []struct { + name string + priority cdcpb.ScanPriority + cdcErr *cdcpb.Error + expected TaskType + }{ + { + name: "server is busy high", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH, + cdcErr: &cdcpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}, + expected: TaskHighPrior, + }, + { + name: "server is busy low", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_LOW, + cdcErr: &cdcpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}, + expected: TaskLowPrior, + }, + { + name: "congested high", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH, + cdcErr: &cdcpb.Error{Congested: &cdcpb.Congested{}}, + expected: TaskHighPrior, + }, + { + name: "congested low", + priority: cdcpb.ScanPriority_SCAN_PRIORITY_LOW, + cdcErr: &cdcpb.Error{Congested: &cdcpb.Congested{}}, + expected: TaskLowPrior, + }, + } { + t.Run(tc.name, func(t *testing.T) { + client := &subscriptionClient{ + regionTaskQueue: NewPriorityQueue(), + } + client.pdClock = pdutil.NewClock4Test() + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte("a"), + EndKey: []byte("z"), + } + span := &subscribedSpan{ + subID: SubscriptionID(1), + span: rawSpan, + rangeLock: regionlock.NewRangeLock(1, rawSpan.StartKey, rawSpan.EndKey, 100), + } + region := newRegionInfo(tikv.NewRegionVerID(1, 1, 1), rawSpan, nil, span, false) + region.scanPriority = tc.priority + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &eventError{err: tc.cdcErr})) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + task, err := client.regionTaskQueue.Pop(ctx) + require.NoError(t, err) + require.Equal(t, tc.expected, task.(*regionPriorityTask).taskType) + require.Equal(t, tc.priority, task.GetRegionInfo().scanPriority) + }) + } +} + type mockDynamicStream struct{} func (s *mockDynamicStream) Start() {}