diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index f60b43a49ab..6dea5d3b9f1 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -71,30 +71,37 @@ const ( createDDLAsCopyDropForeignKeys = "copy:drop_foreign_keys" ) -// addTablesToVSchema adds tables to an (unsharded) vschema. Depending on copyAttributes It will also add any sequence info -// that is associated with a table by copying it from the vschema of the source keyspace. -// For a migrate workflow we do not copy attributes since the source keyspace is just a proxy to import data into Vitess -// Todo: For now we only copy sequence but later we may also want to copy other attributes like authoritative column flag and list of columns -func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyAttributes bool) error { +// addTablesToVSchema adds tables to an (unsharded) vschema if they are not already defined. +// If copyVSchema is true then we copy over the vschema table definitions from the source, +// otherwise we create empty ones. +// For a migrate workflow we do not copy the vschema since the source keyspace is just a +// proxy to import data into Vitess. +func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyVSchema bool) error { if targetVSchema.Tables == nil { targetVSchema.Tables = make(map[string]*vschemapb.Table) } - for _, table := range tables { - targetVSchema.Tables[table] = &vschemapb.Table{} - } - - if copyAttributes { // if source keyspace is provided, copy over the sequence info. + if copyVSchema { srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace) if err != nil { - return err + return vterrors.Wrapf(err, "failed to get vschema for source keyspace %s", sourceKeyspace) } for _, table := range tables { - srcTable, ok := srcVSchema.Tables[table] - if ok { - targetVSchema.Tables[table].AutoIncrement = srcTable.AutoIncrement + srcTable, sok := srcVSchema.Tables[table] + if _, tok := targetVSchema.Tables[table]; sok && !tok { + targetVSchema.Tables[table] = srcTable + // If going from sharded to unsharded, then we need to remove the + // column vindexes as they are not valid for unsharded tables. + if srcVSchema.Sharded { + targetVSchema.Tables[table].ColumnVindexes = nil + } } } - + } + // Ensure that each table at least has an empty definition on the target. + for _, table := range tables { + if _, tok := targetVSchema.Tables[table]; !tok { + targetVSchema.Tables[table] = &vschemapb.Table{} + } } return nil } diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 8a7e68f9d82..8f92a814f06 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -37,6 +37,7 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vtgate/vindexes" ) const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'" @@ -2833,3 +2834,195 @@ func TestMoveTablesDDLFlag(t *testing.T) { }) } } + +func TestAddTablesToVSchema(t *testing.T) { + ctx := context.Background() + ts := memorytopo.NewServer("zone1") + srcks := "source" + wr := &Wrangler{ + logger: logutil.NewMemoryLogger(), + ts: ts, + sourceTs: ts, + } + tests := []struct { + name string + sourceVSchema *vschemapb.Keyspace + inTargetVSchema *vschemapb.Keyspace + tables []string + copyVSchema bool + wantTargetVSchema *vschemapb.Keyspace + }{ + { + name: "no target vschema; copy source vschema", + sourceVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": { + Type: vindexes.TypeSequence, + }, + "t3": { + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "c1", + Sequence: "t2", + }, + }, + }, + }, + inTargetVSchema: &vschemapb.Keyspace{}, + tables: []string{"t1", "t2", "t3", "t4"}, + copyVSchema: true, + wantTargetVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": { + Type: vindexes.TypeSequence, + }, + "t3": { + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "c1", + Sequence: "t2", + }, + }, + "t4": {}, + }, + }, + }, + { + name: "no target vschema; copy source vschema; sharded source", + sourceVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": { + Type: vindexes.TypeSequence, + }, + "t3": { + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "c1", + Sequence: "t2", + }, + }, + "t4": { + ColumnVindexes: []*vschemapb.ColumnVindex{ // Should be stripped on target + { + Column: "c1", + Name: "hash", + }, + }, + }, + }, + }, + inTargetVSchema: &vschemapb.Keyspace{}, + tables: []string{"t1", "t2", "t3", "t4"}, + copyVSchema: true, + wantTargetVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": { + Type: vindexes.TypeSequence, + }, + "t3": { + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "c1", + Sequence: "t2", + }, + }, + "t4": {}, + }, + }, + }, + { + name: "target vschema; copy source vschema", + sourceVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": { + Type: vindexes.TypeSequence, + }, + "t3": { + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "c1", + Sequence: "t2", + }, + }, + "t4": { + ColumnVindexes: []*vschemapb.ColumnVindex{ // Should be stripped on target + { + Column: "c1", + Name: "hash", + }, + }, + }, + }, + }, + inTargetVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": {}, + "t3": {}, + "t4": {}, + }, + }, + tables: []string{"t1", "t2", "t3", "t4"}, + copyVSchema: true, + wantTargetVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": {}, + "t3": {}, + "t4": {}, + }, + }, + }, + { + name: "no target vschema; do not copy source vschema", + sourceVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + "t2": { + Type: vindexes.TypeSequence, + }, + "t3": { + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "c1", + Sequence: "t2", + }, + }, + }, + }, + inTargetVSchema: &vschemapb.Keyspace{}, + tables: []string{"t1", "t2"}, + copyVSchema: false, + wantTargetVSchema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": {}, + "t2": {}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.SaveVSchema(ctx, srcks, tt.sourceVSchema) + err := wr.addTablesToVSchema(ctx, srcks, tt.inTargetVSchema, tt.tables, tt.copyVSchema) + require.NoError(t, err) + require.Equal(t, tt.wantTargetVSchema, tt.inTargetVSchema) + }) + } +}