Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemadiff: foreign key validation (tables and columns) #11944

Merged
merged 10 commits into from
Jan 23, 2023
55 changes: 54 additions & 1 deletion go/vt/schemadiff/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,70 @@ func (e *InvalidColumnInCheckConstraintError) Error() string {
sqlescape.EscapeID(e.Column), sqlescape.EscapeID(e.Constraint), sqlescape.EscapeID(e.Table))
}

type ForeignKeyDependencyUnresolvedError struct {
Table string
}

func (e *ForeignKeyDependencyUnresolvedError) Error() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a unit test for this new error type. 4 new errors, but only 1 new unit test.

Copy link
Contributor Author

return fmt.Sprintf("table %s has unresolved/loop foreign key dependencies",
sqlescape.EscapeID(e.Table))
}

type InvalidColumnInForeignKeyConstraintError struct {
Table string
Constraint string
Column string
}

func (e *InvalidColumnInForeignKeyConstraintError) Error() string {
return fmt.Sprintf("invalid column %s referenced by foreign key constraint %s in table %s",
return fmt.Sprintf("invalid column %s covered by foreign key constraint %s in table %s",
sqlescape.EscapeID(e.Column), sqlescape.EscapeID(e.Constraint), sqlescape.EscapeID(e.Table))
}

type InvalidReferencedColumnInForeignKeyConstraintError struct {
Table string
Constraint string
ReferencedTable string
ReferencedColumn string
}

func (e *InvalidReferencedColumnInForeignKeyConstraintError) Error() string {
return fmt.Sprintf("invalid column %s.%s referenced by foreign key constraint %s in table %s",
sqlescape.EscapeID(e.ReferencedTable), sqlescape.EscapeID(e.ReferencedColumn), sqlescape.EscapeID(e.Constraint), sqlescape.EscapeID(e.Table))
}

type ForeignKeyColumnCountMismatchError struct {
Table string
Constraint string
ColumnCount int
ReferencedTable string
ReferencedColumnCount int
}

func (e *ForeignKeyColumnCountMismatchError) Error() string {
return fmt.Sprintf("mismatching column count %d referenced by foreign key constraint %s in table %s. Expected %d",
e.ReferencedColumnCount, sqlescape.EscapeID(e.Constraint), sqlescape.EscapeID(e.Table), e.ColumnCount)
}

type ForeignKeyColumnTypeMismatchError struct {
Table string
Constraint string
Column string
ReferencedTable string
ReferencedColumn string
}

func (e *ForeignKeyColumnTypeMismatchError) Error() string {
return fmt.Sprintf("mismatching column type %s.%s and %s.%s referenced by foreign key constraint %s in table %s",
sqlescape.EscapeID(e.ReferencedTable),
sqlescape.EscapeID(e.ReferencedColumn),
sqlescape.EscapeID(e.Table),
sqlescape.EscapeID(e.Column),
sqlescape.EscapeID(e.Constraint),
sqlescape.EscapeID(e.Table),
)
}

type ViewDependencyUnresolvedError struct {
View string
}
Expand Down
147 changes: 140 additions & 7 deletions go/vt/schemadiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Schema struct {

named map[string]Entity
sorted []Entity

foreignKeyParents []*CreateTableEntity // subset of tables
foreignKeyChildren []*CreateTableEntity // subset of tables
}

// newEmptySchema is used internally to initialize a Schema object
Expand All @@ -44,6 +47,9 @@ func newEmptySchema() *Schema {
views: []*CreateViewEntity{},
named: map[string]Entity{},
sorted: []Entity{},

foreignKeyParents: []*CreateTableEntity{},
foreignKeyChildren: []*CreateTableEntity{},
}
return schema
}
Expand Down Expand Up @@ -122,6 +128,18 @@ func NewSchemaFromSQL(sql string) (*Schema, error) {
return NewSchemaFromStatements(statements)
}

// getForeignKeyParentTableNames analyzes a CREATE TABLE definition and extracts all referened foreign key tables names.
// A table name may appear twice in the result output, it it is referenced by more than one foreign key
func getForeignKeyParentTableNames(createTable *sqlparser.CreateTable) (names []string, err error) {
for _, cs := range createTable.TableSpec.Constraints {
if check, ok := cs.Details.(*sqlparser.ForeignKeyDefinition); ok {
parentTableName := check.ReferenceDefinition.ReferencedTable.Name.String()
names = append(names, parentTableName)
}
}
return names, err
}

// getViewDependentTableNames analyzes a CREATE VIEW definition and extracts all tables/views read by this view
func getViewDependentTableNames(createView *sqlparser.CreateView) (names []string, err error) {
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
Expand Down Expand Up @@ -175,10 +193,6 @@ func (s *Schema) normalize() error {
// If a view v1 depends on v2, then v2 must come before v1, even though v1
// precedes v2 alphabetically
dependencyLevels := make(map[string]int, len(s.tables)+len(s.views))
for _, t := range s.tables {
s.sorted = append(s.sorted, t)
dependencyLevels[t.Name()] = 0
}

allNamesFoundInLowerLevel := func(names []string, level int) bool {
for _, name := range names {
Expand All @@ -198,12 +212,58 @@ func (s *Schema) normalize() error {
return true
}

// We now iterate all tables. We iterate "dependency levels":
// - first we want all tables that don't have foreign keys or which only reference themselves
// - then we only want tables that reference 1st level tables. these are 2nd level tables
// - etc.
// we stop when we have been unable to find a table in an iteration.
fkParents := map[string]bool{}
iterationLevel := 0
for {
handledAnyTablesInIteration := false
for _, t := range s.tables {
name := t.Name()
if _, ok := dependencyLevels[name]; ok {
// already handled; skip
continue
}
// Not handled. Is this view dependent on already handled objects?
referencedTableNames, err := getForeignKeyParentTableNames(t.CreateTable)
if err != nil {
return err
}
if len(referencedTableNames) > 0 {
s.foreignKeyChildren = append(s.foreignKeyChildren, t)
}
nonSelfReferenceNames := []string{}
for _, referencedTableName := range referencedTableNames {
if referencedTableName != name {
nonSelfReferenceNames = append(nonSelfReferenceNames, referencedTableName)
}
fkParents[referencedTableName] = true
}
if allNamesFoundInLowerLevel(nonSelfReferenceNames, iterationLevel) {
s.sorted = append(s.sorted, t)
dependencyLevels[t.Name()] = iterationLevel
handledAnyTablesInIteration = true
}
}
if !handledAnyTablesInIteration {
break
}
iterationLevel++
}
for _, t := range s.tables {
if fkParents[t.Name()] {
s.foreignKeyParents = append(s.foreignKeyParents, t)
}
}
// We now iterate all views. We iterate "dependency levels":
// - first we want all views that only depend on tables. These are 1st level views.
// - then we only want views that depend on 1st level views or on tables. These are 2nd level views.
// - etc.
// we stop when we have been unable to find a view in an iteration.
for iterationLevel := 1; ; iterationLevel++ {
for {
handledAnyViewsInIteration := false
for _, v := range s.views {
name := v.Name()
Expand All @@ -225,11 +285,21 @@ func (s *Schema) normalize() error {
if !handledAnyViewsInIteration {
break
}
iterationLevel++
}
if len(s.sorted) != len(s.tables)+len(s.views) {
// We have leftover views. This can happen if the schema definition is invalid:
// We have leftover tables or views. This can happen if the schema definition is invalid:
// - a table's foreign key references a nonexistent table
// - two or more tables have circular FK dependency
// - a view depends on a nonexistent table
// - two views have a circular dependency
// - two or more views have a circular dependency
for _, t := range s.tables {
if _, ok := dependencyLevels[t.Name()]; !ok {
// We _know_ that in this iteration, at least one view is found unassigned a dependency level.
// We return the first one.
return &ForeignKeyDependencyUnresolvedError{Table: t.Name()}
}
}
for _, v := range s.views {
if _, ok := dependencyLevels[v.Name()]; !ok {
// We _know_ that in this iteration, at least one view is found unassigned a dependency level.
Expand All @@ -238,6 +308,69 @@ func (s *Schema) normalize() error {
}
}
}

// Validate table definitions
for _, t := range s.tables {
if err := t.validate(); err != nil {
return err
}
}
colTypeEqualForForeignKey := func(a, b sqlparser.ColumnType) bool {
return a.Type == b.Type &&
a.Unsigned == b.Unsigned &&
a.Zerofill == b.Zerofill &&
sqlparser.Equals.ColumnCharset(a.Charset, b.Charset) &&
sqlparser.Equals.SliceOfString(a.EnumValues, b.EnumValues)
}

// Now validate foreign key columns:
// - referenced table columns must exist
// - foreign key columns must match in count and type to referenced table columns
// - referenced table has an appropriate index over referenced columns
for _, t := range s.tables {
if len(t.TableSpec.Constraints) == 0 {
continue
}

tableColumns := map[string]*sqlparser.ColumnDefinition{}
for _, col := range t.CreateTable.TableSpec.Columns {
colName := col.Name.Lowered()
tableColumns[colName] = col
}

for _, cs := range t.TableSpec.Constraints {
check, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
referencedTableName := check.ReferenceDefinition.ReferencedTable.Name.String()
referencedTable := s.Table(referencedTableName) // we know this exists because we validated foreign key dependencies earlier on

referencedColumns := map[string]*sqlparser.ColumnDefinition{}
for _, col := range referencedTable.CreateTable.TableSpec.Columns {
colName := col.Name.Lowered()
referencedColumns[colName] = col
}
// Thanks to table validation, we already know the foreign key covered columns count is equal to the
// referenced table column count. Now ensure their types are identical
for i, col := range check.Source {
coveredColumn, ok := tableColumns[col.Lowered()]
if !ok {
return &InvalidColumnInForeignKeyConstraintError{Table: t.Name(), Constraint: cs.Name.String(), Column: col.String()}
}
referencedColumnName := check.ReferenceDefinition.ReferencedColumns[i].Lowered()
referencedColumn, ok := referencedColumns[referencedColumnName]
if !ok {
return &InvalidReferencedColumnInForeignKeyConstraintError{Table: t.Name(), Constraint: cs.Name.String(), ReferencedTable: referencedTableName, ReferencedColumn: referencedColumnName}
}
if !colTypeEqualForForeignKey(coveredColumn.Type, referencedColumn.Type) {
return &ForeignKeyColumnTypeMismatchError{Table: t.Name(), Constraint: cs.Name.String(), Column: coveredColumn.Name.String(), ReferencedTable: referencedTableName, ReferencedColumn: referencedColumnName}
}
}

// TODO(shlomi): find a valid index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is having an index a logical strict requirement or more an implementation detail of how MySQL uses it to enforce constraints?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MySQL requires you to have an index over foreign key columns; a future PR enforces that. If you create a foreign key without creating an index, MySQL implicitly creates one on your behalf. The key must cover all the FK columns as a prefix. See future PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The future PR being #12026

}
}
return nil
}

Expand Down
Loading