Skip to content

Commit

Permalink
[release-17.0] VReplication: More intelligently manage vschema table …
Browse files Browse the repository at this point in the history
…entries on unsharded targets (#13220) (#13239)

* More intelligently manage vschema table entries on unsharded targets.

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Ensure every table at least has an empty definition

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add unit test

Signed-off-by: Matt Lord <mattalord@gmail.com>

---------

Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
vitess-bot[bot] and mattlord authored Jun 5, 2023
1 parent b3eab36 commit 6cd0345
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 15 deletions.
37 changes: 22 additions & 15 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,37 @@ const (
createDDLAsCopyDropForeignKeys = "copy:drop_foreign_keys"
)

// addTablesToVSchema adds tables to an (unsharded) vschema. Depending on copyAttributes It will also add any sequence info
// that is associated with a table by copying it from the vschema of the source keyspace.
// For a migrate workflow we do not copy attributes since the source keyspace is just a proxy to import data into Vitess
// Todo: For now we only copy sequence but later we may also want to copy other attributes like authoritative column flag and list of columns
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyAttributes bool) error {
// addTablesToVSchema adds tables to an (unsharded) vschema if they are not already defined.
// If copyVSchema is true then we copy over the vschema table definitions from the source,
// otherwise we create empty ones.
// For a migrate workflow we do not copy the vschema since the source keyspace is just a
// proxy to import data into Vitess.
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyVSchema bool) error {
if targetVSchema.Tables == nil {
targetVSchema.Tables = make(map[string]*vschemapb.Table)
}
for _, table := range tables {
targetVSchema.Tables[table] = &vschemapb.Table{}
}

if copyAttributes { // if source keyspace is provided, copy over the sequence info.
if copyVSchema {
srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return err
return vterrors.Wrapf(err, "failed to get vschema for source keyspace %s", sourceKeyspace)
}
for _, table := range tables {
srcTable, ok := srcVSchema.Tables[table]
if ok {
targetVSchema.Tables[table].AutoIncrement = srcTable.AutoIncrement
srcTable, sok := srcVSchema.Tables[table]
if _, tok := targetVSchema.Tables[table]; sok && !tok {
targetVSchema.Tables[table] = srcTable
// If going from sharded to unsharded, then we need to remove the
// column vindexes as they are not valid for unsharded tables.
if srcVSchema.Sharded {
targetVSchema.Tables[table].ColumnVindexes = nil
}
}
}

}
// Ensure that each table at least has an empty definition on the target.
for _, table := range tables {
if _, tok := targetVSchema.Tables[table]; !tok {
targetVSchema.Tables[table] = &vschemapb.Table{}
}
}
return nil
}
Expand Down
193 changes: 193 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'"
Expand Down Expand Up @@ -2833,3 +2834,195 @@ func TestMoveTablesDDLFlag(t *testing.T) {
})
}
}

func TestAddTablesToVSchema(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("zone1")
srcks := "source"
wr := &Wrangler{
logger: logutil.NewMemoryLogger(),
ts: ts,
sourceTs: ts,
}
tests := []struct {
name string
sourceVSchema *vschemapb.Keyspace
inTargetVSchema *vschemapb.Keyspace
tables []string
copyVSchema bool
wantTargetVSchema *vschemapb.Keyspace
}{
{
name: "no target vschema; copy source vschema",
sourceVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{},
tables: []string{"t1", "t2", "t3", "t4"},
copyVSchema: true,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {},
},
},
},
{
name: "no target vschema; copy source vschema; sharded source",
sourceVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {
ColumnVindexes: []*vschemapb.ColumnVindex{ // Should be stripped on target
{
Column: "c1",
Name: "hash",
},
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{},
tables: []string{"t1", "t2", "t3", "t4"},
copyVSchema: true,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {},
},
},
},
{
name: "target vschema; copy source vschema",
sourceVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {
ColumnVindexes: []*vschemapb.ColumnVindex{ // Should be stripped on target
{
Column: "c1",
Name: "hash",
},
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {},
"t3": {},
"t4": {},
},
},
tables: []string{"t1", "t2", "t3", "t4"},
copyVSchema: true,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {},
"t3": {},
"t4": {},
},
},
},
{
name: "no target vschema; do not copy source vschema",
sourceVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{},
tables: []string{"t1", "t2"},
copyVSchema: false,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {},
"t2": {},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.SaveVSchema(ctx, srcks, tt.sourceVSchema)
err := wr.addTablesToVSchema(ctx, srcks, tt.inTargetVSchema, tt.tables, tt.copyVSchema)
require.NoError(t, err)
require.Equal(t, tt.wantTargetVSchema, tt.inTargetVSchema)
})
}
}

0 comments on commit 6cd0345

Please sign in to comment.