Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func getResourceControlInfo(ctx context.Context, req *tikvrpc.Request) (
}
reqInfo := resourcecontrol.MakeRequestInfo(req)
if reqInfo.Bypass() {
return "", nil, nil
return resourceGroupName, nil, reqInfo
}
return resourceGroupName, resourceControlInterceptor, reqInfo
}
Expand Down
48 changes: 48 additions & 0 deletions internal/client/client_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/internal/resourcecontrol"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/util/async"
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
)

type emptyClient struct{}
Expand All @@ -48,6 +51,28 @@ func (c emptyClient) CloseAddr(addr string) error {

func (c emptyClient) SetEventListener(listener ClientEventListener) {}

type testResourceControlInterceptor struct{}

func (testResourceControlInterceptor) OnRequestWait(context.Context, string, resourceControlClient.RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
return nil, nil, 0, 0, nil
}

func (testResourceControlInterceptor) OnResponse(string, resourceControlClient.RequestInfo, resourceControlClient.ResponseInfo) (*rmpb.Consumption, error) {
return nil, nil
}

func (testResourceControlInterceptor) OnResponseWait(context.Context, string, resourceControlClient.RequestInfo, resourceControlClient.ResponseInfo) (*rmpb.Consumption, time.Duration, error) {
return nil, 0, nil
}

func (testResourceControlInterceptor) IsBackgroundRequest(context.Context, string, string) bool {
return false
}

func (testResourceControlInterceptor) GetRUVersion() resourceControlClient.RUVersion {
return 0
}

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{})
Expand Down Expand Up @@ -115,3 +140,26 @@ func TestBypassRUV2FollowsRequestInfoBypass(t *testing.T) {
backgroundReq.RequestSource = "background-task"
assert.False(t, resourcecontrol.MakeRequestInfo(backgroundReq).Bypass())
}

func TestGetResourceControlInfoBypassesResourceControl(t *testing.T) {
require := require.New(t)
ResourceControlSwitch.Store(true)
defer ResourceControlSwitch.Store(false)

interceptor := resourceControlClient.ResourceGroupKVInterceptor(testResourceControlInterceptor{})
ResourceControlInterceptor.Store(&interceptor)
defer ResourceControlInterceptor.Store(nil)

req := &tikvrpc.Request{
Context: kvrpcpb.Context{
ResourceControlContext: &kvrpcpb.ResourceControlContext{ResourceGroupName: "rg-1"},
RequestSource: "xxx_internal_others",
},
}

resourceGroupName, resourceControlInterceptor, reqInfo := getResourceControlInfo(context.Background(), req)
require.Equal("rg-1", resourceGroupName)
require.Nil(resourceControlInterceptor)
require.NotNil(reqInfo)
require.True(reqInfo.Bypass())
}
73 changes: 54 additions & 19 deletions internal/resourcecontrol/resource_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type RequestInfo struct {
replicaNumber int64
requestSize uint64
accessType controller.AccessLocationType
requestSource string
// bypass indicates whether the request should be bypassed.
// some internal request should be bypassed, such as Privilege request.
bypass bool
Expand All @@ -55,26 +56,54 @@ func toPDAccessLocationType(accessType kv.AccessLocationType) controller.AccessL
}

// reqTypeAnalyze is the type of analyze coprocessor request.
// ref: https://github.com/pingcap/tidb/blob/ee4eac2ccb83e1ea653b8131d9a43495019cb5ac/pkg/kv/kv.go#L340
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/kv.go#L340
const reqTypeAnalyze = 104

// bypassResourceSourceList maintains a list of resource sources that should be bypassed from the resource control.
var bypassResourceSourceList = []string{
/* DDL */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/ddl/job_worker.go#L503
util.InternalTxnAddIndex,
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/ddl/backfilling_operators.go#L1230
util.InternalTxnMergeTempIndex,
/* BR */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/option.go#L214
util.InternalTxnBR,
/* Import Into */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/option.go#L224
util.InternalImportInto,
/* Workload Learning */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/option.go#L201
util.InternalTxnWorkloadLearning,
}

func shouldBypass(req *tikvrpc.Request) bool {
requestSource := req.GetRequestSource()
// Check both coprocessor request type and the request source to ensure the request is an internal analyze request.
// Internal analyze request may consume a lot of resources, bypass it to avoid affecting the user experience.
// This bypass currently only works with NextGen.
if config.NextGen && strings.Contains(requestSource, util.InternalTxnStats) {
var tp int64
switch req.Type {
case tikvrpc.CmdBatchCop:
tp = req.BatchCop().GetTp()
case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
tp = req.Cop().GetTp()

// These bypasses currently only works with NextGen.
if config.NextGen {
// Check both coprocessor request type and the request source to ensure the request is an internal analyze request.
// Internal analyze request may consume a lot of resources, bypass it to avoid affecting the user experience.
if strings.Contains(requestSource, util.InternalTxnStats) {
var tp int64
switch req.Type {
case tikvrpc.CmdBatchCop:
tp = req.BatchCop().GetTp()
case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
tp = req.Cop().GetTp()
}
if tp == reqTypeAnalyze {
return true
}
}
if tp == reqTypeAnalyze {
return true
// Check other resource source types that should be bypassed.
for _, source := range bypassResourceSourceList {
if strings.Contains(requestSource, source) {
return true
}
}
}

// Some internal requests should be bypassed, which may affect the user experience.
// For example, the `alter user password` request completely bypasses resource control.
// Although it does not consume many resources, it can still impact the user experience.
Expand All @@ -87,11 +116,12 @@ func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
storeID := req.Context.GetPeer().GetStoreId()
if !req.IsTxnWriteRequest() && !req.IsRawWriteRequest() {
return &RequestInfo{
writeBytes: -1,
storeID: storeID,
bypass: bypass,
requestSize: uint64(req.GetSize()),
accessType: toPDAccessLocationType(req.AccessLocation),
writeBytes: -1,
storeID: storeID,
requestSize: uint64(req.GetSize()),
accessType: toPDAccessLocationType(req.AccessLocation),
requestSource: req.GetRequestSource(),
bypass: bypass,
}
}

Expand All @@ -114,9 +144,10 @@ func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
writeBytes: writeBytes,
storeID: storeID,
replicaNumber: req.ReplicaNumber,
bypass: bypass,
requestSize: uint64(req.GetSize()),
accessType: toPDAccessLocationType(req.AccessLocation),
requestSource: req.GetRequestSource(),
bypass: bypass,
}
}

Expand Down Expand Up @@ -155,6 +186,10 @@ func (req *RequestInfo) AccessLocationType() controller.AccessLocationType {
return req.accessType
}

func (req *RequestInfo) RequestSource() string {
return req.requestSource
}

// ResponseInfo contains information about a response that is able to calculate the RU cost
// after the response is received. Specifically, the read bytes RU cost of a read request
// could be calculated by its response size, and the KV CPU time RU cost of a request could
Expand Down
108 changes: 106 additions & 2 deletions internal/resourcecontrol/resource_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

func TestMakeRequestInfo(t *testing.T) {
Expand All @@ -23,14 +24,22 @@ func TestMakeRequestInfo(t *testing.T) {
// Test a prewrite request.
mutation := &kvrpcpb.Mutation{Key: []byte("foo"), Value: []byte("bar")}
prewriteReq := &kvrpcpb.PrewriteRequest{Mutations: []*kvrpcpb.Mutation{mutation}, PrimaryLock: []byte("baz")}
req = &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, Req: prewriteReq, ReplicaNumber: 1, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 2}}}
requestSource := "xxx_internal_others"
req.RequestSource = requestSource
req = &tikvrpc.Request{
Type: tikvrpc.CmdPrewrite,
Req: prewriteReq,
ReplicaNumber: 1,
Context: kvrpcpb.Context{
Peer: &metapb.Peer{StoreId: 2},
RequestSource: requestSource,
},
}
info = MakeRequestInfo(req)
assert.True(t, info.IsWrite())
assert.Equal(t, uint64(9), info.WriteBytes())
assert.True(t, info.Bypass())
assert.Equal(t, uint64(2), info.StoreID())
assert.Equal(t, requestSource, info.RequestSource())
// Test a commit request.
commitReq := &kvrpcpb.CommitRequest{Keys: [][]byte{[]byte("qux")}}
req = &tikvrpc.Request{Type: tikvrpc.CmdCommit, Req: commitReq, ReplicaNumber: 2, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 3}}}
Expand All @@ -39,6 +48,7 @@ func TestMakeRequestInfo(t *testing.T) {
assert.Equal(t, uint64(3), info.WriteBytes())
assert.False(t, info.Bypass())
assert.Equal(t, uint64(3), info.StoreID())
assert.Empty(t, info.RequestSource())

// Test Nil Peer in Context
req = &tikvrpc.Request{Type: tikvrpc.CmdCommit, Req: commitReq, ReplicaNumber: 2, Context: kvrpcpb.Context{}}
Expand All @@ -49,6 +59,100 @@ func TestMakeRequestInfo(t *testing.T) {
assert.Equal(t, uint64(0), info.StoreID())
}

func TestMakeRequestInfoBypassCases(t *testing.T) {
tests := []struct {
name string
req *tikvrpc.Request
expectedBypass bool
nextGenOnly bool
}{
{
name: "internal others",
req: &tikvrpc.Request{Context: kvrpcpb.Context{RequestSource: "xxx_internal_others"}},
expectedBypass: true,
},
{
name: "add index",
req: &tikvrpc.Request{
Context: kvrpcpb.Context{RequestSource: util.InternalTxnAddIndex},
},
expectedBypass: true,
nextGenOnly: true,
},
{
name: "merge temp index",
req: &tikvrpc.Request{
Context: kvrpcpb.Context{RequestSource: util.InternalTxnMergeTempIndex},
},
expectedBypass: true,
nextGenOnly: true,
},
{
name: "br",
req: &tikvrpc.Request{
Context: kvrpcpb.Context{RequestSource: util.InternalTxnBR},
},
expectedBypass: true,
nextGenOnly: true,
},
{
name: "import into",
req: &tikvrpc.Request{
Context: kvrpcpb.Context{RequestSource: util.InternalImportInto},
},
expectedBypass: true,
nextGenOnly: true,
},
{
name: "workload learning",
req: &tikvrpc.Request{
Context: kvrpcpb.Context{RequestSource: util.InternalTxnWorkloadLearning},
},
expectedBypass: true,
nextGenOnly: true,
},
{
name: "analyze stats",
req: &tikvrpc.Request{
Type: tikvrpc.CmdCop,
Req: &coprocessor.Request{
Tp: reqTypeAnalyze,
},
Context: kvrpcpb.Context{
Peer: &metapb.Peer{StoreId: 4},
RequestSource: util.InternalTxnStats,
},
},
expectedBypass: true,
nextGenOnly: true,
},
{
name: "analyze stats without analyze cop",
req: &tikvrpc.Request{
Type: tikvrpc.CmdCop,
Req: &coprocessor.Request{
Tp: 1,
},
Context: kvrpcpb.Context{
RequestSource: util.InternalTxnStats,
},
},
expectedBypass: false,
nextGenOnly: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.nextGenOnly && !config.NextGen {
t.Skip("rule only applies to nextgen")
}
info := MakeRequestInfo(tt.req)
assert.Equal(t, tt.expectedBypass, info.Bypass())
})
}
}

func TestResponseInfoReadBytes(t *testing.T) {
resp := &tikvrpc.Response{
Resp: &coprocessor.Response{
Expand Down
10 changes: 10 additions & 0 deletions util/request_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ const (
InternalTxnMeta = InternalTxnOthers
// InternalTxnStats is the type of statistics txn.
InternalTxnStats = "stats"
// InternalTxnBR is the type of BR usage.
InternalTxnBR = "br"
// InternalImportInto is the type of IMPORT INTO usage
InternalImportInto = "ImportInto"
// InternalTxnAddIndex is the type of DDL add index backfill.
InternalTxnAddIndex = "add_index"
// InternalTxnMergeTempIndex is the type of DDL temp index merging.
InternalTxnMergeTempIndex = "merge_temp_index"
// InternalTxnWorkloadLearning is the type of workload-based cost learning.
InternalTxnWorkloadLearning = "WorkloadLearning"
)

// explicit source types.
Expand Down