Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.21
github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18
github.com/pingcap/kvproto v0.0.0-20260622153037-f3bb7de680cd
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb v1.1.0-beta.0.20251121075944-8f2630e53d5d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18 h1:ZgebBNwgma8INCfexAX8dfqZo7TWQrvMXHGABEmAY2Y=
github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20260622153037-f3bb7de680cd h1:PYIS5Hp5df4fSrSJ+76yjJnQGdt4ODmE9EHMNX2K+R8=
github.com/pingcap/kvproto v0.0.0-20260622153037-f3bb7de680cd/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
26 changes: 26 additions & 0 deletions logservice/logpuller/priority_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"time"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/tikv/client-go/v2/oracle"
)

Expand All @@ -42,6 +43,31 @@ func (t TaskType) String() string {
return fmt.Sprintf("%d", t)
}

func (t TaskType) scanPriority() cdcpb.ScanPriority {
switch t {
case TaskHighPrior:
return cdcpb.ScanPriority_SCAN_PRIORITY_HIGH
case TaskLowPrior:
return cdcpb.ScanPriority_SCAN_PRIORITY_LOW
default:
return cdcpb.ScanPriority_SCAN_PRIORITY_LOW
}
}

func taskTypeFromScanPriority(priority cdcpb.ScanPriority) TaskType {
if priority == cdcpb.ScanPriority_SCAN_PRIORITY_HIGH {
return TaskHighPrior
}
return TaskLowPrior
}

func normalizeScanPriority(priority cdcpb.ScanPriority) cdcpb.ScanPriority {
if priority == cdcpb.ScanPriority_SCAN_PRIORITY_HIGH {
return cdcpb.ScanPriority_SCAN_PRIORITY_HIGH
}
return cdcpb.ScanPriority_SCAN_PRIORITY_LOW
}

// PriorityTask is the interface for priority-based tasks
// It implements heap.Item interface
type PriorityTask interface {
Expand Down
10 changes: 10 additions & 0 deletions logservice/logpuller/priority_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ import (
"testing"
"time"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestTaskTypeScanPriorityMapping(t *testing.T) {
require.Equal(t, cdcpb.ScanPriority_SCAN_PRIORITY_HIGH, TaskHighPrior.scanPriority())
require.Equal(t, cdcpb.ScanPriority_SCAN_PRIORITY_LOW, TaskLowPrior.scanPriority())
require.Equal(t, TaskHighPrior, taskTypeFromScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_HIGH))
require.Equal(t, TaskLowPrior, taskTypeFromScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_LOW))
require.Equal(t, TaskLowPrior, taskTypeFromScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_UNKNOWN))
require.Equal(t, cdcpb.ScanPriority_SCAN_PRIORITY_LOW, normalizeScanPriority(cdcpb.ScanPriority_SCAN_PRIORITY_UNKNOWN))
}

// TestPriorityCalculationLogic tests the priority calculation logic in isolation
func TestPriorityCalculationLogic(t *testing.T) {
currentTime := time.Now()
Expand Down
1 change: 1 addition & 0 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (s *regionRequestWorker) createRegionRequest(region regionInfo) *cdcpb.Chan
EndKey: region.span.EndKey,
ExtraOp: kvrpcpb.ExtraOp_ReadOldValue,
FilterLoop: region.filterLoop,
ScanPriority: normalizeScanPriority(region.scanPriority),
}
}

Expand Down
36 changes: 36 additions & 0 deletions logservice/logpuller/region_request_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,42 @@ func prepareRegionForSendTest(region regionInfo) regionInfo {
return region
}

func TestCreateRegionRequestScanPriority(t *testing.T) {
worker := &regionRequestWorker{
client: &subscriptionClient{clusterID: 1},
}

for _, tc := range []struct {
name string
priority cdcpb.ScanPriority
expected cdcpb.ScanPriority
}{
{
name: "high",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH,
expected: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH,
},
{
name: "low",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_LOW,
expected: cdcpb.ScanPriority_SCAN_PRIORITY_LOW,
},
{
name: "unknown defaults to low",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_UNKNOWN,
expected: cdcpb.ScanPriority_SCAN_PRIORITY_LOW,
},
} {
t.Run(tc.name, func(t *testing.T) {
region := prepareRegionForSendTest(createTestRegionInfo(1, 1))
region.scanPriority = tc.priority

req := worker.createRegionRequest(region)
require.Equal(t, tc.expected, req.GetScanPriority())
})
}
}

func TestRegionStatesOperation(t *testing.T) {
worker := &regionRequestWorker{}
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
Expand Down
5 changes: 5 additions & 0 deletions logservice/logpuller/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package logpuller
import (
"sync"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -46,6 +47,9 @@ type regionInfo struct {
// Whether to filter out the value write by cdc itself.
// It should be `true` in BDR mode
filterLoop bool
// scanPriority is sent to TiKV/CSE so remote incremental scan admission can
// preserve TiCDC's business priority across retries.
scanPriority cdcpb.ScanPriority
}

func (s *regionInfo) isStopped() bool {
Expand All @@ -66,6 +70,7 @@ func newRegionInfo(
rpcCtx: rpcCtx,
subscribedSpan: subscribedSpan,
filterLoop: filterLoop,
scanPriority: TaskLowPrior.scanPriority(),
}
}

Expand Down
5 changes: 3 additions & 2 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests(
// scheduleRegionRequest locks the region's range and send the region to regionTaskQueue,
// which will be handled by handleRegions.
func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) {
region.scanPriority = priority.scanPriority()
lockRangeResult := region.subscribedSpan.rangeLock.LockRange(
ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer())

Expand Down Expand Up @@ -872,12 +873,12 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr
}
if innerErr.GetCongested() != nil {
metricKvCongestedCounter.Inc()
s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior)
s.scheduleRegionRequest(ctx, errInfo.regionInfo, taskTypeFromScanPriority(errInfo.scanPriority))
return nil
}
if innerErr.GetServerIsBusy() != nil {
metricKvIsBusyCounter.Inc()
s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior)
s.scheduleRegionRequest(ctx, errInfo.regionInfo, taskTypeFromScanPriority(errInfo.scanPriority))
return nil
}
if duplicated := innerErr.GetDuplicateRequest(); duplicated != nil {
Expand Down
64 changes: 64 additions & 0 deletions logservice/logpuller/subscription_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -226,6 +227,69 @@ func TestOnRegionFailQueuesCanceledErrorCache(t *testing.T) {
require.NotContains(t, client.totalSpans.spanMap, span.subID)
}

func TestBusyRetryPreservesScanPriority(t *testing.T) {
for _, tc := range []struct {
name string
priority cdcpb.ScanPriority
cdcErr *cdcpb.Error
expected TaskType
}{
{
name: "server is busy high",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH,
cdcErr: &cdcpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}},
expected: TaskHighPrior,
},
{
name: "server is busy low",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_LOW,
cdcErr: &cdcpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}},
expected: TaskLowPrior,
},
{
name: "congested high",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_HIGH,
cdcErr: &cdcpb.Error{Congested: &cdcpb.Congested{}},
expected: TaskHighPrior,
},
{
name: "congested low",
priority: cdcpb.ScanPriority_SCAN_PRIORITY_LOW,
cdcErr: &cdcpb.Error{Congested: &cdcpb.Congested{}},
expected: TaskLowPrior,
},
} {
t.Run(tc.name, func(t *testing.T) {
client := &subscriptionClient{
regionTaskQueue: NewPriorityQueue(),
}
client.pdClock = pdutil.NewClock4Test()
rawSpan := heartbeatpb.TableSpan{
TableID: 1,
StartKey: []byte("a"),
EndKey: []byte("z"),
}
span := &subscribedSpan{
subID: SubscriptionID(1),
span: rawSpan,
rangeLock: regionlock.NewRangeLock(1, rawSpan.StartKey, rawSpan.EndKey, 100),
}
region := newRegionInfo(tikv.NewRegionVerID(1, 1, 1), rawSpan, nil, span, false)
region.scanPriority = tc.priority

err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &eventError{err: tc.cdcErr}))
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
task, err := client.regionTaskQueue.Pop(ctx)
require.NoError(t, err)
require.Equal(t, tc.expected, task.(*regionPriorityTask).taskType)
require.Equal(t, tc.priority, task.GetRegionInfo().scanPriority)
})
}
}

type mockDynamicStream struct{}

func (s *mockDynamicStream) Start() {}
Expand Down