Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemadiff: validate and apply foreign key indexes #12026

Merged
merged 25 commits into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5acb9f3
more testing for getViewDependentTableNames()
shlomi-noach Dec 12, 2022
9e6e31c
getForeignKeyParentTableNames
shlomi-noach Dec 12, 2022
9224d01
schema normalization: sort tables by fk reference order
shlomi-noach Dec 12, 2022
03139da
minor test addition
shlomi-noach Dec 12, 2022
5079877
ForeignKeyDependencyUnresolvedError
shlomi-noach Dec 12, 2022
29bac39
InvalidReferencedColumnInForeignKeyConstraintError, MismatchingForeig…
shlomi-noach Dec 12, 2022
87e00ee
validate or autogenerate index on local table based on foreign key co…
shlomi-noach Dec 21, 2022
fb5b19d
validate referenced table key
shlomi-noach Dec 23, 2022
fc78e0b
ColumnKeyOption consts become public
shlomi-noach Dec 27, 2022
fc5d4b6
adding DuplicateKeyNameError
shlomi-noach Dec 27, 2022
e09acc2
normalize PRIMARY KEY definition: a 'id int primary key' column defin…
shlomi-noach Dec 27, 2022
8631527
Merge branch 'main' into schema-diff-normalize-primary-key
shlomi-noach Dec 27, 2022
b0a8c6f
further unit test adaptations
shlomi-noach Dec 27, 2022
d9cd695
Merge branch 'main' into schemadiff-fk-keys
shlomi-noach Jan 1, 2023
0f8c986
merge main, resolve conflict
shlomi-noach Jan 1, 2023
6eded6c
adapting tests: adding required local index over foreign key columns
shlomi-noach Jan 1, 2023
c0a3df4
implicitly add key over foreign key columns if no appropriate key exists
shlomi-noach Jan 1, 2023
211f85d
normalize CREATE TABLE statement to include missing foreign key indexes
shlomi-noach Jan 1, 2023
76a4834
rename error: ForeignKeyColumnCountMismatchError
shlomi-noach Jan 23, 2023
fd46324
rename MismatchingForeignKeyColumnTypeError to ForeignKeyColumnTypeMi…
shlomi-noach Jan 23, 2023
9daec2b
changes per review
shlomi-noach Jan 23, 2023
6a3548d
merge main, resolve conflict
shlomi-noach Jan 23, 2023
3fe002c
add a few tests that validate an implicit index isn't added if not ne…
shlomi-noach Jan 24, 2023
d175211
case insensitive column comparison in indexCoversColumnsInOrder and i…
shlomi-noach Jan 24, 2023
16e1ed3
MissingForeignKeyIndexError is not needed bcause, compatible with MyS…
shlomi-noach Jan 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions go/vt/schemadiff/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,32 @@ func (e *ForeignKeyColumnTypeMismatchError) Error() string {
)
}

type MissingForeignKeyReferencedIndexError struct {
Table string
Constraint string
ReferencedTable string
}

func (e *MissingForeignKeyReferencedIndexError) Error() string {
return fmt.Sprintf("missing index in referenced table %s for foreign key constraint %s in table %s",
sqlescape.EscapeID(e.ReferencedTable),
sqlescape.EscapeID(e.Constraint),
sqlescape.EscapeID(e.Table),
)
}

type IndexNeededByForeignKeyError struct {
Table string
Key string
}

func (e *IndexNeededByForeignKeyError) Error() string {
return fmt.Sprintf("key %s needed by a foreign key constraint in table %s",
sqlescape.EscapeID(e.Key),
sqlescape.EscapeID(e.Table),
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure why these are 3 different error types when they seem all pretty similar? Would it be simpler to model this as one?

The first two seem to be the case of there's an FK constraint to the table itself, but in that case I think it's ok if Table and ReferencedTable contain the same value?

Not sure about the 3rd one, but that seems to not have a constraint name and not sure when that happens? Don't we always have to generate a constraint name anyway if it's missing and would it be fine if that name was set then on the same error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the 1st error, MissingForeignKeyIndexError: I've just removed it. It can never fire because, compatible with MySQL's behavior, we always add an index if one is needed, either in a CREATE TABLE statement or in a ALTER TABLE ... ADD ... FOREIGN KEY statement.

2nd error, MissingForeignKeyReferencedIndexError, is when the parent table does not have an index covering the referenced columns. Here, MySQL does not add anything implicitly, because that's on a different table. The error is compatible with MySQL.

3rd error, IndexNeededByForeignKeyError is for when you attempt to ALTER TABLE ... DROP KEY and when that leaves a foreign key without an index covering its local columns. Compatible with MySQL, that's an error.


type ViewDependencyUnresolvedError struct {
View string
}
Expand Down
4 changes: 3 additions & 1 deletion go/vt/schemadiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,9 @@ func (s *Schema) normalize() error {
}
}

// TODO(shlomi): find a valid index
if !referencedTable.columnsCoveredByInOrderIndex(check.ReferenceDefinition.ReferencedColumns) {
return &MissingForeignKeyReferencedIndexError{Table: t.Name(), Constraint: cs.Name.String(), ReferencedTable: referencedTableName}
}
}
}
return nil
Expand Down
36 changes: 20 additions & 16 deletions go/vt/schemadiff/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,15 @@ func TestGetForeignKeyParentTableNames(t *testing.T) {

func TestTableForeignKeyOrdering(t *testing.T) {
fkQueries := []string{
"create table t11 (id int primary key, i int, constraint f12 foreign key (i) references t12(id) on delete restrict, constraint f20 foreign key (i) references t20(id) on delete restrict)",
"create table t11 (id int primary key, i int, key ix (i), constraint f12 foreign key (i) references t12(id) on delete restrict, constraint f20 foreign key (i) references t20(id) on delete restrict)",
"create table t15(id int, primary key(id))",
"create view v09 as select * from v13, t17",
"create table t20 (id int primary key, i int, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t20 (id int primary key, i int, key ix (i), constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create view v13 as select * from t20",
"create table t12 (id int primary key, i int, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t17 (id int primary key, i int, constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t16 (id int primary key, i int, constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t14 (id int primary key, i int, constraint f14 foreign key (i) references t14(id) on delete restrict)",
"create table t12 (id int primary key, i int, key ix (i), constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t17 (id int primary key, i int, key ix (i), constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t16 (id int primary key, i int, key ix (i), constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t14 (id int primary key, i int, key ix (i), constraint f14 foreign key (i) references t14(id) on delete restrict)",
}
expectSortedTableNames := []string{
"t14",
Expand Down Expand Up @@ -287,10 +287,10 @@ func TestInvalidSchema(t *testing.T) {
expectErr error
}{
{
schema: "create table t11 (id int primary key, i int, constraint f11 foreign key (i) references t11(id) on delete restrict)",
schema: "create table t11 (id int primary key, i int, key ix(i), constraint f11 foreign key (i) references t11(id) on delete restrict)",
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
},
{
schema: "create table t11 (id int primary key, i int, constraint f11 foreign key (i7) references t11(id) on delete restrict)",
Expand All @@ -309,35 +309,39 @@ func TestInvalidSchema(t *testing.T) {
expectErr: &ForeignKeyDependencyUnresolvedError{Table: "t11"},
},
{
schema: "create table t11 (id int primary key, i int, constraint f11 foreign key (i) references t11(id2) on delete restrict)",
schema: "create table t11 (id int primary key, i int, key ix(i), constraint f11 foreign key (i) references t11(id2) on delete restrict)",
expectErr: &InvalidReferencedColumnInForeignKeyConstraintError{Table: "t11", Constraint: "f11", ReferencedTable: "t11", ReferencedColumn: "id2"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, constraint f10 foreign key (i) references t10(x) on delete restrict)",
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(x) on delete restrict)",
expectErr: &InvalidReferencedColumnInForeignKeyConstraintError{Table: "t11", Constraint: "f10", ReferencedTable: "t10", ReferencedColumn: "x"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int unsigned, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id int primary key, i int); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(i) on delete restrict)",
expectErr: &MissingForeignKeyReferencedIndexError{Table: "t11", Constraint: "f10", ReferencedTable: "t10"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int unsigned, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i bigint, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id int primary key); create table t11 (id int primary key, i bigint, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i int, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i varchar(100), constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i varchar(100), key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
// InnoDB allows different string length
schema: "create table t10(id varchar(50) primary key); create table t11 (id int primary key, i varchar(100), constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id varchar(50) primary key); create table t11 (id int primary key, i varchar(100), key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
},
{
schema: "create table t10(id varchar(50) charset utf8mb3 primary key); create table t11 (id int primary key, i varchar(100) charset utf8mb4, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id varchar(50) charset utf8mb3 primary key); create table t11 (id int primary key, i varchar(100) charset utf8mb4, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
}
Expand Down
125 changes: 106 additions & 19 deletions go/vt/schemadiff/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ func NewCreateTableEntity(c *sqlparser.CreateTable) (*CreateTableEntity, error)
// - table option case (upper/lower/special)
// The function returns this receiver as courtesy
func (c *CreateTableEntity) normalize() *CreateTableEntity {
c.normalizeKeys()
c.normalizePrimaryKeyColumns()
c.normalizeForeignKeyIndexes() // implicitly add missing indexes for foreign keys
c.normalizeKeys() // assign names to keys
c.normalizeUnnamedConstraints()
c.normalizeTableOptions()
c.normalizeColumnOptions()
Expand Down Expand Up @@ -622,7 +624,7 @@ func (c *CreateTableEntity) normalizeKeys() {
}

func (c *CreateTableEntity) normalizeUnnamedConstraints() {
// let's ensure all keys have names
// let's ensure all constraints have names
constraintNameExists := map[string]bool{}
// first, we iterate and take note for all keys that do already have names
for _, constraint := range c.CreateTable.TableSpec.Constraints {
Expand Down Expand Up @@ -650,6 +652,34 @@ func (c *CreateTableEntity) normalizeUnnamedConstraints() {
}
}

func (c *CreateTableEntity) normalizeForeignKeyIndexes() {
for _, constraint := range c.CreateTable.TableSpec.Constraints {
fk, ok := constraint.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if !c.columnsCoveredByInOrderIndex(fk.Source) {
// We add a foreign key, but the local FK columns are not indexed.
// MySQL's behavior is to implicitly add an index that covers the foreign key's local columns.
// The name of the index is either:
// - the same name of the constraint, if such name is provided
// - and error if an index by this name exists
// - or, a standard auto-generated index name, if the constraint name is not provided
indexDefinition := &sqlparser.IndexDefinition{
Info: &sqlparser.IndexInfo{
Type: "key",
Name: constraint.Name, // if name is empty, then the name is later auto populated
},
}
for _, col := range fk.Source {
indexColumn := &sqlparser.IndexColumn{Column: col}
indexDefinition.Columns = append(indexDefinition.Columns, indexColumn)
}
c.TableSpec.Indexes = append(c.TableSpec.Indexes, indexDefinition)
}
}
}

// Name implements Entity interface
func (c *CreateTableEntity) Name() string {
return c.CreateTable.GetTable().Name.String()
Expand Down Expand Up @@ -1734,6 +1764,21 @@ func (c *CreateTableEntity) apply(diff *AlterTableEntityDiff) error {
if !found {
return &ApplyKeyNotFoundError{Table: c.Name(), Key: opt.Name.String()}
}

// Now, if this is a normal key being dropped, let's validate it does not leave any foreign key constraint uncovered
switch opt.Type {
case sqlparser.PrimaryKeyType, sqlparser.NormalKeyType:
for _, cs := range c.CreateTable.TableSpec.Constraints {
fk, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if !c.columnsCoveredByInOrderIndex(fk.Source) {
return &IndexNeededByForeignKeyError{Table: c.Name(), Key: opt.Name.String()}
}
}
}

case *sqlparser.AddIndexDefinition:
// validate no existing key by same name
keyName := opt.IndexDefinition.Info.Name.String()
Expand Down Expand Up @@ -1966,6 +2011,7 @@ func (c *CreateTableEntity) Apply(diff EntityDiff) (Entity, error) {
// - edit or remove keys if referenced columns are dropped
// - drop check constraints for a single specific column if that column
// is the only referenced column in that check constraint.
// - add implicit keys for foreign key constraint, if needed
func (c *CreateTableEntity) postApplyNormalize() error {
// reduce or remove keys based on existing column list
// (a column may have been removed)postApplyNormalize
Expand Down Expand Up @@ -2027,6 +2073,10 @@ func (c *CreateTableEntity) postApplyNormalize() error {
}
c.CreateTable.TableSpec.Constraints = keptConstraints

c.normalizePrimaryKeyColumns()
c.normalizeForeignKeyIndexes()
c.normalizeKeys()

return nil
}

Expand Down Expand Up @@ -2057,6 +2107,44 @@ func getKeyColumnNames(key *sqlparser.IndexDefinition) (colNames map[string]bool
return colNames
}

// indexCoversColumnsInOrder checks if the given index covers the given columns in order and in prefix.
// the index must either covers the exact list of columns or continue to cover additional columns beyond.
// Used for validating indexes covering foreign keys.
func indexCoversColumnsInOrder(index *sqlparser.IndexDefinition, columns sqlparser.Columns) bool {
if len(columns) == 0 {
return false
}
if len(index.Columns) < len(columns) {
// obviously the index doesn't cover the required columns
return false
}
for i, col := range columns {
// the index must cover same columns, in order, wih possibly more columns covered than requested.
indexCol := index.Columns[i]
if !strings.EqualFold(col.String(), indexCol.Column.String()) {
return false
}
}
return true
}

// indexesCoveringForeignKeyColumns returns a list of indexes that cover a given list of coumns, in-oder and in prefix.
// Used for validating indexes covering foreign keys.
func (c *CreateTableEntity) indexesCoveringForeignKeyColumns(columns sqlparser.Columns) (indexes []*sqlparser.IndexDefinition) {
for _, index := range c.CreateTable.TableSpec.Indexes {
if indexCoversColumnsInOrder(index, columns) {
indexes = append(indexes, index)
}
}
return indexes
}

// columnsCoveredByInOrderIndex returns 'true' when there is at least one index that covers the given
// list of columns in-order and in-prefix.
func (c *CreateTableEntity) columnsCoveredByInOrderIndex(columns sqlparser.Columns) bool {
return len(c.indexesCoveringForeignKeyColumns(columns)) > 0
}

func (c *CreateTableEntity) validateDuplicateKeyNameError() error {
keyNames := map[string]bool{}
for _, key := range c.CreateTable.TableSpec.Indexes {
Expand All @@ -2080,6 +2168,22 @@ func (c *CreateTableEntity) validate() error {
}
columnExists[colName] = true
}
// validate all columns used by foreign key constraints do in fact exist,
// and that there exists an index over those columns
for _, cs := range c.CreateTable.TableSpec.Constraints {
fk, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if len(fk.Source) != len(fk.ReferenceDefinition.ReferencedColumns) {
return &ForeignKeyColumnCountMismatchError{Table: c.Name(), Constraint: cs.Name.String(), ColumnCount: len(fk.Source), ReferencedTable: fk.ReferenceDefinition.ReferencedTable.Name.String(), ReferencedColumnCount: len(fk.ReferenceDefinition.ReferencedColumns)}
}
for _, col := range fk.Source {
if !columnExists[col.Lowered()] {
return &InvalidColumnInForeignKeyConstraintError{Table: c.Name(), Constraint: cs.Name.String(), Column: col.String()}
}
}
}
// validate all columns referenced by indexes do in fact exist
for _, key := range c.CreateTable.TableSpec.Indexes {
for colName := range getKeyColumnNames(key) {
Expand Down Expand Up @@ -2130,23 +2234,6 @@ func (c *CreateTableEntity) validate() error {
}
}
}
// validate all columns used by foreign key constraints do in fact exist,
// and that there exists an index over those columns
for _, cs := range c.CreateTable.TableSpec.Constraints {
check, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if len(check.Source) != len(check.ReferenceDefinition.ReferencedColumns) {
return &ForeignKeyColumnCountMismatchError{Table: c.Name(), Constraint: cs.Name.String(), ColumnCount: len(check.Source), ReferencedTable: check.ReferenceDefinition.ReferencedTable.Name.String(), ReferencedColumnCount: len(check.ReferenceDefinition.ReferencedColumns)}
}
for _, col := range check.Source {
if !columnExists[col.Lowered()] {
return &InvalidColumnInForeignKeyConstraintError{Table: c.Name(), Constraint: cs.Name.String(), Column: col.String()}
}
}
// TODO(shlomi): find a valid index
}
// validate all columns referenced by constraint checks do in fact exist
for _, cs := range c.CreateTable.TableSpec.Constraints {
check, ok := cs.Details.(*sqlparser.CheckConstraintDefinition)
Expand Down
Loading