Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
810 changes: 756 additions & 54 deletions DEPS.bzl

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20260310054046-9c8b3586e4b2
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20260320060847-534bbfabf736
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9
github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
Expand Down Expand Up @@ -145,7 +145,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.1
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b
golang.org/x/net v0.50.0
golang.org/x/net v0.51.0
golang.org/x/oauth2 v0.33.0
golang.org/x/sync v0.20.0
golang.org/x/sys v0.41.0
Expand Down Expand Up @@ -368,7 +368,10 @@ replace (
// Downgrade grpc to v1.63.2, as well as other related modules.
github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20260401045836-48e3f28d9c74
github.com/pingcap/tidb/pkg/parser => ./pkg/parser
github.com/tikv/client-go/v2 => github.com/JmPotato/client-go/v2 v2.0.0-20260402030241-85eebdba76ab
github.com/tikv/pd/client => github.com/JmPotato/pd/client v0.0.0-20260409053522-fda21466b565

// TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed.
// Please remove these dependencies.
Expand Down
1,484 changes: 1,470 additions & 14 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type DistSQLContext struct {
EnablePaging bool
MinPagingSize int
MaxPagingSize int
PagingSizeBytes int
RequestSourceType string
ExplicitRequestSourceType string
StoreBatchSize int
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestContextDetach(t *testing.T) {
EnablePaging: true,
MinPagingSize: 1,
MaxPagingSize: 1,
PagingSizeBytes: 1,
RequestSourceType: "a",
ExplicitRequestSourceType: "b",
StoreBatchSize: 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ func (builder *RequestBuilder) SetFromSessionVars(dctx *distsqlctx.DistSQLContex
builder.SetPaging(dctx.EnablePaging)
builder.Request.Paging.MinPagingSize = uint64(dctx.MinPagingSize)
builder.Request.Paging.MaxPagingSize = uint64(dctx.MaxPagingSize)
builder.Request.Paging.PagingSizeBytes = uint64(dctx.PagingSizeBytes)
}
builder.RequestSource.RequestSourceInternal = dctx.InRestrictedSQL
builder.RequestSource.RequestSourceType = dctx.RequestSourceType
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,9 @@ type Request struct {
// when enabled, this field is adjusted to be max(MaxPagingSize, paging.MinAllowedMaxPagingSize),
// see paging.GrowPagingSize
MaxPagingSize uint64
// PagingSizeBytes is the byte budget per page.
// 0 means disabled (no byte-budget paging).
PagingSizeBytes uint64
}
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func InitDistSQLMetrics() {
Namespace: "tidb",
Subsystem: "distsql",
Name: "copr_resp_size",
Help: "copr task response data size in bytes.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
Help: "copr task response data size in KiB.",
Buckets: prometheus.ExponentialBuckets(1, 2, 19),
}, []string{LblStore})
}
1 change: 1 addition & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3380,6 +3380,7 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
EnablePaging: vars.EnablePaging,
MinPagingSize: vars.MinPagingSize,
MaxPagingSize: vars.MaxPagingSize,
PagingSizeBytes: vars.PagingSizeBytes,
RequestSourceType: vars.RequestSourceType,
ExplicitRequestSourceType: vars.ExplicitRequestSourceType,
StoreBatchSize: vars.StoreBatchSize,
Expand Down
9 changes: 9 additions & 0 deletions pkg/sessionctx/vardef/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ const (
// TiDBMaxPagingSize is used to control the max paging size in the coprocessor paging protocol.
TiDBMaxPagingSize = "tidb_max_paging_size"

// TiDBPagingSizeBytes is the byte budget per coprocessor page.
// When non-zero, TiKV stops scanning a page once accumulated response bytes
// reach this limit. Setting a non-zero value implicitly enables row-count
// paging if it is not already enabled, because the TiKV paging protocol
// requires it.
// 0 means disabled (no byte-budget paging).
TiDBPagingSizeBytes = "tidb_paging_size_bytes"

// TiDBEnableCascadesPlanner is used to control whether to enable the cascades planner.
TiDBEnableCascadesPlanner = "tidb_enable_cascades_planner"

Expand Down Expand Up @@ -1474,6 +1482,7 @@ const (
DefInitChunkSize = 32
DefMinPagingSize = int(paging.MinPagingSize)
DefMaxPagingSize = int(paging.MinAllowedMaxPagingSize)
DefPagingSizeBytes = 0
DefMaxChunkSize = 1024
DefDMLBatchSize = 0
DefMaxPreparedStmtCount = -1
Expand Down
5 changes: 5 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,10 @@ type SessionVars struct {
// NOTE: please don't change it directly. Use `SetResourceGroupName`, because it'll need to inc/dec the metrics
ResourceGroupName string

// PagingSizeBytes is the byte budget per page.
// 0 means disabled.
PagingSizeBytes int

// PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction
// is enabled.
PessimisticTransactionFairLocking bool
Expand Down Expand Up @@ -2445,6 +2449,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
EnableLateMaterialization: vardef.DefTiDBOptEnableLateMaterialization,
TiFlashComputeDispatchPolicy: tiflashcompute.DispatchPolicyConsistentHash,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
PagingSizeBytes: vardef.DefPagingSizeBytes,
DefaultCollationForUTF8MB4: mysql.DefaultCollationName,
GroupConcatMaxLen: vardef.DefGroupConcatMaxLen,
EnableRedactLog: vardef.DefTiDBRedactLog,
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/setvar_affect.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ var isHintUpdatableVerified = map[string]struct{}{
"tidb_max_chunk_size": {},
"tidb_min_paging_size": {},
"tidb_max_paging_size": {},
"tidb_paging_size_bytes": {},
"tidb_enable_cascades_planner": {},
"tidb_merge_join_concurrency": {},
"tidb_index_merge_intersection_concurrency": {},
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2970,6 +2970,10 @@ var defaultSysVars = []*SysVar{
s.MaxPagingSize = tidbOptPositiveInt32(val, vardef.DefMaxPagingSize)
return nil
}},
{Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.TiDBPagingSizeBytes, Value: strconv.Itoa(vardef.DefPagingSizeBytes), Type: vardef.TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.PagingSizeBytes = int(TidbOptInt64(val, int64(vardef.DefPagingSizeBytes)))
return nil
}},
{Scope: vardef.ScopeSession, Name: vardef.TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: vardef.TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 40,
shard_count = 42,
deps = [
"//pkg/kv",
"//pkg/sessionctx/vardef",
"//pkg/store/driver/backoff",
"//pkg/testkit/testsetup",
"//pkg/util/logutil",
Expand Down
7 changes: 4 additions & 3 deletions pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
Concurrency: 15,
StoreBatchSize: 3,
Paging: struct {
Enable bool
MinPagingSize uint64
MaxPagingSize uint64
Enable bool
MinPagingSize uint64
MaxPagingSize uint64
PagingSizeBytes uint64
}{
Enable: true,
MinPagingSize: 1,
Expand Down
58 changes: 48 additions & 10 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
// coprocessor request but type is not DAG
req.Paging.Enable = false
}
// Byte-budget paging: if the request is eligible and has a byte budget,
// ensure paging is enabled so each page's scanned bytes are bounded.
// The TiKV coprocessor protocol requires paging to be enabled for the
// PagingSizeBytes field to take effect. When force-enabling paging, we
// use minimal row-count parameters so the byte budget becomes the
// dominant page-break signal.
// This must happen before checkStoreBatchCopr, which disables batch copr
// when paging is enabled.
pagingSizeBytes := uint64(0)
if pagingBytesEligible(req) && req.Paging.PagingSizeBytes > 0 {
if !req.Paging.Enable {
req.Paging.Enable = true
req.Paging.MinPagingSize = paging.MinPagingSize
req.Paging.MaxPagingSize = paging.MinAllowedMaxPagingSize
}
pagingSizeBytes = req.Paging.PagingSizeBytes
}
failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
if req.Paging.Enable {
if !req.KeyRanges.IsFullySorted() {
Expand Down Expand Up @@ -160,11 +177,12 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
tryRowHint := optRowHint(req)
elapsed := time.Duration(0)
buildOpt := &buildCopTaskOpt{
req: req,
cache: c.store.GetRegionCache(),
eventCb: eventCb,
respChan: req.KeepOrder,
elapsed: &elapsed,
req: req,
cache: c.store.GetRegionCache(),
eventCb: eventCb,
respChan: req.KeepOrder,
elapsed: &elapsed,
pagingSizeBytes: pagingSizeBytes,
}
buildTaskFunc := func(ranges []kv.KeyRange, hints []int) error {
keyRanges := NewKeyRanges(ranges)
Expand Down Expand Up @@ -287,10 +305,11 @@ type copTask struct {
cmdType tikvrpc.CmdType
storeType kv.StoreType

eventCb trxevents.EventCallback
paging bool
pagingSize uint64
pagingTaskIdx uint32
eventCb trxevents.EventCallback
paging bool
pagingSize uint64
pagingSizeBytes uint64
pagingTaskIdx uint32

partitionIndex int64 // used by balanceBatchCopTask in PartitionTableScan
requestSource util.RequestSource
Expand Down Expand Up @@ -367,6 +386,9 @@ type buildCopTaskOpt struct {
skipBuckets bool
// exceedsBoundRetry propagates bounded retry attempts to generated tasks.
exceedsBoundRetry int
// pagingSizeBytes is the byte budget per page.
// 0 means no byte-based limit.
pagingSizeBytes uint64
}

const (
Expand Down Expand Up @@ -690,6 +712,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
eventCb: eventCb,
paging: req.Paging.Enable,
pagingSize: pagingSize,
pagingSizeBytes: opt.pagingSizeBytes,
requestSource: req.RequestSource,
RowCountHint: hint,
busyThreshold: req.StoreBusyThreshold,
Expand Down Expand Up @@ -717,6 +740,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
// disable paging for small limit.
task.paging = false
task.pagingSize = 0
task.pagingSizeBytes = 0
} else {
pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize)
}
Expand Down Expand Up @@ -1662,6 +1686,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask) (*
Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
PagingSize: task.pagingSize,
PagingSizeBytes: task.pagingSizeBytes,
Tasks: task.ToPBBatchTasks(),
ConnectionId: worker.req.ConnID,
ConnectionAlias: worker.req.ConnAlias,
Expand Down Expand Up @@ -1761,7 +1786,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask) (*
}

if copResp != nil {
tidbmetrics.DistSQLCoprRespBodySize.WithLabelValues(storeAddr).Observe(float64(len(copResp.Data) / 1024))
tidbmetrics.DistSQLCoprRespBodySize.WithLabelValues(storeAddr).Observe(float64(len(copResp.Data)) / 1024)
}

var result *copTaskResult
Expand Down Expand Up @@ -1850,6 +1875,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
// If there is region error or lock error, keep the paging size and retry.
for _, remainedTask := range result.remains {
remainedTask.pagingSize = task.pagingSize
remainedTask.pagingSizeBytes = task.pagingSizeBytes
}
return result, nil
}
Expand Down Expand Up @@ -2892,6 +2918,18 @@ func optRowHint(req *kv.Request) bool {
return opt
}

// pagingBytesEligible checks whether byte-budget paging should be applied.
// Only DAG requests on TiKV support byte-budget paging.
func pagingBytesEligible(req *kv.Request) bool {
if req.StoreType != kv.TiKV {
return false
}
if req.Tp != kv.ReqTypeDAG {
return false
}
return true
}

func checkStoreBatchCopr(req *kv.Request) bool {
if req.Tp != kv.ReqTypeDAG || req.StoreType != kv.TiKV {
return false
Expand Down
64 changes: 64 additions & 0 deletions pkg/store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,70 @@ func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) {
require.Equal(t, tasks[0].pagingSize, uint64(0))
}

func TestPagingBytesEligible(t *testing.T) {
// Eligible: TiKV + DAG
req := &kv.Request{
Tp: kv.ReqTypeDAG,
StoreType: kv.TiKV,
}
require.True(t, pagingBytesEligible(req))

// Not eligible: TiFlash
req2 := &kv.Request{Tp: kv.ReqTypeDAG, StoreType: kv.TiFlash}
require.False(t, pagingBytesEligible(req2))

// Not eligible: non-DAG
req3 := &kv.Request{Tp: kv.ReqTypeAnalyze, StoreType: kv.TiKV}
require.False(t, pagingBytesEligible(req3))
}

func TestBuildCopTasksWithPagingSizeBytes(t *testing.T) {
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()
_, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))

pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()
cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
req.Paging.Enable = true
req.Paging.MinPagingSize = paging.MinPagingSize

// With pagingSizeBytes set, tasks should carry the byte budget.
tasks, err := buildCopTasks(bo, buildCopRanges("a", "c"), &buildCopTaskOpt{
req: req,
cache: cache,
respChan: true,
pagingSizeBytes: uint64(4 * 1024 * 1024),
})
require.NoError(t, err)
require.Len(t, tasks, 1)
taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c")
require.True(t, tasks[0].paging)
require.Equal(t, uint64(4 * 1024 * 1024), tasks[0].pagingSizeBytes)

// When small limit disables paging, pagingSizeBytes should also be cleared.
req.LimitSize = 1
tasks, err = buildCopTasks(bo, buildCopRanges("a", "c"), &buildCopTaskOpt{
req: req,
cache: cache,
respChan: true,
pagingSizeBytes: uint64(4 * 1024 * 1024),
})
require.NoError(t, err)
require.Len(t, tasks, 1)
require.False(t, tasks[0].paging)
require.Equal(t, uint64(0), tasks[0].pagingSizeBytes)
}

func toCopRange(r kv.KeyRange) *coprocessor.KeyRange {
coprRange := coprocessor.KeyRange{}
coprRange.Start = r.StartKey
Expand Down
22 changes: 22 additions & 0 deletions tests/integrationtest/r/sessionctx/setvar.result
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,28 @@ set @@tidb_max_paging_size=default;
select @@tidb_max_paging_size;
@@tidb_max_paging_size
50000
select /*+ set_var(tidb_paging_size_bytes=1048576) */ @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
1048576
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
set @@tidb_paging_size_bytes=default;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
set @@tidb_paging_size_bytes=0;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
set @@tidb_paging_size_bytes=4194304;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
4194304
set @@tidb_paging_size_bytes=default;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
select /*+ set_var(tidb_enable_cascades_planner=0) */ @@tidb_enable_cascades_planner;
@@tidb_enable_cascades_planner
0
Expand Down
Loading
Loading