Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 8 additions & 39 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,54 +614,23 @@ func buildPersistedDDLEventForDropView(args buildPersistedDDLEventFuncArgs) Pers
// Value assignment in CREATE VIEW:
// https://github.com/pingcap/tidb/blob/8f2630e53d5d/pkg/ddl/create_table.go#L1668-L1678
func normalizeCreateViewQueryWithStoredSelect(event *PersistedDDLEvent) {
if event.Query == "" || event.TableInfo == nil || event.TableInfo.View == nil || event.TableInfo.View.SelectStmt == "" {
if event.TableInfo == nil || event.TableInfo.View == nil {
return
}

stmt, err := parser.New().ParseOneStmt(event.Query, "", "")
if err != nil {
log.Warn("parse create view query failed when normalizing select statement",
zap.String("query", event.Query),
zap.Error(err))
return
}
createViewStmt, ok := stmt.(*ast.CreateViewStmt)
if !ok {
return
}

selectStmt, err := parser.New().ParseOneStmt(event.TableInfo.View.SelectStmt, "", "")
if err != nil {
log.Warn("parse stored create view select statement failed",
zap.String("selectStmt", event.TableInfo.View.SelectStmt),
zap.String("query", event.Query),
zap.Error(err))
return
}
// Keep the original CREATE VIEW text when the stored SELECT only qualifies tables in the view's own schema.
if createViewSelectUsesCurrentSchemaOnly(selectStmt, event.SchemaName) {
return
}

createViewStmt.Select = selectStmt
normalizedQuery, err := commonEvent.Restore(createViewStmt)
query, err := commonEvent.NormalizeCreateViewQueryWithStoredSelect(
event.Query,
event.TableInfo.View.SelectStmt,
event.SchemaName,
)
if err != nil {
log.Warn("restore normalized create view query failed",
log.Warn("normalize create view query with stored select failed",
zap.String("query", event.Query),
zap.String("selectStmt", event.TableInfo.View.SelectStmt),
zap.Error(err))
return
}
event.Query = normalizedQuery
}

func createViewSelectUsesCurrentSchemaOnly(selectStmt ast.StmtNode, currentSchema string) bool {
for _, schema := range extractTableSchemas(selectStmt) {
if schema != "" && !strings.EqualFold(schema, currentSchema) {
return false
}
}
return true
event.Query = query
}

func buildPersistedDDLEventForCreateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
Expand Down
71 changes: 71 additions & 0 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3698,6 +3698,77 @@ func TestBuildPersistedDDLEventForCreateViewKeepsOriginalQueryForSameSchemaSelec
require.Equal(t, "v", ddl.TableName)
}

func TestBuildPersistedDDLEventForCreateViewQualifiesTableColumnReferences(t *testing.T) {
Comment thread
3AceShowHand marked this conversation as resolved.
cases := []struct {
name string
query string
selectStmt string
expected string
}{
{
name: "cross schema unaliased table qualifier",
query: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `orders`.`id` FROM `orders`",
selectStmt: "SELECT `orders`.`id` AS `id` FROM `source_db`.`orders`",
expected: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `source_db`.`orders`.`id` AS `id` FROM `source_db`.`orders`",
},
{
name: "same schema unaliased table qualifier",
query: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `users`.`id` FROM `users`",
selectStmt: "SELECT `users`.`id` AS `id` FROM `target_db`.`users`",
expected: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `target_db`.`users`.`id` AS `id` FROM `target_db`.`users`",
},
{
name: "alias is preserved",
query: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `orders`.`id` FROM `orders` AS `orders`",
selectStmt: "SELECT `orders`.`id` AS `id` FROM `source_db`.`orders` AS `orders`",
expected: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `orders`.`id` AS `id` FROM `source_db`.`orders` AS `orders`",
},
{
name: "ambiguous table qualifier is preserved",
query: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `t`.`id` FROM `t`",
selectStmt: "SELECT `t`.`id` AS `id` FROM `db1`.`t`, `db2`.`t`",
expected: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `t`.`id` AS `id` FROM (`db1`.`t`) JOIN `db2`.`t`",
},
{
name: "subquery scopes are independent",
query: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `q`.`id` FROM (SELECT `t`.`id` FROM `t`) AS `q`",
selectStmt: "SELECT `q`.`id` AS `id` FROM (SELECT `t`.`id` AS `id` FROM `source_db`.`t`) AS `q`",
expected: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `q`.`id` AS `id` FROM (SELECT `source_db`.`t`.`id` AS `id` FROM `source_db`.`t`) AS `q`",
},
{
name: "join table qualifier",
query: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `orders`.`id`, `customers`.`name` FROM `orders` JOIN `customers` ON `orders`.`customer_id` = `customers`.`id`",
selectStmt: "SELECT `orders`.`id` AS `id`, `customers`.`name` AS `name` FROM `source_db`.`orders` JOIN `crm_db`.`customers` ON `orders`.`customer_id` = `customers`.`id`",
expected: "CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `target_db`.`v` AS SELECT `source_db`.`orders`.`id` AS `id`,`crm_db`.`customers`.`name` AS `name` FROM `source_db`.`orders` JOIN `crm_db`.`customers` ON `source_db`.`orders`.`customer_id`=`crm_db`.`customers`.`id`",
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
job := buildCreateViewJobForTest(101, 100)
job.TableName = "v"
job.Query = tc.query
job.BinlogInfo.TableInfo = &model.TableInfo{
Name: ast.NewCIStr("v"),
View: &model.ViewInfo{
SelectStmt: tc.selectStmt,
},
}

ddl := buildPersistedDDLEventForCreateView(buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
101: {Name: "target_db", Tables: map[int64]bool{}},
},
})

require.Equal(t, tc.expected, ddl.Query)
require.Equal(t, "target_db", ddl.SchemaName)
require.Equal(t, "v", ddl.TableName)
})
}
}

func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) {
cases := []struct {
name string
Expand Down
31 changes: 0 additions & 31 deletions logservice/schemastore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -121,33 +120,3 @@ func extractAddIndexIDs(job *model.Job) []int64 {
}
return res
}

type tableSchemaExtractor struct {
schemas []string
}

func (e *tableSchemaExtractor) Enter(in ast.Node) (ast.Node, bool) {
if t, ok := in.(*ast.TableName); ok {
e.schemas = append(e.schemas, t.Schema.O)
return in, true
}
return in, false
}

func (e *tableSchemaExtractor) Leave(in ast.Node) (ast.Node, bool) {
return in, true
}

// extractTableSchemas returns schema qualifiers from all *ast.TableName nodes in
// AST visit order. Unqualified tables contribute an empty schema name.
func extractTableSchemas(node ast.Node) []string {
if node == nil {
return nil
}

extractor := &tableSchemaExtractor{
schemas: make([]string, 0),
}
node.Accept(extractor)
return extractor.schemas
}
36 changes: 0 additions & 36 deletions logservice/schemastore/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
ticonfig "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -224,38 +223,3 @@ func TestGetIndexIDsIgnoresAddPrimaryKeySubJobsForMultiSchemaChange(t *testing.T
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{anonymousIndexID}, getIndexIDs(job))
}

func TestExtractTableSchemas(t *testing.T) {
cases := []struct {
name string
query string
expected []string
}{
{
name: "unqualified table",
query: "SELECT * FROM `t`",
expected: []string{""},
},
{
name: "mixed qualified tables",
query: "SELECT * FROM `db1`.`t1` JOIN `t2` ON `db1`.`t1`.`id` = `t2`.`id`",
expected: []string{"db1", ""},
},
{
name: "subquery preserves visit order",
query: "SELECT * FROM `db1`.`t1` WHERE EXISTS (SELECT 1 FROM `db2`.`t2`)",
expected: []string{"db1", "db2"},
},
}

p := parser.New()
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
stmt, err := p.ParseOneStmt(tc.query, "", "")
require.NoError(t, err)
require.Equal(t, tc.expected, extractTableSchemas(stmt))
})
}

require.Nil(t, extractTableSchemas(nil))
}
Loading
Loading