From 92a1d0158c58a853a06d3362a1d9a7e3a95102be Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Fri, 15 Oct 2021 17:42:21 -0700 Subject: [PATCH 1/8] Continue moving toward a public-only interface between trafficswitcher+wrangler This will make the eventual move of trafficswitcher into `package workflow` simpler and smaller in scope Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/traffic_switcher.go | 6 + go/vt/wrangler/switcher.go | 2 +- go/vt/wrangler/switcher_dry_run.go | 82 ++-- go/vt/wrangler/traffic_switcher.go | 474 ++++++++++++----------- go/vt/wrangler/vdiff.go | 40 +- 5 files changed, 306 insertions(+), 298 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 8ada84dbaa6..cd2161a4b75 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -108,12 +108,18 @@ type ITrafficSwitcher interface { ReverseWorkflowName() string SourceKeyspaceName() string SourceKeyspaceSchema() *vindexes.KeyspaceSchema + Sources() map[string]*MigrationSource + Tables() []string + TargetKeyspaceName() string + Targets() map[string]*MigrationTarget WorkflowName() string /* Functions that *wrangler.trafficSwitcher implements */ ForAllSources(f func(source *MigrationSource) error) error ForAllTargets(f func(target *MigrationTarget) error) error + SourceShards() []*topo.ShardInfo + TargetShards() []*topo.ShardInfo } // TargetInfo contains the metadata for a set of targets involved in a workflow. 
diff --git a/go/vt/wrangler/switcher.go b/go/vt/wrangler/switcher.go index 6980b5085aa..27ff564dd17 100644 --- a/go/vt/wrangler/switcher.go +++ b/go/vt/wrangler/switcher.go @@ -105,7 +105,7 @@ func (r *switcher) stopStreams(ctx context.Context, sm *workflow.StreamMigrator) } func (r *switcher) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) { - r.ts.wr.Logger().Infof("Cancel was requested.") + r.ts.Logger().Infof("Cancel was requested.") r.ts.cancelMigration(ctx, sm) } diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index e14d0aa51b9..a0e895c1a8d 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -50,34 +50,34 @@ func (dr *switcherDryRun) deleteRoutingRules(ctx context.Context) error { func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { sourceShards := make([]string, 0) targetShards := make([]string, 0) - for _, source := range dr.ts.sources { + for _, source := range dr.ts.Sources() { sourceShards = append(sourceShards, source.GetShard().ShardName()) } - for _, target := range dr.ts.targets { + for _, target := range dr.ts.Targets() { targetShards = append(targetShards, target.GetShard().ShardName()) } sort.Strings(sourceShards) sort.Strings(targetShards) if direction == workflow.DirectionForward { dr.drLog.Log(fmt.Sprintf("Switch reads from keyspace %s to keyspace %s for shards %s to shards %s", - dr.ts.sourceKeyspace, dr.ts.targetKeyspace, strings.Join(sourceShards, ","), strings.Join(targetShards, ","))) + dr.ts.SourceKeyspaceName(), dr.ts.TargetKeyspaceName(), strings.Join(sourceShards, ","), strings.Join(targetShards, ","))) } else { dr.drLog.Log(fmt.Sprintf("Switch reads from keyspace %s to keyspace %s for shards %s to shards %s", - dr.ts.targetKeyspace, dr.ts.sourceKeyspace, strings.Join(targetShards, ","), strings.Join(sourceShards, ","))) + 
dr.ts.TargetKeyspaceName(), dr.ts.SourceKeyspaceName(), strings.Join(targetShards, ","), strings.Join(sourceShards, ","))) } return nil } func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { - ks := dr.ts.targetKeyspace + ks := dr.ts.TargetKeyspaceName() if direction == workflow.DirectionBackward { - ks = dr.ts.sourceKeyspace + ks = dr.ts.SourceKeyspaceName() } var tabletTypes []string for _, servedType := range servedTypes { tabletTypes = append(tabletTypes, servedType.String()) } - tables := strings.Join(dr.ts.tables, ",") + tables := strings.Join(dr.ts.Tables(), ",") dr.drLog.Log(fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]", tables, ks, strings.Join(tabletTypes, ","))) dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables)) @@ -94,24 +94,24 @@ func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows [] } func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error { - dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables [%s]", dr.ts.targetKeyspace, strings.Join(dr.ts.tables, ","))) + dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables [%s]", dr.ts.TargetKeyspaceName(), strings.Join(dr.ts.Tables(), ","))) return nil } func (dr *switcherDryRun) changeRouting(ctx context.Context) error { - dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.sourceKeyspace, dr.ts.targetKeyspace)) + dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.SourceKeyspaceName(), dr.ts.TargetKeyspaceName())) var deleteLogs, addLogs []string - if dr.ts.migrationType == binlogdatapb.MigrationType_TABLES { - tables := strings.Join(dr.ts.tables, ",") + if dr.ts.MigrationType() == binlogdatapb.MigrationType_TABLES { + tables := strings.Join(dr.ts.Tables(), ",") dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] 
will be updated", tables)) return nil } deleteLogs = nil addLogs = nil - for _, source := range dr.ts.sources { + for _, source := range dr.ts.Sources() { deleteLogs = append(deleteLogs, fmt.Sprintf("\tShard %s, Tablet %d", source.GetShard().ShardName(), source.GetShard().PrimaryAlias.Uid)) } - for _, target := range dr.ts.targets { + for _, target := range dr.ts.Targets() { addLogs = append(addLogs, fmt.Sprintf("\tShard %s, Tablet %d", target.GetShard().ShardName(), target.GetShard().PrimaryAlias.Uid)) } if len(deleteLogs) > 0 { @@ -126,7 +126,7 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error { func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error { dr.drLog.Log("SwitchWrites completed, freeze and delete vreplication streams on:") logs := make([]string, 0) - for _, t := range ts.targets { + for _, t := range ts.Targets() { logs = append(logs, fmt.Sprintf("\ttablet %d", t.GetPrimary().Alias.Uid)) } dr.drLog.LogSlice(logs) @@ -136,7 +136,7 @@ func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *traffi func (dr *switcherDryRun) startReverseVReplication(ctx context.Context) error { dr.drLog.Log("Start reverse replication streams on:") logs := make([]string, 0) - for _, t := range dr.ts.sources { + for _, t := range dr.ts.Sources() { logs = append(logs, fmt.Sprintf("\ttablet %d", t.GetPrimary().Alias.Uid)) } dr.drLog.LogSlice(logs) @@ -144,7 +144,7 @@ func (dr *switcherDryRun) startReverseVReplication(ctx context.Context) error { } func (dr *switcherDryRun) createReverseVReplication(ctx context.Context) error { - dr.drLog.Log(fmt.Sprintf("Create reverse replication workflow %s", dr.ts.reverseWorkflow)) + dr.drLog.Log(fmt.Sprintf("Create reverse replication workflow %s", dr.ts.ReverseWorkflowName())) return nil } @@ -156,7 +156,7 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *workflow.Strea } logs := make([]string, 0) - 
dr.drLog.Log(fmt.Sprintf("Migrate streams to %s:", dr.ts.targetKeyspace)) + dr.drLog.Log(fmt.Sprintf("Migrate streams to %s:", dr.ts.TargetKeyspaceName())) for key, streams := range sm.Streams() { for _, stream := range streams { logs = append(logs, fmt.Sprintf("\tShard %s Id %d, Workflow %s, Pos %s, BinLogSource %v", key, stream.ID, stream.Workflow, mysql.EncodePosition(stream.Position), stream.BinlogSource)) @@ -167,7 +167,7 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *workflow.Strea dr.drLog.LogSlice(logs) logs = nil } - for _, target := range dr.ts.targets { + for _, target := range dr.ts.Targets() { tabletStreams := templates for _, vrs := range tabletStreams { logs = append(logs, fmt.Sprintf("\t Keyspace %s, Shard %s, Tablet %d, Workflow %s, Id %d, Pos %v, BinLogSource %s", @@ -188,12 +188,12 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error { logs := make([]string, 0) - for _, source := range dr.ts.sources { - position, _ := dr.ts.wr.tmc.MasterPosition(ctx, source.GetPrimary().Tablet) - logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.sourceKeyspace, source.GetShard().ShardName(), position)) + for _, source := range dr.ts.Sources() { + position, _ := dr.ts.TabletManagerClient().MasterPosition(ctx, source.GetPrimary().Tablet) + logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.SourceKeyspaceName(), source.GetShard().ShardName(), position)) } if len(logs) > 0 { - dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables [%s]:", dr.ts.sourceKeyspace, strings.Join(dr.ts.tables, ","))) + dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables [%s]:", dr.ts.SourceKeyspaceName(), strings.Join(dr.ts.Tables(), ","))) dr.drLog.LogSlice(logs) } return nil @@ -208,7 +208,7 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *workflow.StreamMi } } if len(logs) > 0 
{ - dr.drLog.Log(fmt.Sprintf("Stop streams on keyspace %s", dr.ts.sourceKeyspace)) + dr.drLog.Log(fmt.Sprintf("Stop streams on keyspace %s", dr.ts.SourceKeyspaceName())) dr.drLog.LogSlice(logs) } return nil, nil @@ -227,8 +227,8 @@ func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType workflow.TableRemovalType) error { logs := make([]string, 0) - for _, source := range dr.ts.sources { - for _, tableName := range dr.ts.tables { + for _, source := range dr.ts.Sources() { + for _, tableName := range dr.ts.Tables() { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s", source.GetPrimary().Keyspace, source.GetPrimary().Shard, source.GetPrimary().DbName(), source.GetPrimary().Alias.Uid, tableName)) } @@ -239,7 +239,7 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType wo } if len(logs) > 0 { dr.drLog.Log(fmt.Sprintf("%s these tables from the database and removing them from the vschema for keyspace %s:", - action, dr.ts.sourceKeyspace)) + action, dr.ts.SourceKeyspaceName())) dr.drLog.LogSlice(logs) } return nil @@ -248,8 +248,8 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType wo func (dr *switcherDryRun) dropSourceShards(ctx context.Context) error { logs := make([]string, 0) tabletsList := make(map[string][]string) - for _, si := range dr.ts.sourceShards() { - tabletAliases, err := dr.ts.wr.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName()) + for _, si := range dr.ts.SourceShards() { + tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName()) if err != nil { return err } @@ -276,9 +276,9 @@ func (dr *switcherDryRun) validateWorkflowHasCompleted(ctx context.Context) erro func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) error { dr.drLog.Log("Delete vreplication streams on 
target:") logs := make([]string, 0) - for _, t := range dr.ts.targets { + for _, t := range dr.ts.Targets() { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d", - t.GetShard().Keyspace(), t.GetShard().ShardName(), dr.ts.workflow, t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid)) + t.GetShard().Keyspace(), t.GetShard().ShardName(), dr.ts.WorkflowName(), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid)) } dr.drLog.LogSlice(logs) return nil @@ -287,9 +287,9 @@ func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) err func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Context) error { dr.drLog.Log("Delete reverse vreplication streams on source:") logs := make([]string, 0) - for _, t := range dr.ts.sources { + for _, t := range dr.ts.Sources() { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d", - t.GetShard().Keyspace(), t.GetShard().ShardName(), workflow.ReverseWorkflowName(dr.ts.workflow), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid)) + t.GetShard().Keyspace(), t.GetShard().ShardName(), workflow.ReverseWorkflowName(dr.ts.WorkflowName()), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid)) } dr.drLog.LogSlice(logs) return nil @@ -297,9 +297,9 @@ func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Conte func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error { logs := make([]string, 0) - for _, target := range dr.ts.targets { + for _, target := range dr.ts.Targets() { logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s, Tablet %d, Workflow %s, DbName %s", - target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().Alias.Uid, dr.ts.workflow, target.GetPrimary().DbName())) + target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().Alias.Uid, dr.ts.WorkflowName(), target.GetPrimary().DbName())) } if len(logs) > 0 { dr.drLog.Log("Mark vreplication 
streams frozen on:") @@ -310,11 +310,11 @@ func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error { func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error { logs := make([]string, 0) - for _, si := range dr.ts.sourceShards() { + for _, si := range dr.ts.SourceShards() { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Tablet %d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid)) } if len(logs) > 0 { - dr.drLog.Log(fmt.Sprintf("Denied tables [%s] will be removed from:", strings.Join(dr.ts.tables, ","))) + dr.drLog.Log(fmt.Sprintf("Denied tables [%s] will be removed from:", strings.Join(dr.ts.Tables(), ","))) dr.drLog.LogSlice(logs) } return nil @@ -326,15 +326,15 @@ func (dr *switcherDryRun) logs() *[]string { func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error { logs := make([]string, 0) - for _, target := range dr.ts.targets { - for _, tableName := range dr.ts.tables { + for _, target := range dr.ts.Targets() { + for _, tableName := range dr.ts.Tables() { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s", target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().DbName(), target.GetPrimary().Alias.Uid, tableName)) } } if len(logs) > 0 { dr.drLog.Log(fmt.Sprintf("Dropping these tables from the database and removing from the vschema for keyspace %s:", - dr.ts.targetKeyspace)) + dr.ts.TargetKeyspaceName())) dr.drLog.LogSlice(logs) } return nil @@ -343,8 +343,8 @@ func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error { func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error { logs := make([]string, 0) tabletsList := make(map[string][]string) - for _, si := range dr.ts.targetShards() { - tabletAliases, err := dr.ts.wr.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName()) + for _, si := range dr.ts.TargetShards() { + tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, 
si.Keyspace(), si.ShardName()) if err != nil { return err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index da339733d1a..52ec621f14d 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -110,6 +110,10 @@ func (ts *trafficSwitcher) MigrationType() binlogdatapb.MigrationType { ret func (ts *trafficSwitcher) ReverseWorkflowName() string { return ts.reverseWorkflow } func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKSSchema.Keyspace.Name } func (ts *trafficSwitcher) SourceKeyspaceSchema() *vindexes.KeyspaceSchema { return ts.sourceKSSchema } +func (ts *trafficSwitcher) Sources() map[string]*workflow.MigrationSource { return ts.sources } +func (ts *trafficSwitcher) Tables() []string { return ts.tables } +func (ts *trafficSwitcher) TargetKeyspaceName() string { return ts.targetKeyspace } +func (ts *trafficSwitcher) Targets() map[string]*workflow.MigrationTarget { return ts.targets } func (ts *trafficSwitcher) WorkflowName() string { return ts.workflow } func (ts *trafficSwitcher) ForAllSources(f func(source *workflow.MigrationSource) error) error { @@ -161,7 +165,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl } ws := &workflow.State{Workflow: workflowName, TargetKeyspace: targetKeyspace} - ws.SourceKeyspace = ts.sourceKeyspace + ws.SourceKeyspace = ts.SourceKeyspaceName() var cellsSwitched, cellsNotSwitched []string var keyspace string var reverse bool @@ -176,15 +180,15 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl } else { keyspace = targetKeyspace } - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ws.WorkflowType = workflow.TypeMoveTables // we assume a consistent state, so only choose routing rule for one table for replica/rdonly - if len(ts.tables) == 0 { + if len(ts.Tables()) == 0 { return nil, nil, 
fmt.Errorf("no tables in workflow %s.%s", keyspace, workflowName) } - table := ts.tables[0] + table := ts.Tables()[0] cellsSwitched, cellsNotSwitched, err = wr.getCellsWithTableReadsSwitched(ctx, keyspace, table, "rdonly") if err != nil { @@ -196,11 +200,11 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl return nil, nil, err } ws.ReplicaCellsNotSwitched, ws.ReplicaCellsSwitched = cellsNotSwitched, cellsSwitched - rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return nil, nil, err } - for _, table := range ts.tables { + for _, table := range ts.Tables() { rr := rules[table] // if a rule exists for the table and points to the target keyspace, writes have been switched if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", keyspace, table) { @@ -213,9 +217,9 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl // we assume a consistent state, so only choose one shard var shard *topo.ShardInfo if reverse { - shard = ts.targetShards()[0] + shard = ts.TargetShards()[0] } else { - shard = ts.sourceShards()[0] + shard = ts.SourceShards()[0] } cellsSwitched, cellsNotSwitched, err = wr.getCellsWithShardReadsSwitched(ctx, keyspace, shard, "rdonly") @@ -334,28 +338,28 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam } if err := ts.validate(ctx); err != nil { - ts.wr.Logger().Errorf("validate failed: %v", err) + ts.Logger().Errorf("validate failed: %v", err) return nil, err } // For reads, locking the source keyspace is sufficient. 
- ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.sourceKeyspace, "SwitchReads") + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") if lockErr != nil { - ts.wr.Logger().Errorf("LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("LockKeyspace failed: %v", lockErr) return nil, lockErr } defer unlock(&err) - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { if err := sw.switchTableReads(ctx, cells, servedTypes, direction); err != nil { - ts.wr.Logger().Errorf("switchTableReads failed: %v", err) + ts.Logger().Errorf("switchTableReads failed: %v", err) return nil, err } return sw.logs(), nil } wr.Logger().Infof("About to switchShardReads: %+v, %+v, %+v", cells, servedTypes, direction) if err := ts.switchShardReads(ctx, cells, servedTypes, direction); err != nil { - ts.wr.Logger().Errorf("switchShardReads failed: %v", err) + ts.Logger().Errorf("switchShardReads failed: %v", err) return nil, err } @@ -435,35 +439,35 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa } if ts.frozen { - ts.wr.Logger().Warningf("Writes have already been switched for workflow %s, nothing to do here", ts.workflow) + ts.Logger().Warningf("Writes have already been switched for workflow %s, nothing to do here", ts.WorkflowName()) return 0, sw.logs(), nil } - ts.wr.Logger().Infof("Built switching metadata: %+v", ts) + ts.Logger().Infof("Built switching metadata: %+v", ts) if err := ts.validate(ctx); err != nil { - ts.wr.Logger().Errorf("validate failed: %v", err) + ts.Logger().Errorf("validate failed: %v", err) return 0, nil, err } if reverseReplication { - err := wr.areTabletsAvailableToStreamFrom(ctx, ts, ts.targetKeyspace, ts.targetShards()) + err := wr.areTabletsAvailableToStreamFrom(ctx, ts, ts.TargetKeyspaceName(), ts.TargetShards()) if err != nil { return 0, nil, err } } // Need to lock both source and target keyspaces. 
- tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.sourceKeyspace, "SwitchWrites") + tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") if lockErr != nil { - ts.wr.Logger().Errorf("LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("LockKeyspace failed: %v", lockErr) return 0, nil, lockErr } ctx = tctx defer sourceUnlock(&err) - if ts.targetKeyspace != ts.sourceKeyspace { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.targetKeyspace, "SwitchWrites") + if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") if lockErr != nil { - ts.wr.Logger().Errorf("LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("LockKeyspace failed: %v", lockErr) return 0, nil, lockErr } ctx = tctx @@ -473,69 +477,69 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa // If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams. journalsExist, sourceWorkflows, err := ts.checkJournals(ctx) if err != nil { - ts.wr.Logger().Errorf("checkJournals failed: %v", err) + ts.Logger().Errorf("checkJournals failed: %v", err) return 0, nil, err } if !journalsExist { - ts.wr.Logger().Infof("No previous journals were found. Proceeding normally.") + ts.Logger().Infof("No previous journals were found. 
Proceeding normally.") sm, err := workflow.BuildStreamMigrator(ctx, ts, cancel) if err != nil { - ts.wr.Logger().Errorf("buildStreamMigrater failed: %v", err) + ts.Logger().Errorf("buildStreamMigrater failed: %v", err) return 0, nil, err } if cancel { sw.cancelMigration(ctx, sm) return 0, sw.logs(), nil } - ts.wr.Logger().Infof("Stopping streams") + ts.Logger().Infof("Stopping streams") sourceWorkflows, err = sw.stopStreams(ctx, sm) if err != nil { - ts.wr.Logger().Errorf("stopStreams failed: %v", err) + ts.Logger().Errorf("stopStreams failed: %v", err) for key, streams := range sm.Streams() { for _, stream := range streams { - ts.wr.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource) + ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource) } } sw.cancelMigration(ctx, sm) return 0, nil, err } - ts.wr.Logger().Infof("Stopping source writes") + ts.Logger().Infof("Stopping source writes") if err := sw.stopSourceWrites(ctx); err != nil { - ts.wr.Logger().Errorf("stopSourceWrites failed: %v", err) + ts.Logger().Errorf("stopSourceWrites failed: %v", err) sw.cancelMigration(ctx, sm) return 0, nil, err } - ts.wr.Logger().Infof("Waiting for streams to catchup") + ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, timeout); err != nil { - ts.wr.Logger().Errorf("waitForCatchup failed: %v", err) + ts.Logger().Errorf("waitForCatchup failed: %v", err) sw.cancelMigration(ctx, sm) return 0, nil, err } - ts.wr.Logger().Infof("Migrating streams") + ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { - ts.wr.Logger().Errorf("migrateStreams failed: %v", err) + ts.Logger().Errorf("migrateStreams failed: %v", err) sw.cancelMigration(ctx, sm) return 0, nil, err } - ts.wr.Logger().Infof("Creating reverse streams") + ts.Logger().Infof("Creating reverse streams") if err := 
sw.createReverseVReplication(ctx); err != nil { - ts.wr.Logger().Errorf("createReverseVReplication failed: %v", err) + ts.Logger().Errorf("createReverseVReplication failed: %v", err) sw.cancelMigration(ctx, sm) return 0, nil, err } } else { if cancel { err := fmt.Errorf("traffic switching has reached the point of no return, cannot cancel") - ts.wr.Logger().Errorf("%v", err) + ts.Logger().Errorf("%v", err) return 0, nil, err } - ts.wr.Logger().Infof("Journals were found. Completing the left over steps.") + ts.Logger().Infof("Journals were found. Completing the left over steps.") // Need to gather positions in case all journals were not created. if err := ts.gatherPositions(ctx); err != nil { - ts.wr.Logger().Errorf("gatherPositions failed: %v", err) + ts.Logger().Errorf("gatherPositions failed: %v", err) return 0, nil, err } } @@ -543,30 +547,30 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. 
if err := sw.createJournals(ctx, sourceWorkflows); err != nil { - ts.wr.Logger().Errorf("createJournals failed: %v", err) + ts.Logger().Errorf("createJournals failed: %v", err) return 0, nil, err } if err := sw.allowTargetWrites(ctx); err != nil { - ts.wr.Logger().Errorf("allowTargetWrites failed: %v", err) + ts.Logger().Errorf("allowTargetWrites failed: %v", err) return 0, nil, err } if err := sw.changeRouting(ctx); err != nil { - ts.wr.Logger().Errorf("changeRouting failed: %v", err) + ts.Logger().Errorf("changeRouting failed: %v", err) return 0, nil, err } if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil { - ts.wr.Logger().Errorf("finalize failed: %v", err) + ts.Logger().Errorf("finalize failed: %v", err) return 0, nil, err } if reverseReplication { if err := sw.startReverseVReplication(ctx); err != nil { - ts.wr.Logger().Errorf("startReverseVReplication failed: %v", err) + ts.Logger().Errorf("startReverseVReplication failed: %v", err) return 0, nil, err } } if err := sw.freezeTargetVReplication(ctx); err != nil { - ts.wr.Logger().Errorf("deleteTargetVReplication failed: %v", err) + ts.Logger().Errorf("deleteTargetVReplication failed: %v", err) return 0, nil, err } @@ -587,24 +591,24 @@ func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow st sw = &switcher{ts: ts, wr: wr} } var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.sourceKeyspace, "DropTargets") + tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") if lockErr != nil { - ts.wr.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx - if ts.targetKeyspace != ts.sourceKeyspace { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.targetKeyspace, "DropTargets") + if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { + tctx, targetUnlock, lockErr := 
sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") if lockErr != nil { - ts.wr.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx } if !keepData { - switch ts.migrationType { + switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: log.Infof("Deleting target tables") if err := sw.removeTargetTables(ctx); err != nil { @@ -623,7 +627,7 @@ func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow st if err := wr.dropArtifacts(ctx, sw); err != nil { return nil, err } - if err := ts.wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { + if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } return sw.logs(), nil @@ -659,9 +663,9 @@ func (wr *Wrangler) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, sw = &switcher{ts: ts, wr: wr} } var tctx context.Context - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.targetKeyspace, "completeMigrateWorkflow") + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") if lockErr != nil { - ts.wr.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) @@ -671,7 +675,7 @@ func (wr *Wrangler) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, } if !cancel { sw.addParticipatingTablesToKeyspace(ctx, targetKeyspace, tableSpecs) - if err := ts.wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { + if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } } @@ -698,17 +702,17 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam sw = &switcher{ts: ts, wr: wr} } var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.sourceKeyspace, "DropSources") + tctx, sourceUnlock, lockErr := 
sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { - ts.wr.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx - if ts.targetKeyspace != ts.sourceKeyspace { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.targetKeyspace, "DropSources") + if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") if lockErr != nil { - ts.wr.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) + ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) @@ -721,7 +725,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam } } if !keepData { - switch ts.migrationType { + switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: log.Infof("Deleting tables") if err := sw.removeSourceTables(ctx, removalType); err != nil { @@ -741,7 +745,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam if err := wr.dropArtifacts(ctx, sw); err != nil { return nil, err } - if err := ts.wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { + if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -845,16 +849,16 @@ func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, wo } func (ts *trafficSwitcher) validate(ctx context.Context) error { - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { sourceTopo := ts.wr.ts if ts.externalTopo != nil { sourceTopo = ts.externalTopo } // All shards must be present. 
- if err := ts.compareShards(ctx, ts.sourceKeyspace, ts.sourceShards(), sourceTopo); err != nil { + if err := ts.compareShards(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), sourceTopo); err != nil { return err } - if err := ts.compareShards(ctx, ts.targetKeyspace, ts.targetShards(), ts.wr.ts); err != nil { + if err := ts.compareShards(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), ts.wr.ts); err != nil { return err } // Wildcard table names not allowed. @@ -886,7 +890,7 @@ func (ts *trafficSwitcher) compareShards(ctx context.Context, keyspace string, s func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) - rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err } @@ -897,56 +901,56 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, // For backward, we redirect to source for _, servedType := range servedTypes { tt := strings.ToLower(servedType.String()) - for _, table := range ts.tables { + for _, table := range ts.Tables() { if direction == workflow.DirectionForward { log.Infof("Route direction forward") - toTarget := []string{ts.targetKeyspace + "." + table} + toTarget := []string{ts.TargetKeyspaceName() + "." + table} rules[table+"@"+tt] = toTarget - rules[ts.targetKeyspace+"."+table+"@"+tt] = toTarget - rules[ts.sourceKeyspace+"."+table+"@"+tt] = toTarget + rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget + rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toTarget } else { log.Infof("Route direction backwards") - toSource := []string{ts.sourceKeyspace + "." + table} + toSource := []string{ts.SourceKeyspaceName() + "." 
+ table} rules[table+"@"+tt] = toSource - rules[ts.targetKeyspace+"."+table+"@"+tt] = toSource - rules[ts.sourceKeyspace+"."+table+"@"+tt] = toSource + rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toSource + rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toSource } } } - if err := topotools.SaveRoutingRules(ctx, ts.wr.ts, rules); err != nil { + if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil { return err } - return ts.wr.ts.RebuildSrvVSchema(ctx, cells) + return ts.TopoServer().RebuildSrvVSchema(ctx, cells) } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { var fromShards, toShards []*topo.ShardInfo if direction == workflow.DirectionForward { - fromShards, toShards = ts.sourceShards(), ts.targetShards() + fromShards, toShards = ts.SourceShards(), ts.TargetShards() } else { - fromShards, toShards = ts.targetShards(), ts.sourceShards() + fromShards, toShards = ts.TargetShards(), ts.SourceShards() } - if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.targetKeyspace, strings.Join(cells, ",")) + ts.TargetKeyspaceName(), strings.Join(cells, ",")) log.Errorf("%w", err2) return err2 } for _, servedType := range servedTypes { - if err := ts.wr.updateShardRecords(ctx, ts.sourceKeyspace, fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */); err != nil { + if err := ts.wr.updateShardRecords(ctx, ts.SourceKeyspaceName(), fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */); err != nil { return err } - if err := ts.wr.updateShardRecords(ctx, ts.sourceKeyspace, toShards, cells, servedType, 
false, false); err != nil { + if err := ts.wr.updateShardRecords(ctx, ts.SourceKeyspaceName(), toShards, cells, servedType, false, false); err != nil { return err } - err := ts.wr.ts.MigrateServedType(ctx, ts.sourceKeyspace, toShards, fromShards, servedType, cells) + err := ts.TopoServer().MigrateServedType(ctx, ts.SourceKeyspaceName(), toShards, fromShards, servedType, cells) if err != nil { return err } } - if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { err2 := vterrors.Wrapf(err, "After switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.targetKeyspace, strings.Join(cells, ",")) + ts.TargetKeyspaceName(), strings.Join(cells, ",")) log.Errorf("%w", err2) return err2 } @@ -957,11 +961,11 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, // If so, it also returns the list of sourceWorkflows that need to be switched. 
func (ts *trafficSwitcher) checkJournals(ctx context.Context) (journalsExist bool, sourceWorkflows []string, err error) { var ( - ws = workflow.NewServer(ts.wr.ts, ts.wr.tmc) + ws = workflow.NewServer(ts.TopoServer(), ts.TabletManagerClient()) mu sync.Mutex ) - err = ts.forAllSources(func(source *workflow.MigrationSource) error { + err = ts.ForAllSources(func(source *workflow.MigrationSource) error { mu.Lock() defer mu.Unlock() journal, exists, err := ws.CheckReshardingJournalExistsOnTablet(ctx, source.GetPrimary().Tablet, ts.id) @@ -982,20 +986,20 @@ func (ts *trafficSwitcher) checkJournals(ctx context.Context) (journalsExist boo func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { var err error - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { err = ts.changeTableSourceWrites(ctx, disallowWrites) } else { - err = ts.changeShardsAccess(ctx, ts.sourceKeyspace, ts.sourceShards(), disallowWrites) + err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } if err != nil { log.Warningf("Error: %s", err) return err } - return ts.forAllSources(func(source *workflow.MigrationSource) error { + return ts.ForAllSources(func(source *workflow.MigrationSource) error { var err error - source.Position, err = ts.wr.tmc.MasterPosition(ctx, source.GetPrimary().Tablet) + source.Position, err = ts.TabletManagerClient().MasterPosition(ctx, source.GetPrimary().Tablet) ts.wr.Logger().Infof("Stopped Source Writes. 
Position for source %v:%v: %v", - ts.sourceKeyspace, source.GetShard().ShardName(), source.Position) + ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) if err != nil { log.Warningf("Error: %s", err) } @@ -1004,9 +1008,9 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { } func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { - return ts.forAllSources(func(source *workflow.MigrationSource) error { - if _, err := ts.wr.ts.UpdateShardFields(ctx, ts.sourceKeyspace, source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.tables) + return ts.ForAllSources(func(source *workflow.MigrationSource) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { return err } @@ -1019,20 +1023,20 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati defer cancel() // source writes have been stopped, wait for all streams on targets to catch up if err := ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error { - ts.wr.Logger().Infof("Before Catchup: uid: %d, target primary %s, target position %s, shard %s", uid, + ts.Logger().Infof("Before Catchup: uid: %d, target primary %s, target position %s, shard %s", uid, target.GetPrimary().AliasString(), target.Position, target.GetShard().String()) bls := target.Sources[uid] - source := ts.sources[bls.Shard] - ts.wr.Logger().Infof("Before Catchup: waiting for keyspace:shard: %v:%v to reach source position %v, uid %d", - ts.targetKeyspace, target.GetShard().ShardName(), source.Position, uid) - if err := ts.wr.tmc.VReplicationWaitForPos(ctx, 
target.GetPrimary().Tablet, int(uid), source.Position); err != nil { + source := ts.Sources()[bls.Shard] + ts.Logger().Infof("Before Catchup: waiting for keyspace:shard: %v:%v to reach source position %v, uid %d", + ts.TargetKeyspaceName(), target.GetShard().ShardName(), source.Position, uid) + if err := ts.TabletManagerClient().VReplicationWaitForPos(ctx, target.GetPrimary().Tablet, int(uid), source.Position); err != nil { return err } log.Infof("After catchup: target keyspace:shard: %v:%v, source position %v, uid %d", - ts.targetKeyspace, target.GetShard().ShardName(), source.Position, uid) - ts.wr.Logger().Infof("After catchup: position for keyspace:shard: %v:%v reached, uid %d", - ts.targetKeyspace, target.GetShard().ShardName(), uid) - if _, err := ts.wr.tmc.VReplicationExec(ctx, target.GetPrimary().Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil { + ts.TargetKeyspaceName(), target.GetShard().ShardName(), source.Position, uid) + ts.Logger().Infof("After catchup: position for keyspace:shard: %v:%v reached, uid %d", + ts.TargetKeyspaceName(), target.GetShard().ShardName(), uid) + if _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil { log.Infof("error marking stopped for cutover on %s, uid %d", target.GetPrimary().AliasString(), uid) return err } @@ -1041,56 +1045,56 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati return err } // all targets have caught up, record their positions for setting up reverse workflows - return ts.forAllTargets(func(target *workflow.MigrationTarget) error { + return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { var err error - target.Position, err = ts.wr.tmc.MasterPosition(ctx, target.GetPrimary().Tablet) - ts.wr.Logger().Infof("After catchup, position for target primary %s, %v", target.GetPrimary().AliasString(), target.Position) + 
target.Position, err = ts.TabletManagerClient().MasterPosition(ctx, target.GetPrimary().Tablet) + ts.Logger().Infof("After catchup, position for target primary %s, %v", target.GetPrimary().AliasString(), target.Position) return err }) } func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) { var err error - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { err = ts.changeTableSourceWrites(ctx, allowWrites) } else { - err = ts.changeShardsAccess(ctx, ts.sourceKeyspace, ts.sourceShards(), allowWrites) + err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.wr.Logger().Errorf("Cancel migration failed:", err) + ts.Logger().Errorf("Cancel migration failed: %v", err) } sm.CancelMigration(ctx) - err = ts.forAllTargets(func(target *workflow.MigrationTarget) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.workflow)) - _, err := ts.wr.tmc.VReplicationExec(ctx, target.GetPrimary().Tablet, query) + err = ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) + _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) return err }) if err != nil { - ts.wr.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) + ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } err = ts.deleteReverseVReplication(ctx) if err != nil { - ts.wr.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err) + ts.Logger().Errorf("Cancel migration failed: could not delete 
reverse vreplication entries: %v", err) } } func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error { - err := ts.forAllSources(func(source *workflow.MigrationSource) error { + err := ts.ForAllSources(func(source *workflow.MigrationSource) error { var err error - source.Position, err = ts.wr.tmc.MasterPosition(ctx, source.GetPrimary().Tablet) - ts.wr.Logger().Infof("Position for source %v:%v: %v", ts.sourceKeyspace, source.GetShard().ShardName(), source.Position) + source.Position, err = ts.TabletManagerClient().MasterPosition(ctx, source.GetPrimary().Tablet) + ts.Logger().Infof("Position for source %v:%v: %v", ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) return err }) if err != nil { return err } - return ts.forAllTargets(func(target *workflow.MigrationTarget) error { + return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { var err error - target.Position, err = ts.wr.tmc.MasterPosition(ctx, target.GetPrimary().Tablet) - ts.wr.Logger().Infof("Position for target %v:%v: %v", ts.targetKeyspace, target.GetShard().ShardName(), target.Position) + target.Position, err = ts.TabletManagerClient().MasterPosition(ctx, target.GetPrimary().Tablet) + ts.Logger().Infof("Position for target %v:%v: %v", ts.TargetKeyspaceName(), target.GetShard().ShardName(), target.Position) return err }) } @@ -1101,9 +1105,9 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error } err := ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error { bls := target.Sources[uid] - source := ts.sources[bls.Shard] + source := ts.Sources()[bls.Shard] reverseBls := &binlogdatapb.BinlogSource{ - Keyspace: ts.targetKeyspace, + Keyspace: ts.TargetKeyspaceName(), Shard: target.GetShard().ShardName(), TabletType: bls.TabletType, Filter: &binlogdatapb.Filter{}, @@ -1116,19 +1120,19 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error } var filter string if strings.HasPrefix(rule.Match, 
"/") { - if ts.sourceKSSchema.Keyspace.Sharded { + if ts.SourceKeyspaceSchema().Keyspace.Sharded { filter = key.KeyRangeString(source.GetShard().KeyRange) } } else { var inKeyrange string - if ts.sourceKSSchema.Keyspace.Sharded { - vtable, ok := ts.sourceKSSchema.Tables[rule.Match] + if ts.SourceKeyspaceSchema().Keyspace.Sharded { + vtable, ok := ts.SourceKeyspaceSchema().Tables[rule.Match] if !ok { return fmt.Errorf("table %s not found in vschema1", rule.Match) } // TODO(sougou): handle degenerate cases like sequence, etc. // We currently assume the primary vindex is the best way to filter, which may not be true. - inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), ts.sourceKeyspace, vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange)) + inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange)) } filter = fmt.Sprintf("select * from %s%s", rule.Match, inKeyrange) } @@ -1138,8 +1142,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error }) } log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s", - source.GetPrimary().Alias, ts.reverseWorkflow, target.Position) - _, err := ts.wr.VReplicationExec(ctx, source.GetPrimary().Alias, binlogplayer.CreateVReplicationState(ts.reverseWorkflow, reverseBls, target.Position, binlogplayer.BlpStopped, source.GetPrimary().DbName())) + source.GetPrimary().Alias, ts.ReverseWorkflowName(), target.Position) + _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, binlogplayer.CreateVReplicationState(ts.ReverseWorkflowName(), reverseBls, target.Position, binlogplayer.BlpStopped, source.GetPrimary().DbName())) if err != nil { return err } @@ -1148,7 +1152,7 @@ func (ts *trafficSwitcher) 
createReverseVReplication(ctx context.Context) error updateQuery := ts.getReverseVReplicationUpdateQuery(target.GetPrimary().Alias.Cell, source.GetPrimary().Alias.Cell, source.GetPrimary().DbName()) if updateQuery != "" { log.Infof("Updating vreplication stream entry on %s with: %s", source.GetPrimary().Alias, updateQuery) - _, err = ts.wr.VReplicationExec(ctx, source.GetPrimary().Alias, updateQuery) + _, err = ts.VReplicationExec(ctx, source.GetPrimary().Alias, updateQuery) return err } return nil @@ -1166,23 +1170,23 @@ func (ts *trafficSwitcher) getReverseVReplicationUpdateQuery(targetCell string, if ts.optCells != "" || ts.optTabletTypes != "" { query := fmt.Sprintf("update _vt.vreplication set cell = '%s', tablet_types = '%s' where workflow = '%s' and db_name = '%s'", - ts.optCells, ts.optTabletTypes, ts.reverseWorkflow, dbname) + ts.optCells, ts.optTabletTypes, ts.ReverseWorkflowName(), dbname) return query } return "" } func (ts *trafficSwitcher) deleteReverseVReplication(ctx context.Context) error { - return ts.forAllSources(func(source *workflow.MigrationSource) error { + return ts.ForAllSources(func(source *workflow.MigrationSource) error { query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(source.GetPrimary().DbName()), encodeString(ts.reverseWorkflow)) - _, err := ts.wr.tmc.VReplicationExec(ctx, source.GetPrimary().Tablet, query) + _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query) return err }) } func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows []string) error { log.Infof("In createJournals for source workflows %+v", sourceWorkflows) - return ts.forAllSources(func(source *workflow.MigrationSource) error { + return ts.ForAllSources(func(source *workflow.MigrationSource) error { if source.Journaled { return nil } @@ -1190,18 +1194,18 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [ participantMap := 
make(map[string]bool) journal := &binlogdatapb.Journal{ Id: ts.id, - MigrationType: ts.migrationType, - Tables: ts.tables, + MigrationType: ts.MigrationType(), + Tables: ts.Tables(), LocalPosition: source.Position, Participants: participants, SourceWorkflows: sourceWorkflows, } - for targetShard, target := range ts.targets { + for targetShard, target := range ts.Targets() { for _, tsource := range target.Sources { participantMap[tsource.Shard] = true } journal.ShardGtids = append(journal.ShardGtids, &binlogdatapb.ShardGtid{ - Keyspace: ts.targetKeyspace, + Keyspace: ts.TargetKeyspaceName(), Shard: targetShard, Gtid: target.Position, }) @@ -1219,12 +1223,12 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [ } log.Infof("Creating journal %v", journal) - ts.wr.Logger().Infof("Creating journal: %v", journal) + ts.Logger().Infof("Creating journal: %v", journal) statement := fmt.Sprintf("insert into _vt.resharding_journal "+ "(id, db_name, val) "+ "values (%v, %v, %v)", ts.id, encodeString(source.GetPrimary().DbName()), encodeString(journal.String())) - if _, err := ts.wr.tmc.VReplicationExec(ctx, source.GetPrimary().Tablet, statement); err != nil { + if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, statement); err != nil { return err } return nil @@ -1232,16 +1236,16 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [ } func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { return ts.allowTableTargetWrites(ctx) } - return ts.changeShardsAccess(ctx, ts.targetKeyspace, ts.targetShards(), allowWrites) + return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites) } func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error { - return ts.forAllTargets(func(target 
*workflow.MigrationTarget) error { - if _, err := ts.wr.ts.UpdateShardFields(ctx, ts.targetKeyspace, target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.tables) + return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) }); err != nil { return err } @@ -1250,38 +1254,38 @@ func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error { } func (ts *trafficSwitcher) changeRouting(ctx context.Context) error { - if ts.migrationType == binlogdatapb.MigrationType_TABLES { + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { return ts.changeWriteRoute(ctx) } return ts.changeShardRouting(ctx) } func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error { - rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err } - for _, table := range ts.tables { - delete(rules, ts.targetKeyspace+"."+table) - ts.wr.Logger().Infof("Delete routing: %v", ts.targetKeyspace+"."+table) - rules[table] = []string{ts.targetKeyspace + "." + table} - rules[ts.sourceKeyspace+"."+table] = []string{ts.targetKeyspace + "." + table} - ts.wr.Logger().Infof("Add routing: %v %v", table, ts.sourceKeyspace+"."+table) + for _, table := range ts.Tables() { + delete(rules, ts.TargetKeyspaceName()+"."+table) + ts.Logger().Infof("Delete routing: %v", ts.TargetKeyspaceName()+"."+table) + rules[table] = []string{ts.TargetKeyspaceName() + "." + table} + rules[ts.SourceKeyspaceName()+"."+table] = []string{ts.TargetKeyspaceName() + "." 
+ table} + ts.Logger().Infof("Add routing: %v %v", table, ts.SourceKeyspaceName()+"."+table) } - if err := topotools.SaveRoutingRules(ctx, ts.wr.ts, rules); err != nil { + if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil { return err } - return ts.wr.ts.RebuildSrvVSchema(ctx, nil) + return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { - if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, ""); err != nil { - err2 := vterrors.Wrapf(err, "Before changing shard routes, found SrvKeyspace for %s is corrupt", ts.targetKeyspace) + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), ""); err != nil { + err2 := vterrors.Wrapf(err, "Before changing shard routes, found SrvKeyspace for %s is corrupt", ts.TargetKeyspaceName()) log.Errorf("%w", err2) return err2 } - err := ts.forAllSources(func(source *workflow.MigrationSource) error { - _, err := ts.wr.ts.UpdateShardFields(ctx, ts.sourceKeyspace, source.GetShard().ShardName(), func(si *topo.ShardInfo) error { + err := ts.ForAllSources(func(source *workflow.MigrationSource) error { + _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { si.IsPrimaryServing = false return nil }) @@ -1290,8 +1294,8 @@ func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { if err != nil { return err } - err = ts.forAllTargets(func(target *workflow.MigrationTarget) error { - _, err := ts.wr.ts.UpdateShardFields(ctx, ts.targetKeyspace, target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + err = ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { si.IsPrimaryServing = true return nil }) @@ -1300,12 +1304,12 @@ func (ts *trafficSwitcher) 
changeShardRouting(ctx context.Context) error { if err != nil { return err } - err = ts.wr.ts.MigrateServedType(ctx, ts.targetKeyspace, ts.targetShards(), ts.sourceShards(), topodatapb.TabletType_PRIMARY, nil) + err = ts.TopoServer().MigrateServedType(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), ts.SourceShards(), topodatapb.TabletType_PRIMARY, nil) if err != nil { return err } - if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, ""); err != nil { - err2 := vterrors.Wrapf(err, "After changing shard routes, found SrvKeyspace for %s is corrupt", ts.targetKeyspace) + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), ""); err != nil { + err2 := vterrors.Wrapf(err, "After changing shard routes, found SrvKeyspace for %s is corrupt", ts.TargetKeyspaceName()) log.Errorf("%w", err2) return err2 } @@ -1313,15 +1317,15 @@ func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { } func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { - return ts.forAllSources(func(source *workflow.MigrationSource) error { + return ts.ForAllSources(func(source *workflow.MigrationSource) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.GetPrimary().DbName())) - _, err := ts.wr.VReplicationExec(ctx, source.GetPrimary().Alias, query) + _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) return err }) } func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace string, shards []*topo.ShardInfo, access accessType) error { - if err := ts.wr.ts.UpdateDisableQueryService(ctx, keyspace, shards, topodatapb.TabletType_PRIMARY, nil, access == disallowWrites /* disable */); err != nil { + if err := ts.TopoServer().UpdateDisableQueryService(ctx, keyspace, shards, topodatapb.TabletType_PRIMARY, nil, access == disallowWrites /* disable */); err != nil { return err } return ts.wr.refreshPrimaryTablets(ctx, shards) @@ 
-1380,26 +1384,26 @@ func (ts *trafficSwitcher) forAllUids(f func(target *workflow.MigrationTarget, u return allErrors.AggrError(vterrors.Aggregate) } -func (ts *trafficSwitcher) sourceShards() []*topo.ShardInfo { - shards := make([]*topo.ShardInfo, 0, len(ts.sources)) - for _, source := range ts.sources { +func (ts *trafficSwitcher) SourceShards() []*topo.ShardInfo { + shards := make([]*topo.ShardInfo, 0, len(ts.Sources())) + for _, source := range ts.Sources() { shards = append(shards, source.GetShard()) } return shards } -func (ts *trafficSwitcher) targetShards() []*topo.ShardInfo { - shards := make([]*topo.ShardInfo, 0, len(ts.targets)) - for _, target := range ts.targets { +func (ts *trafficSwitcher) TargetShards() []*topo.ShardInfo { + shards := make([]*topo.ShardInfo, 0, len(ts.Targets())) + for _, target := range ts.Targets() { shards = append(shards, target.GetShard()) } return shards } func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { - return ts.forAllSources(func(source *workflow.MigrationSource) error { - if _, err := ts.wr.ts.UpdateShardFields(ctx, ts.sourceKeyspace, source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.tables) + return ts.ForAllSources(func(source *workflow.MigrationSource) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) }); err != nil { return err } @@ -1414,8 +1418,8 @@ func (ts *trafficSwitcher) validateWorkflowHasCompleted(ctx context.Context) err func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) error { wg := sync.WaitGroup{} rec := concurrency.AllErrorRecorder{} - if ts.migrationType == binlogdatapb.MigrationType_SHARDS { - _ = ts.forAllSources(func(source 
*workflow.MigrationSource) error { + if ts.MigrationType() == binlogdatapb.MigrationType_SHARDS { + _ = ts.ForAllSources(func(source *workflow.MigrationSource) error { wg.Add(1) if source.GetShard().IsPrimaryServing { rec.RecordError(fmt.Errorf(fmt.Sprintf("Shard %s is still serving", source.GetShard().ShardName()))) @@ -1424,10 +1428,10 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er return nil }) } else { - _ = ts.forAllTargets(func(target *workflow.MigrationTarget) error { + _ = ts.ForAllTargets(func(target *workflow.MigrationTarget) error { wg.Add(1) - query := fmt.Sprintf("select 1 from _vt.vreplication where db_name='%s' and workflow='%s' and message!='FROZEN'", target.GetPrimary().DbName(), ts.workflow) - rs, _ := ts.wr.VReplicationExec(ctx, target.GetPrimary().Alias, query) + query := fmt.Sprintf("select 1 from _vt.vreplication where db_name='%s' and workflow='%s' and message!='FROZEN'", target.GetPrimary().DbName(), ts.WorkflowName()) + rs, _ := ts.VReplicationExec(ctx, target.GetPrimary().Alias, query) if len(rs.Rows) > 0 { rec.RecordError(fmt.Errorf("vreplication streams are not frozen on tablet %d", target.GetPrimary().Alias.Uid)) } @@ -1438,16 +1442,16 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er //check if table is routable wg.Wait() - if ts.migrationType == binlogdatapb.MigrationType_TABLES { - rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { rec.RecordError(fmt.Errorf("could not get RoutingRules")) } for fromTable, toTables := range rules { for _, toTable := range toTables { - for _, table := range ts.tables { - if toTable == fmt.Sprintf("%s.%s", ts.sourceKeyspace, table) { - rec.RecordError(fmt.Errorf("routing still exists from keyspace %s table %s to %s", ts.sourceKeyspace, table, fromTable)) + for _, table := range ts.Tables() 
{ + if toTable == fmt.Sprintf("%s.%s", ts.SourceKeyspaceName(), table) { + rec.RecordError(fmt.Errorf("routing still exists from keyspace %s table %s to %s", ts.SourceKeyspaceName(), table, fromTable)) } } } @@ -1465,22 +1469,22 @@ func getRenameFileName(tableName string) string { } func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType workflow.TableRemovalType) error { - err := ts.forAllSources(func(source *workflow.MigrationSource) error { - for _, tableName := range ts.tables { + err := ts.ForAllSources(func(source *workflow.MigrationSource) error { + for _, tableName := range ts.Tables() { query := fmt.Sprintf("drop table %s.%s", source.GetPrimary().DbName(), tableName) if removalType == workflow.DropTable { - ts.wr.Logger().Infof("Dropping table %s.%s\n", source.GetPrimary().DbName(), tableName) + ts.Logger().Infof("Dropping table %s.%s\n", source.GetPrimary().DbName(), tableName) } else { renameName := getRenameFileName(tableName) - ts.wr.Logger().Infof("Renaming table %s.%s to %s.%s\n", source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName) + ts.Logger().Infof("Renaming table %s.%s to %s.%s\n", source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName) query = fmt.Sprintf("rename table %s.%s TO %s.%s", source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName) } _, err := ts.wr.ExecuteFetchAsDba(ctx, source.GetPrimary().Alias, query, 1, false, true) if err != nil { - ts.wr.Logger().Errorf("Error removing table %s: %v", tableName, err) + ts.Logger().Errorf("Error removing table %s: %v", tableName, err) return err } - ts.wr.Logger().Infof("Removed table %s.%s\n", source.GetPrimary().DbName(), tableName) + ts.Logger().Infof("Removed table %s.%s\n", source.GetPrimary().DbName(), tableName) } return nil @@ -1489,30 +1493,30 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType w return err } - return 
ts.dropParticipatingTablesFromKeyspace(ctx, ts.sourceKeyspace) + return ts.dropParticipatingTablesFromKeyspace(ctx, ts.SourceKeyspaceName()) } func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Context, keyspace string) error { - vschema, err := ts.wr.ts.GetVSchema(ctx, keyspace) + vschema, err := ts.TopoServer().GetVSchema(ctx, keyspace) if err != nil { return err } - for _, tableName := range ts.tables { + for _, tableName := range ts.Tables() { delete(vschema.Tables, tableName) } - return ts.wr.ts.SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) } // FIXME: even after dropSourceShards there are still entries in the topo, need to research and fix func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { - return ts.forAllSources(func(source *workflow.MigrationSource) error { - ts.wr.Logger().Infof("Deleting shard %s.%s\n", source.GetShard().Keyspace(), source.GetShard().ShardName()) + return ts.ForAllSources(func(source *workflow.MigrationSource) error { + ts.Logger().Infof("Deleting shard %s.%s\n", source.GetShard().Keyspace(), source.GetShard().ShardName()) err := ts.wr.DeleteShard(ctx, source.GetShard().Keyspace(), source.GetShard().ShardName(), true, false) if err != nil { - ts.wr.Logger().Errorf("Error deleting shard %s: %v", source.GetShard().ShardName(), err) + ts.Logger().Errorf("Error deleting shard %s: %v", source.GetShard().ShardName(), err) return err } - ts.wr.Logger().Infof("Deleted shard %s.%s\n", source.GetShard().Keyspace(), source.GetShard().ShardName()) + ts.Logger().Infof("Deleted shard %s.%s\n", source.GetShard().Keyspace(), source.GetShard().ShardName()) return nil }) } @@ -1520,10 +1524,10 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error { // Mark target streams as frozen before deleting. 
If SwitchWrites gets // re-invoked after a freeze, it will skip all the previous steps - err := ts.forAllTargets(func(target *workflow.MigrationTarget) error { - ts.wr.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.workflow, target.GetPrimary().DbName()) - query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", frozenStr, encodeString(target.GetPrimary().DbName()), encodeString(ts.workflow)) - _, err := ts.wr.tmc.VReplicationExec(ctx, target.GetPrimary().Tablet, query) + err := ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + ts.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName()) + query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", frozenStr, encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) + _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) return err }) if err != nil { @@ -1533,36 +1537,36 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error { } func (ts *trafficSwitcher) dropTargetVReplicationStreams(ctx context.Context) error { - return ts.forAllTargets(func(target *workflow.MigrationTarget) error { - ts.wr.Logger().Infof("Deleting target streams for workflow %s db_name %s", ts.workflow, target.GetPrimary().DbName()) - query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.workflow)) - _, err := ts.wr.tmc.VReplicationExec(ctx, target.GetPrimary().Tablet, query) + return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + ts.Logger().Infof("Deleting target streams for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName()) + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", 
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) + _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) return err }) } func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Context) error { - return ts.forAllSources(func(source *workflow.MigrationSource) error { - ts.wr.Logger().Infof("Deleting reverse streams for workflow %s db_name %s", ts.workflow, source.GetPrimary().DbName()) + return ts.ForAllSources(func(source *workflow.MigrationSource) error { + ts.Logger().Infof("Deleting reverse streams for workflow %s db_name %s", ts.WorkflowName(), source.GetPrimary().DbName()) query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", - encodeString(source.GetPrimary().DbName()), encodeString(workflow.ReverseWorkflowName(ts.workflow))) - _, err := ts.wr.tmc.VReplicationExec(ctx, source.GetPrimary().Tablet, query) + encodeString(source.GetPrimary().DbName()), encodeString(workflow.ReverseWorkflowName(ts.WorkflowName()))) + _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query) return err }) } func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { log.Infof("removeTargetTables") - err := ts.forAllTargets(func(target *workflow.MigrationTarget) error { - for _, tableName := range ts.tables { + err := ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + for _, tableName := range ts.Tables() { query := fmt.Sprintf("drop table %s.%s", target.GetPrimary().DbName(), tableName) - ts.wr.Logger().Infof("Dropping table %s.%s\n", target.GetPrimary().DbName(), tableName) + ts.Logger().Infof("Dropping table %s.%s\n", target.GetPrimary().DbName(), tableName) _, err := ts.wr.ExecuteFetchAsDba(ctx, target.GetPrimary().Alias, query, 1, false, true) if err != nil { - ts.wr.Logger().Errorf("Error removing table %s: %v", tableName, err) + ts.Logger().Errorf("Error removing table %s: %v", tableName, err) 
return err } - ts.wr.Logger().Infof("Removed table %s.%s\n", target.GetPrimary().DbName(), tableName) + ts.Logger().Infof("Removed table %s.%s\n", target.GetPrimary().DbName(), tableName) } return nil @@ -1571,40 +1575,40 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { return err } - return ts.dropParticipatingTablesFromKeyspace(ctx, ts.targetKeyspace) + return ts.dropParticipatingTablesFromKeyspace(ctx, ts.TargetKeyspaceName()) } func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error { - return ts.forAllTargets(func(target *workflow.MigrationTarget) error { - ts.wr.Logger().Infof("Deleting shard %s.%s\n", target.GetShard().Keyspace(), target.GetShard().ShardName()) + return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + ts.Logger().Infof("Deleting shard %s.%s\n", target.GetShard().Keyspace(), target.GetShard().ShardName()) err := ts.wr.DeleteShard(ctx, target.GetShard().Keyspace(), target.GetShard().ShardName(), true, false) if err != nil { - ts.wr.Logger().Errorf("Error deleting shard %s: %v", target.GetShard().ShardName(), err) + ts.Logger().Errorf("Error deleting shard %s: %v", target.GetShard().ShardName(), err) return err } - ts.wr.Logger().Infof("Deleted shard %s.%s\n", target.GetShard().Keyspace(), target.GetShard().ShardName()) + ts.Logger().Infof("Deleted shard %s.%s\n", target.GetShard().Keyspace(), target.GetShard().ShardName()) return nil }) } func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { - rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err } - for _, table := range ts.tables { + for _, table := range ts.Tables() { delete(rules, table) delete(rules, table+"@replica") delete(rules, table+"@rdonly") - delete(rules, ts.targetKeyspace+"."+table) - delete(rules, ts.targetKeyspace+"."+table+"@replica") - delete(rules, ts.targetKeyspace+"."+table+"@rdonly") - delete(rules, 
ts.sourceKeyspace+"."+table) - delete(rules, ts.sourceKeyspace+"."+table+"@replica") - delete(rules, ts.sourceKeyspace+"."+table+"@rdonly") - } - if err := topotools.SaveRoutingRules(ctx, ts.wr.ts, rules); err != nil { + delete(rules, ts.TargetKeyspaceName()+"."+table) + delete(rules, ts.TargetKeyspaceName()+"."+table+"@replica") + delete(rules, ts.TargetKeyspaceName()+"."+table+"@rdonly") + delete(rules, ts.SourceKeyspaceName()+"."+table) + delete(rules, ts.SourceKeyspaceName()+"."+table+"@replica") + delete(rules, ts.SourceKeyspaceName()+"."+table+"@rdonly") + } + if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil { return err } return nil @@ -1613,9 +1617,7 @@ func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { // addParticipatingTablesToKeyspace updates the vschema with the new tables that were created as part of the // Migrate flow. It is called when the Migrate flow is Completed func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, keyspace, tableSpecs string) error { - var err error - var vschema *vschemapb.Keyspace - vschema, err = ts.wr.ts.GetVSchema(ctx, keyspace) + vschema, err := ts.TopoServer().GetVSchema(ctx, keyspace) if err != nil { return err } @@ -1645,5 +1647,5 @@ func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, vschema.Tables[table] = &vschemapb.Table{} } } - return ts.wr.ts.SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) } diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index ba3d5c56769..8db28d70368 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -179,7 +179,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou return nil, err } if err := ts.validate(ctx); err != nil { - ts.wr.Logger().Errorf("validate: %v", err) + ts.Logger().Errorf("validate: %v", err) return nil, err } tables = strings.TrimSpace(tables) @@ -199,13 
+199,13 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou targetKeyspace: targetKeyspace, tables: includeTables, } - for shard, source := range ts.sources { + for shard, source := range ts.Sources() { df.sources[shard] = &shardStreamer{ primary: source.GetPrimary(), } } var oneTarget *workflow.MigrationTarget - for shard, target := range ts.targets { + for shard, target := range ts.Targets() { df.targets[shard] = &shardStreamer{ primary: target.GetPrimary(), } @@ -324,7 +324,7 @@ func (df *vdiff) diffTable(ctx context.Context, wr *Wrangler, table string, td * return vterrors.Wrap(err, "stopTargets") } // Make sure all sources are past the target's positions and start a query stream that records the current source positions. - if err := df.startQueryStreams(ctx, df.ts.sourceKeyspace, df.sources, td.sourceExpression, filteredReplicationWaitTime); err != nil { + if err := df.startQueryStreams(ctx, df.ts.SourceKeyspaceName(), df.sources, td.sourceExpression, filteredReplicationWaitTime); err != nil { return vterrors.Wrap(err, "startQueryStreams(sources)") } // Fast forward the targets to the newly recorded source positions. @@ -332,7 +332,7 @@ func (df *vdiff) diffTable(ctx context.Context, wr *Wrangler, table string, td * return vterrors.Wrap(err, "syncTargets") } // Sources and targets are in sync. Start query streams on the targets. - if err := df.startQueryStreams(ctx, df.ts.targetKeyspace, df.targets, td.targetExpression, filteredReplicationWaitTime); err != nil { + if err := df.startQueryStreams(ctx, df.ts.TargetKeyspaceName(), df.targets, td.targetExpression, filteredReplicationWaitTime); err != nil { return vterrors.Wrap(err, "startQueryStreams(targets)") } // Now that queries are running, target vreplication streams can be restarted. 
@@ -576,11 +576,11 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err1 = df.forAll(df.sources, func(shard string, source *shardStreamer) error { - sourceTopo := df.ts.wr.ts - if ts.externalTopo != nil { - sourceTopo = ts.externalTopo + sourceTopo := df.ts.TopoServer() + if ts.ExternalTopo() != nil { + sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(sourceTopo, []string{df.sourceCell}, df.ts.sourceKeyspace, shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(sourceTopo, []string{df.sourceCell}, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr) if err != nil { return err } @@ -598,7 +598,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(df.ts.wr.ts, []string{df.targetCell}, df.ts.targetKeyspace, shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(df.ts.TopoServer(), []string{df.targetCell}, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr) if err != nil { return err } @@ -624,13 +624,13 @@ func (df *vdiff) stopTargets(ctx context.Context) error { var mu sync.Mutex err := df.forAll(df.targets, func(shard string, target *shardStreamer) error { - query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for vdiff' where db_name=%s and workflow=%s", encodeString(target.primary.DbName()), encodeString(df.ts.workflow)) - _, err := df.ts.wr.tmc.VReplicationExec(ctx, target.primary.Tablet, query) + query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for vdiff' where db_name=%s and workflow=%s", encodeString(target.primary.DbName()), encodeString(df.ts.WorkflowName())) + _, err := df.ts.TabletManagerClient().VReplicationExec(ctx, target.primary.Tablet, query) if err != nil { return err } - query = fmt.Sprintf("select source, pos from 
_vt.vreplication where db_name=%s and workflow=%s", encodeString(target.primary.DbName()), encodeString(df.ts.workflow)) - p3qr, err := df.ts.wr.tmc.VReplicationExec(ctx, target.primary.Tablet, query) + query = fmt.Sprintf("select source, pos from _vt.vreplication where db_name=%s and workflow=%s", encodeString(target.primary.DbName()), encodeString(df.ts.WorkflowName())) + p3qr, err := df.ts.TabletManagerClient().VReplicationExec(ctx, target.primary.Tablet, query) if err != nil { return err } @@ -680,7 +680,7 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.primary.Alias.String()) } log.Infof("WaitForPosition: tablet %s should reach position %s", participant.tablet.Alias.String(), mysql.EncodePosition(participant.position)) - if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil { + if err := df.ts.TabletManagerClient().WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil { log.Errorf("WaitForPosition error: %s", err) return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias)) } @@ -760,10 +760,10 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti bls := target.Sources[uid] pos := df.sources[bls.Shard].snapshotPosition query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", pos, uid) - if _, err := df.ts.wr.tmc.VReplicationExec(ctx, target.GetPrimary().Tablet, query); err != nil { + if _, err := df.ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query); err != nil { return err } - if err := df.ts.wr.tmc.VReplicationWaitForPos(waitCtx, target.GetPrimary().Tablet, int(uid), pos); err != nil { + if err := 
df.ts.TabletManagerClient().VReplicationWaitForPos(waitCtx, target.GetPrimary().Tablet, int(uid), pos); err != nil { return vterrors.Wrapf(err, "VReplicationWaitForPos for tablet %v", topoproto.TabletAliasString(target.GetPrimary().Tablet.Alias)) } return nil @@ -773,7 +773,7 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti } err = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - pos, err := df.ts.wr.tmc.MasterPosition(ctx, target.primary.Tablet) + pos, err := df.ts.TabletManagerClient().MasterPosition(ctx, target.primary.Tablet) if err != nil { return err } @@ -790,9 +790,9 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti // restartTargets restarts the stopped target vreplication streams. func (df *vdiff) restartTargets(ctx context.Context) error { return df.forAll(df.targets, func(shard string, target *shardStreamer) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(target.primary.DbName()), encodeString(df.ts.workflow)) + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(target.primary.DbName()), encodeString(df.ts.WorkflowName())) log.Infof("restarting target replication with %s", query) - _, err := df.ts.wr.tmc.VReplicationExec(ctx, target.primary.Tablet, query) + _, err := df.ts.TabletManagerClient().VReplicationExec(ctx, target.primary.Tablet, query) return err }) } From 436f1510fa8d7073466b51e3d5526967e4b871fa Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Fri, 15 Oct 2021 19:45:49 -0700 Subject: [PATCH 2/8] Extract `ts.compareShards` to `package workflow` This should only be temporarily exported Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/traffic_switcher.go | 44 ++++++++++++++++++++++++ go/vt/wrangler/traffic_switcher.go | 21 ++--------- 2 files changed, 46 
insertions(+), 19 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index cd2161a4b75..ff3db3d1de0 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -27,8 +27,10 @@ import ( "strings" "google.golang.org/protobuf/encoding/prototext" + "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -270,6 +272,48 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag }, nil } +// CompareShards compares the list of shards in a workflow with the shards in +// that keyspace according to the topo. It returns an error if they do not match. +// +// This function is used to validate MoveTables workflows. +// +// (TODO|@ajm188): This function is temporarily-exported until *wrangler.trafficSwitcher +// has been fully moved over to this package. Once that refactor is finished, +// this function should be unexported. Consequently, YOU SHOULD NOT DEPEND ON +// THIS FUNCTION EXTERNALLY. +func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ts *topo.Server) error { + shardSet := sets.NewString() + for _, si := range shards { + shardSet.Insert(si.ShardName()) + } + + topoShards, err := ts.GetShardNames(ctx, keyspace) + if err != nil { + return err + } + + topoShardSet := sets.NewString(topoShards...) 
+ if !shardSet.Equal(topoShardSet) { + wfExtra := shardSet.Difference(topoShardSet) + topoExtra := topoShardSet.Difference(shardSet) + + var rec concurrency.AllErrorRecorder + if wfExtra.Len() > 0 { + wfExtraSorted := wfExtra.List() + rec.RecordError(fmt.Errorf("switch command shards not in topo: %v", wfExtraSorted)) + } + + if topoExtra.Len() > 0 { + topoExtraSorted := topoExtra.List() + rec.RecordError(fmt.Errorf("topo shards not in switch command: %v", topoExtraSorted)) + } + + return fmt.Errorf("mismatched shards for keyspace %s: %s", keyspace, strings.Join(rec.ErrorStrings(), "; ")) + } + + return nil +} + // HashStreams produces a stable hash based on the target keyspace and migration // targets. func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64 { diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 52ec621f14d..bbe6378b35a 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -855,10 +855,10 @@ func (ts *trafficSwitcher) validate(ctx context.Context) error { sourceTopo = ts.externalTopo } // All shards must be present. - if err := ts.compareShards(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), sourceTopo); err != nil { + if err := workflow.CompareShards(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), sourceTopo); err != nil { return err } - if err := ts.compareShards(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), ts.wr.ts); err != nil { + if err := workflow.CompareShards(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), ts.wr.ts); err != nil { return err } // Wildcard table names not allowed. 
@@ -871,23 +871,6 @@ func (ts *trafficSwitcher) validate(ctx context.Context) error { return nil } -func (ts *trafficSwitcher) compareShards(ctx context.Context, keyspace string, sis []*topo.ShardInfo, topo *topo.Server) error { - var shards []string - for _, si := range sis { - shards = append(shards, si.ShardName()) - } - topoShards, err := topo.GetShardNames(ctx, keyspace) - if err != nil { - return err - } - sort.Strings(topoShards) - sort.Strings(shards) - if !reflect.DeepEqual(topoShards, shards) { - return fmt.Errorf("mismatched shards for keyspace %s: topo: %v vs switch command: %v", keyspace, topoShards, shards) - } - return nil -} - func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) From 195e0075ab7ed1d244266c4103ec20861892ee4b Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Fri, 15 Oct 2021 19:48:02 -0700 Subject: [PATCH 3/8] Cleanup private versions of `forAll{Sources,Targets}` Signed-off-by: Andrew Mason --- go/vt/wrangler/traffic_switcher.go | 65 +++++++++++++----------------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index bbe6378b35a..81b7444bfc3 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -117,10 +117,37 @@ func (ts *trafficSwitcher) Targets() map[string]*workflow.MigrationTarget { ret func (ts *trafficSwitcher) WorkflowName() string { return ts.workflow } func (ts *trafficSwitcher) ForAllSources(f func(source *workflow.MigrationSource) error) error { - return ts.forAllSources(f) + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, source := range ts.sources { + wg.Add(1) + go func(source *workflow.MigrationSource) { + 
defer wg.Done() + + if err := f(source); err != nil { + allErrors.RecordError(err) + } + }(source) + } + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) } + func (ts *trafficSwitcher) ForAllTargets(f func(source *workflow.MigrationTarget) error) error { - return ts.forAllTargets(f) + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, target := range ts.targets { + wg.Add(1) + go func(target *workflow.MigrationTarget) { + defer wg.Done() + + if err := f(target); err != nil { + allErrors.RecordError(err) + } + }(target) + } + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) } /* end: implementation of workflow.ITrafficSwitcher */ @@ -1314,40 +1341,6 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri return ts.wr.refreshPrimaryTablets(ctx, shards) } -func (ts *trafficSwitcher) forAllSources(f func(*workflow.MigrationSource) error) error { - var wg sync.WaitGroup - allErrors := &concurrency.AllErrorRecorder{} - for _, source := range ts.sources { - wg.Add(1) - go func(source *workflow.MigrationSource) { - defer wg.Done() - - if err := f(source); err != nil { - allErrors.RecordError(err) - } - }(source) - } - wg.Wait() - return allErrors.AggrError(vterrors.Aggregate) -} - -func (ts *trafficSwitcher) forAllTargets(f func(*workflow.MigrationTarget) error) error { - var wg sync.WaitGroup - allErrors := &concurrency.AllErrorRecorder{} - for _, target := range ts.targets { - wg.Add(1) - go func(target *workflow.MigrationTarget) { - defer wg.Done() - - if err := f(target); err != nil { - allErrors.RecordError(err) - } - }(target) - } - wg.Wait() - return allErrors.AggrError(vterrors.Aggregate) -} - func (ts *trafficSwitcher) forAllUids(f func(target *workflow.MigrationTarget, uid uint32) error) error { var wg sync.WaitGroup allErrors := &concurrency.AllErrorRecorder{} From 10c5305cb0ecb104791e7d59472b75a05d825e98 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Fri, 15 Oct 2021 19:49:28 -0700 
Subject: [PATCH 4/8] Make `workflow.Frozen` a const (which it should always have been) and cleanup Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/traffic_switcher.go | 2 +- go/vt/wrangler/traffic_switcher.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index ff3db3d1de0..786af551b9f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -42,7 +42,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -var ( +const ( // Frozen is the message value of frozen vreplication streams. Frozen = "FROZEN" ) diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 81b7444bfc3..04b78dc6236 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -51,7 +51,6 @@ import ( ) const ( - frozenStr = "FROZEN" errorNoStreams = "no streams found in keyspace %s for: %s" // use pt-osc's naming convention, this format also ensures vstreamer ignores such tables renameTableTemplate = "_%.59s_old" // limit table name to 64 characters @@ -1502,7 +1501,7 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error { // re-invoked after a freeze, it will skip all the previous steps err := ts.ForAllTargets(func(target *workflow.MigrationTarget) error { ts.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName()) - query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", frozenStr, encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) + query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", workflow.Frozen, encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) _, err := ts.TabletManagerClient().VReplicationExec(ctx, 
target.GetPrimary().Tablet, query) return err }) From cc929887f249215912f3c26302e438ef0795fe48 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Fri, 15 Oct 2021 19:50:19 -0700 Subject: [PATCH 5/8] [inclusive naming] switch to new tmc rpcs Signed-off-by: Andrew Mason --- go/vt/wrangler/switcher_dry_run.go | 2 +- go/vt/wrangler/traffic_switcher.go | 8 ++++---- go/vt/wrangler/vdiff.go | 2 +- go/vt/wrangler/vdiff_env_test.go | 6 +++++- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index a0e895c1a8d..832f5f1917f 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -189,7 +189,7 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error { logs := make([]string, 0) for _, source := range dr.ts.Sources() { - position, _ := dr.ts.TabletManagerClient().MasterPosition(ctx, source.GetPrimary().Tablet) + position, _ := dr.ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.SourceKeyspaceName(), source.GetShard().ShardName(), position)) } if len(logs) > 0 { diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 04b78dc6236..46ce6433c37 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1006,7 +1006,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { } return ts.ForAllSources(func(source *workflow.MigrationSource) error { var err error - source.Position, err = ts.TabletManagerClient().MasterPosition(ctx, source.GetPrimary().Tablet) + source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) ts.wr.Logger().Infof("Stopped Source Writes. 
Position for source %v:%v: %v", ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) if err != nil { @@ -1056,7 +1056,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati // all targets have caught up, record their positions for setting up reverse workflows return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { var err error - target.Position, err = ts.TabletManagerClient().MasterPosition(ctx, target.GetPrimary().Tablet) + target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) ts.Logger().Infof("After catchup, position for target primary %s, %v", target.GetPrimary().AliasString(), target.Position) return err }) @@ -1093,7 +1093,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *workflow.Str func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error { err := ts.ForAllSources(func(source *workflow.MigrationSource) error { var err error - source.Position, err = ts.TabletManagerClient().MasterPosition(ctx, source.GetPrimary().Tablet) + source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) ts.Logger().Infof("Position for source %v:%v: %v", ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) return err }) @@ -1102,7 +1102,7 @@ func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error { } return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { var err error - target.Position, err = ts.TabletManagerClient().MasterPosition(ctx, target.GetPrimary().Tablet) + target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) ts.Logger().Infof("Position for target %v:%v: %v", ts.TargetKeyspaceName(), target.GetShard().ShardName(), target.Position) return err }) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 8db28d70368..c321e89d93c 100644 --- a/go/vt/wrangler/vdiff.go +++ 
b/go/vt/wrangler/vdiff.go @@ -773,7 +773,7 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti } err = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - pos, err := df.ts.TabletManagerClient().MasterPosition(ctx, target.primary.Tablet) + pos, err := df.ts.TabletManagerClient().PrimaryPosition(ctx, target.primary.Tablet) if err != nil { return err } diff --git a/go/vt/wrangler/vdiff_env_test.go b/go/vt/wrangler/vdiff_env_test.go index 9ca55436ce8..29baaea801e 100644 --- a/go/vt/wrangler/vdiff_env_test.go +++ b/go/vt/wrangler/vdiff_env_test.go @@ -335,8 +335,12 @@ func (tmc *testVDiffTMClient) VReplicationWaitForPos(ctx context.Context, tablet return nil } -// TODO(deepthi): rename this to PrimaryPosition after v12.0 +// TODO(deepthi): remove this after v12.0 func (tmc *testVDiffTMClient) MasterPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + return tmc.PrimaryPosition(ctx, tablet) +} + +func (tmc *testVDiffTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { pos, ok := tmc.pos[int(tablet.Alias.Uid)] if !ok { return "", fmt.Errorf("no primary position for %d", tablet.Alias.Uid) From ef6835cda606b7db5201ec037d738b8642a9274e Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sat, 16 Oct 2021 05:15:31 -0700 Subject: [PATCH 6/8] Add `ForAllUIDs` to interface and export Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/traffic_switcher.go | 1 + go/vt/wrangler/traffic_switcher.go | 42 ++++++++++++------------ go/vt/wrangler/vdiff.go | 2 +- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 786af551b9f..3973b9cce4f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -120,6 +120,7 @@ type ITrafficSwitcher interface { ForAllSources(f func(source *MigrationSource) error) error ForAllTargets(f 
func(target *MigrationTarget) error) error + ForAllUIDs(f func(target *MigrationTarget, uid uint32) error) error SourceShards() []*topo.ShardInfo TargetShards() []*topo.ShardInfo } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 46ce6433c37..8726605139e 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -149,6 +149,25 @@ func (ts *trafficSwitcher) ForAllTargets(f func(source *workflow.MigrationTarget return allErrors.AggrError(vterrors.Aggregate) } +func (ts *trafficSwitcher) ForAllUIDs(f func(target *workflow.MigrationTarget, uid uint32) error) error { + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, target := range ts.Targets() { + for uid := range target.Sources { + wg.Add(1) + go func(target *workflow.MigrationTarget, uid uint32) { + defer wg.Done() + + if err := f(target, uid); err != nil { + allErrors.RecordError(err) + } + }(target, uid) + } + } + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) +} + /* end: implementation of workflow.ITrafficSwitcher */ // For a Reshard, to check whether we have switched reads for a tablet type, we check if any one of the source shards has @@ -1031,7 +1050,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) defer cancel() // source writes have been stopped, wait for all streams on targets to catch up - if err := ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error { + if err := ts.ForAllUIDs(func(target *workflow.MigrationTarget, uid uint32) error { ts.Logger().Infof("Before Catchup: uid: %d, target primary %s, target position %s, shard %s", uid, target.GetPrimary().AliasString(), target.Position, target.GetShard().String()) bls := target.Sources[uid] @@ -1112,7 +1131,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error if err := 
ts.deleteReverseVReplication(ctx); err != nil { return err } - err := ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error { + err := ts.ForAllUIDs(func(target *workflow.MigrationTarget, uid uint32) error { bls := target.Sources[uid] source := ts.Sources()[bls.Shard] reverseBls := &binlogdatapb.BinlogSource{ @@ -1340,25 +1359,6 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri return ts.wr.refreshPrimaryTablets(ctx, shards) } -func (ts *trafficSwitcher) forAllUids(f func(target *workflow.MigrationTarget, uid uint32) error) error { - var wg sync.WaitGroup - allErrors := &concurrency.AllErrorRecorder{} - for _, target := range ts.targets { - for uid := range target.Sources { - wg.Add(1) - go func(target *workflow.MigrationTarget, uid uint32) { - defer wg.Done() - - if err := f(target, uid); err != nil { - allErrors.RecordError(err) - } - }(target, uid) - } - } - wg.Wait() - return allErrors.AggrError(vterrors.Aggregate) -} - func (ts *trafficSwitcher) SourceShards() []*topo.ShardInfo { shards := make([]*topo.ShardInfo, 0, len(ts.Sources())) for _, source := range ts.Sources() { diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index c321e89d93c..37b8fb69771 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -756,7 +756,7 @@ func (df *vdiff) streamOne(ctx context.Context, keyspace, shard string, particip func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime time.Duration) error { waitCtx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) defer cancel() - err := df.ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error { + err := df.ts.ForAllUIDs(func(target *workflow.MigrationTarget, uid uint32) error { bls := target.Sources[uid] pos := df.sources[bls.Shard].snapshotPosition query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", pos, uid) From 
dad596a3e08ddd143d58c21729c6c7b626061a3f Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 17 Oct 2021 10:31:45 -0400 Subject: [PATCH 7/8] Add `ExternalTopo` to the interface and implement it Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/traffic_switcher.go | 1 + go/vt/wrangler/traffic_switcher.go | 1 + 2 files changed, 2 insertions(+) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 3973b9cce4f..c4378a231b3 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -106,6 +106,7 @@ type ITrafficSwitcher interface { /* Functions that expose fields on the *wrangler.trafficSwitcher */ + ExternalTopo() *topo.Server MigrationType() binlogdatapb.MigrationType ReverseWorkflowName() string SourceKeyspaceName() string diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 8726605139e..da95ec75791 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -105,6 +105,7 @@ func (ts *trafficSwitcher) VReplicationExec(ctx context.Context, alias *topodata return ts.wr.VReplicationExec(ctx, alias, query) } +func (ts *trafficSwitcher) ExternalTopo() *topo.Server { return ts.externalTopo } func (ts *trafficSwitcher) MigrationType() binlogdatapb.MigrationType { return ts.migrationType } func (ts *trafficSwitcher) ReverseWorkflowName() string { return ts.reverseWorkflow } func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKSSchema.Keyspace.Name } From 846f83fb35a83f24d55d950cb9939b53bb3a09ce Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 17 Oct 2021 10:36:14 -0400 Subject: [PATCH 8/8] Remove unused `wr` param from `tableDiffer.diff` Signed-off-by: Andrew Mason --- go/vt/wrangler/vdiff.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 37b8fb69771..c1b3c3b75c9 100644 --- a/go/vt/wrangler/vdiff.go +++ 
b/go/vt/wrangler/vdiff.go @@ -252,7 +252,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou return nil, err } // Perform the diff of source and target streams. - dr, err := td.diff(ctx, df.ts.wr, &rowsToCompare, debug, onlyPks) + dr, err := td.diff(ctx, &rowsToCompare, debug, onlyPks) if err != nil { return nil, vterrors.Wrap(err, "diff") } @@ -912,7 +912,7 @@ func humanInt(n int64) string { //----------------------------------------------------------------- // tableDiffer -func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *int64, debug, onlyPks bool) (*DiffReport, error) { +func (td *tableDiffer) diff(ctx context.Context, rowsToCompare *int64, debug, onlyPks bool) (*DiffReport, error) { sourceExecutor := newPrimitiveExecutor(ctx, td.sourcePrimitive) targetExecutor := newPrimitiveExecutor(ctx, td.targetPrimitive) dr := &DiffReport{}