Skip to content

Commit 99b871e

Browse files
authored
planner: add projection pushdown (#27029)
close #26242
1 parent 886dc81 commit 99b871e

12 files changed

Lines changed: 528 additions & 7 deletions

File tree

config/config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,10 @@ type Performance struct {
484484
CrossJoin bool `toml:"cross-join" json:"cross-join"`
485485
RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"`
486486
DistinctAggPushDown bool `toml:"distinct-agg-push-down" json:"distinct-agg-push-down"`
487-
CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"`
488-
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
487+
// Whether enable projection push down for coprocessors (both tikv & tiflash), default false.
488+
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
489+
CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"`
490+
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
489491
// Deprecated
490492
MemProfileInterval string `toml:"-" json:"-"`
491493
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
@@ -693,6 +695,7 @@ var defaultConf = Config{
693695
TxnEntrySizeLimit: DefTxnEntrySizeLimit,
694696
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
695697
DistinctAggPushDown: false,
698+
ProjectionPushDown: false,
696699
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
697700
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
698701
// TODO: set indexUsageSyncLease to 60s.

planner/core/exhaust_physical_plans.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2050,6 +2050,13 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
20502050
mppProp.TaskTp = property.MppTaskType
20512051
newProps = append(newProps, mppProp)
20522052
}
2053+
if newProp.TaskTp != property.CopSingleReadTaskType && p.SCtx().GetSessionVars().AllowProjectionPushDown && p.canPushToCop(kv.TiKV) &&
2054+
expression.CanExprsPushDown(p.SCtx().GetSessionVars().StmtCtx, p.Exprs, p.SCtx().GetClient(), kv.TiKV) && !expression.ContainVirtualColumn(p.Exprs) {
2055+
copProp := newProp.CloneEssentialFields()
2056+
copProp.TaskTp = property.CopSingleReadTaskType
2057+
newProps = append(newProps, copProp)
2058+
}
2059+
20532060
ret := make([]PhysicalPlan, 0, len(newProps))
20542061
for _, newProp := range newProps {
20552062
proj := PhysicalProjection{

planner/core/integration_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4078,6 +4078,74 @@ func TestIssue32428(t *testing.T) {
40784078
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows()) // empty result
40794079
}
40804080

4081+
func TestPushDownProjectionForTiKV(t *testing.T) {
4082+
store, clean := testkit.CreateMockStore(t)
4083+
defer clean()
4084+
tk := testkit.NewTestKit(t, store)
4085+
tk.MustExec("use test")
4086+
tk.MustExec("drop table if exists t")
4087+
tk.MustExec("create table t (a int, b real, i int, id int, value decimal(6,3), name char(128), d decimal(6,3), s char(128), t datetime, c bigint as ((a+1)) virtual, e real as ((b+a)))")
4088+
tk.MustExec("analyze table t")
4089+
tk.MustExec("set session tidb_opt_projection_push_down=1")
4090+
4091+
var input []string
4092+
var output []struct {
4093+
SQL string
4094+
Plan []string
4095+
}
4096+
integrationSuiteData := core.GetIntegrationSuiteData()
4097+
integrationSuiteData.GetTestCases(t, &input, &output)
4098+
for i, tt := range input {
4099+
testdata.OnRecord(func() {
4100+
output[i].SQL = tt
4101+
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
4102+
})
4103+
res := tk.MustQuery(tt)
4104+
res.Check(testkit.Rows(output[i].Plan...))
4105+
}
4106+
}
4107+
4108+
func TestPushDownProjectionForTiFlashCoprocessor(t *testing.T) {
4109+
store, clean := testkit.CreateMockStore(t)
4110+
defer clean()
4111+
tk := testkit.NewTestKit(t, store)
4112+
tk.MustExec("use test")
4113+
tk.MustExec("drop table if exists t")
4114+
tk.MustExec("create table t (a int, b real, i int, id int, value decimal(6,3), name char(128), d decimal(6,3), s char(128), t datetime, c bigint as ((a+1)) virtual, e real as ((b+a)))")
4115+
tk.MustExec("analyze table t")
4116+
tk.MustExec("set session tidb_opt_projection_push_down=1")
4117+
4118+
// Create virtual tiflash replica info.
4119+
dom := domain.GetDomain(tk.Session())
4120+
is := dom.InfoSchema()
4121+
db, exists := is.SchemaByName(model.NewCIStr("test"))
4122+
require.True(t, exists)
4123+
for _, tblInfo := range db.Tables {
4124+
if tblInfo.Name.L == "t" {
4125+
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
4126+
Count: 1,
4127+
Available: true,
4128+
}
4129+
}
4130+
}
4131+
4132+
var input []string
4133+
var output []struct {
4134+
SQL string
4135+
Plan []string
4136+
}
4137+
integrationSuiteData := core.GetIntegrationSuiteData()
4138+
integrationSuiteData.GetTestCases(t, &input, &output)
4139+
for i, tt := range input {
4140+
testdata.OnRecord(func() {
4141+
output[i].SQL = tt
4142+
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
4143+
})
4144+
res := tk.MustQuery(tt)
4145+
res.Check(testkit.Rows(output[i].Plan...))
4146+
}
4147+
}
4148+
40814149
func TestPushDownProjectionForTiFlash(t *testing.T) {
40824150
store, clean := testkit.CreateMockStore(t)
40834151
defer clean()

planner/core/physical_plans.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (p *PhysicalIndexReader) SetSchema(_ *expression.Schema) {
261261
if p.indexPlan != nil {
262262
p.IndexPlans = flattenPushDownPlan(p.indexPlan)
263263
switch p.indexPlan.(type) {
264-
case *PhysicalHashAgg, *PhysicalStreamAgg:
264+
case *PhysicalHashAgg, *PhysicalStreamAgg, *PhysicalProjection:
265265
p.schema = p.indexPlan.Schema()
266266
default:
267267
is := p.IndexPlans[0].(*PhysicalIndexScan)

planner/core/plan_to_pb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,15 @@ func (p *PhysicalProjection) ToPB(ctx sessionctx.Context, storeType kv.StoreType
122122
Exprs: exprs,
123123
}
124124
executorID := ""
125-
if storeType == kv.TiFlash {
125+
if storeType == kv.TiFlash || storeType == kv.TiKV {
126126
var err error
127127
projExec.Child, err = p.children[0].ToPB(ctx, storeType)
128128
if err != nil {
129129
return nil, errors.Trace(err)
130130
}
131131
executorID = p.ExplainID().String()
132132
} else {
133-
return nil, errors.Errorf("The projection can only be pushed down to TiFlash now, not %s", storeType.Name())
133+
return nil, errors.Errorf("the projection can only be pushed down to TiFlash or TiKV now, not %s", storeType.Name())
134134
}
135135
return &tipb.Executor{Tp: tipb.ExecType_TypeProjection, Projection: projExec, ExecutorId: &executorID}, nil
136136
}

planner/core/task.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,7 +1460,7 @@ func (p *PhysicalProjection) GetCost(count float64) float64 {
14601460
func (p *PhysicalProjection) attach2Task(tasks ...task) task {
14611461
t := tasks[0].copy()
14621462
if cop, ok := t.(*copTask); ok {
1463-
if len(cop.rootTaskConds) == 0 && cop.getStoreType() == kv.TiFlash && expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.Exprs, p.ctx.GetClient(), cop.getStoreType()) {
1463+
if len(cop.rootTaskConds) == 0 && expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.Exprs, p.ctx.GetClient(), cop.getStoreType()) {
14641464
copTask := attachPlan2Task(p, cop)
14651465
copTask.addCost(p.GetCost(t.count()))
14661466
p.cost = copTask.cost()
@@ -1475,7 +1475,6 @@ func (p *PhysicalProjection) attach2Task(tasks ...task) task {
14751475
return mpp
14761476
}
14771477
}
1478-
// TODO: support projection push down for TiKV.
14791478
t = t.convertToRootTask(p.ctx)
14801479
t = attachPlan2Task(p, t)
14811480
t.addCost(p.GetCost(t.count()))

planner/core/testdata/integration_suite_in.json

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,52 @@
670670
"explain format = 'brief' select /*+ inl_hash_join(s) */ * from t join s on t.a=s.a and t.a = s.b"
671671
]
672672
},
673+
{
674+
"name": "TestPushDownProjectionForTiKV",
675+
"cases": [
676+
"desc format = 'brief' select i * 2 from t",
677+
"desc format = 'brief' select DATE_FORMAT(t, '%Y-%m-%d %H') as date from t",
678+
"desc format = 'brief' select md5(s) from t",
679+
"desc format = 'brief' select c from t where a+1=3",
680+
"desc format = 'brief' select /*+ hash_agg()*/ count(b) from (select id + 1 as b from t)A",
681+
"desc format = 'brief' select /*+ hash_agg()*/ count(*) from (select id + 1 as b from t)A",
682+
"desc format = 'brief' select /*+ hash_agg()*/ sum(b) from (select id + 1 as b from t)A",
683+
"desc format = 'brief' select /*+ stream_agg()*/ count(b) from (select id + 1 as b from t)A",
684+
"desc format = 'brief' select /*+ stream_agg()*/ count(*) from (select id + 1 as b from t)A",
685+
"desc format = 'brief' select /*+ stream_agg()*/ sum(b) from (select id + 1 as b from t)A",
686+
"desc format = 'brief' select * from (select id-2 as b from t) B join (select id-2 as b from t) A on A.b=B.b",
687+
"desc format = 'brief' select * from t join (select id-2 as b from t) A on A.b=t.id",
688+
"desc format = 'brief' select * from t left join (select id-2 as b from t) A on A.b=t.id",
689+
"desc format = 'brief' select * from t right join (select id-2 as b from t) A on A.b=t.id",
690+
"desc format = 'brief' select A.b, B.b from (select id-2 as b from t) B join (select id-2 as b from t) A on A.b=B.b",
691+
"desc format = 'brief' select A.id from t as A where exists (select 1 from t where t.id=A.id)",
692+
"desc format = 'brief' select A.id from t as A where not exists (select 1 from t where t.id=A.id)",
693+
"desc format = 'brief' SELECT FROM_UNIXTIME(name,'%Y-%m-%d') FROM t;"
694+
]
695+
},
696+
{
697+
"name": "TestPushDownProjectionForTiFlashCoprocessor",
698+
"cases": [
699+
"desc format = 'brief' select i * 2 from t",
700+
"desc format = 'brief' select DATE_FORMAT(t, '%Y-%m-%d %H') as date from t",
701+
"desc format = 'brief' select md5(s) from t",
702+
"desc format = 'brief' select c from t where a+1=3",
703+
"desc format = 'brief' select /*+ hash_agg()*/ count(b) from (select id + 1 as b from t)A",
704+
"desc format = 'brief' select /*+ hash_agg()*/ count(*) from (select id + 1 as b from t)A",
705+
"desc format = 'brief' select /*+ hash_agg()*/ sum(b) from (select id + 1 as b from t)A",
706+
"desc format = 'brief' select /*+ stream_agg()*/ count(b) from (select id + 1 as b from t)A",
707+
"desc format = 'brief' select /*+ stream_agg()*/ count(*) from (select id + 1 as b from t)A",
708+
"desc format = 'brief' select /*+ stream_agg()*/ sum(b) from (select id + 1 as b from t)A",
709+
"desc format = 'brief' select * from (select id-2 as b from t) B join (select id-2 as b from t) A on A.b=B.b",
710+
"desc format = 'brief' select * from t join (select id-2 as b from t) A on A.b=t.id",
711+
"desc format = 'brief' select * from t left join (select id-2 as b from t) A on A.b=t.id",
712+
"desc format = 'brief' select * from t right join (select id-2 as b from t) A on A.b=t.id",
713+
"desc format = 'brief' select A.b, B.b from (select id-2 as b from t) B join (select id-2 as b from t) A on A.b=B.b",
714+
"desc format = 'brief' select A.id from t as A where exists (select 1 from t where t.id=A.id)",
715+
"desc format = 'brief' select A.id from t as A where not exists (select 1 from t where t.id=A.id)",
716+
"desc format = 'brief' SELECT FROM_UNIXTIME(name,'%Y-%m-%d') FROM t;"
717+
]
718+
},
673719
{
674720
"name": "TestPushDownProjectionForTiFlash",
675721
"cases": [

0 commit comments

Comments
 (0)