Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vtctl/workflow] More trafficswitcher extraction #9007

Merged
merged 8 commits into from
Oct 20, 2021
54 changes: 53 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"strings"

"google.golang.org/protobuf/encoding/prototext"
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/evalengine"
Expand All @@ -40,7 +42,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
const (
// Frozen is the message value of frozen vreplication streams.
Frozen = "FROZEN"
)
Expand Down Expand Up @@ -104,16 +106,24 @@ type ITrafficSwitcher interface {

/* Functions that expose fields on the *wrangler.trafficSwitcher */

ExternalTopo() *topo.Server
MigrationType() binlogdatapb.MigrationType
ReverseWorkflowName() string
SourceKeyspaceName() string
SourceKeyspaceSchema() *vindexes.KeyspaceSchema
Sources() map[string]*MigrationSource
Tables() []string
TargetKeyspaceName() string
Targets() map[string]*MigrationTarget
WorkflowName() string

/* Functions that *wrangler.trafficSwitcher implements */

ForAllSources(f func(source *MigrationSource) error) error
ForAllTargets(f func(target *MigrationTarget) error) error
ForAllUIDs(f func(target *MigrationTarget, uid uint32) error) error
SourceShards() []*topo.ShardInfo
TargetShards() []*topo.ShardInfo
}

// TargetInfo contains the metadata for a set of targets involved in a workflow.
Expand Down Expand Up @@ -264,6 +274,48 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
}, nil
}

// CompareShards compares the list of shards in a workflow with the shards in
// that keyspace according to the topo. It returns an error if they do not match.
//
// This function is used to validate MoveTables workflows.
//
// (TODO|@ajm188): This function is temporarily-exported until *wrangler.trafficSwitcher
// has been fully moved over to this package. Once that refactor is finished,
// this function should be unexported. Consequently, YOU SHOULD NOT DEPEND ON
// THIS FUNCTION EXTERNALLY.
func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ts *topo.Server) error {
shardSet := sets.NewString()
for _, si := range shards {
shardSet.Insert(si.ShardName())
}

topoShards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
return err
}

topoShardSet := sets.NewString(topoShards...)
if !shardSet.Equal(topoShardSet) {
wfExtra := shardSet.Difference(topoShardSet)
topoExtra := topoShardSet.Difference(shardSet)

var rec concurrency.AllErrorRecorder
if wfExtra.Len() > 0 {
wfExtraSorted := wfExtra.List()
rec.RecordError(fmt.Errorf("switch command shards not in topo: %v", wfExtraSorted))
}

if topoExtra.Len() > 0 {
topoExtraSorted := topoExtra.List()
rec.RecordError(fmt.Errorf("topo shards not in switch command: %v", topoExtraSorted))
}

return fmt.Errorf("mismatched shards for keyspace %s: %s", keyspace, strings.Join(rec.ErrorStrings(), "; "))
}

return nil
}

// HashStreams produces a stable hash based on the target keyspace and migration
// targets.
func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *switcher) stopStreams(ctx context.Context, sm *workflow.StreamMigrator)
}

func (r *switcher) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) {
r.ts.wr.Logger().Infof("Cancel was requested.")
r.ts.Logger().Infof("Cancel was requested.")
r.ts.cancelMigration(ctx, sm)
}

Expand Down
82 changes: 41 additions & 41 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,34 @@ func (dr *switcherDryRun) deleteRoutingRules(ctx context.Context) error {
func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error {
sourceShards := make([]string, 0)
targetShards := make([]string, 0)
for _, source := range dr.ts.sources {
for _, source := range dr.ts.Sources() {
sourceShards = append(sourceShards, source.GetShard().ShardName())
}
for _, target := range dr.ts.targets {
for _, target := range dr.ts.Targets() {
targetShards = append(targetShards, target.GetShard().ShardName())
}
sort.Strings(sourceShards)
sort.Strings(targetShards)
if direction == workflow.DirectionForward {
dr.drLog.Log(fmt.Sprintf("Switch reads from keyspace %s to keyspace %s for shards %s to shards %s",
dr.ts.sourceKeyspace, dr.ts.targetKeyspace, strings.Join(sourceShards, ","), strings.Join(targetShards, ",")))
dr.ts.SourceKeyspaceName(), dr.ts.TargetKeyspaceName(), strings.Join(sourceShards, ","), strings.Join(targetShards, ",")))
} else {
dr.drLog.Log(fmt.Sprintf("Switch reads from keyspace %s to keyspace %s for shards %s to shards %s",
dr.ts.targetKeyspace, dr.ts.sourceKeyspace, strings.Join(targetShards, ","), strings.Join(sourceShards, ",")))
dr.ts.TargetKeyspaceName(), dr.ts.SourceKeyspaceName(), strings.Join(targetShards, ","), strings.Join(sourceShards, ",")))
}
return nil
}

func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error {
ks := dr.ts.targetKeyspace
ks := dr.ts.TargetKeyspaceName()
if direction == workflow.DirectionBackward {
ks = dr.ts.sourceKeyspace
ks = dr.ts.SourceKeyspaceName()
}
var tabletTypes []string
for _, servedType := range servedTypes {
tabletTypes = append(tabletTypes, servedType.String())
}
tables := strings.Join(dr.ts.tables, ",")
tables := strings.Join(dr.ts.Tables(), ",")
dr.drLog.Log(fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]",
tables, ks, strings.Join(tabletTypes, ",")))
dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables))
Expand All @@ -94,24 +94,24 @@ func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows []
}

func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error {
dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables [%s]", dr.ts.targetKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables [%s]", dr.ts.TargetKeyspaceName(), strings.Join(dr.ts.Tables(), ",")))
return nil
}

func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.sourceKeyspace, dr.ts.targetKeyspace))
dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.SourceKeyspaceName(), dr.ts.TargetKeyspaceName()))
var deleteLogs, addLogs []string
if dr.ts.migrationType == binlogdatapb.MigrationType_TABLES {
tables := strings.Join(dr.ts.tables, ",")
if dr.ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
tables := strings.Join(dr.ts.Tables(), ",")
dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables))
return nil
}
deleteLogs = nil
addLogs = nil
for _, source := range dr.ts.sources {
for _, source := range dr.ts.Sources() {
deleteLogs = append(deleteLogs, fmt.Sprintf("\tShard %s, Tablet %d", source.GetShard().ShardName(), source.GetShard().PrimaryAlias.Uid))
}
for _, target := range dr.ts.targets {
for _, target := range dr.ts.Targets() {
addLogs = append(addLogs, fmt.Sprintf("\tShard %s, Tablet %d", target.GetShard().ShardName(), target.GetShard().PrimaryAlias.Uid))
}
if len(deleteLogs) > 0 {
Expand All @@ -126,7 +126,7 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error {
dr.drLog.Log("SwitchWrites completed, freeze and delete vreplication streams on:")
logs := make([]string, 0)
for _, t := range ts.targets {
for _, t := range ts.Targets() {
logs = append(logs, fmt.Sprintf("\ttablet %d", t.GetPrimary().Alias.Uid))
}
dr.drLog.LogSlice(logs)
Expand All @@ -136,15 +136,15 @@ func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *traffi
func (dr *switcherDryRun) startReverseVReplication(ctx context.Context) error {
dr.drLog.Log("Start reverse replication streams on:")
logs := make([]string, 0)
for _, t := range dr.ts.sources {
for _, t := range dr.ts.Sources() {
logs = append(logs, fmt.Sprintf("\ttablet %d", t.GetPrimary().Alias.Uid))
}
dr.drLog.LogSlice(logs)
return nil
}

func (dr *switcherDryRun) createReverseVReplication(ctx context.Context) error {
dr.drLog.Log(fmt.Sprintf("Create reverse replication workflow %s", dr.ts.reverseWorkflow))
dr.drLog.Log(fmt.Sprintf("Create reverse replication workflow %s", dr.ts.ReverseWorkflowName()))
return nil
}

Expand All @@ -156,7 +156,7 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *workflow.Strea
}
logs := make([]string, 0)

dr.drLog.Log(fmt.Sprintf("Migrate streams to %s:", dr.ts.targetKeyspace))
dr.drLog.Log(fmt.Sprintf("Migrate streams to %s:", dr.ts.TargetKeyspaceName()))
for key, streams := range sm.Streams() {
for _, stream := range streams {
logs = append(logs, fmt.Sprintf("\tShard %s Id %d, Workflow %s, Pos %s, BinLogSource %v", key, stream.ID, stream.Workflow, mysql.EncodePosition(stream.Position), stream.BinlogSource))
Expand All @@ -167,7 +167,7 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *workflow.Strea
dr.drLog.LogSlice(logs)
logs = nil
}
for _, target := range dr.ts.targets {
for _, target := range dr.ts.Targets() {
tabletStreams := templates
for _, vrs := range tabletStreams {
logs = append(logs, fmt.Sprintf("\t Keyspace %s, Shard %s, Tablet %d, Workflow %s, Id %d, Pos %v, BinLogSource %s",
Expand All @@ -188,12 +188,12 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio

func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {
logs := make([]string, 0)
for _, source := range dr.ts.sources {
position, _ := dr.ts.wr.tmc.MasterPosition(ctx, source.GetPrimary().Tablet)
logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.sourceKeyspace, source.GetShard().ShardName(), position))
for _, source := range dr.ts.Sources() {
position, _ := dr.ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet)
logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.SourceKeyspaceName(), source.GetShard().ShardName(), position))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables [%s]:", dr.ts.sourceKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables [%s]:", dr.ts.SourceKeyspaceName(), strings.Join(dr.ts.Tables(), ",")))
dr.drLog.LogSlice(logs)
}
return nil
Expand All @@ -208,7 +208,7 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *workflow.StreamMi
}
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Stop streams on keyspace %s", dr.ts.sourceKeyspace))
dr.drLog.Log(fmt.Sprintf("Stop streams on keyspace %s", dr.ts.SourceKeyspaceName()))
dr.drLog.LogSlice(logs)
}
return nil, nil
Expand All @@ -227,8 +227,8 @@ func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string)

func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType workflow.TableRemovalType) error {
logs := make([]string, 0)
for _, source := range dr.ts.sources {
for _, tableName := range dr.ts.tables {
for _, source := range dr.ts.Sources() {
for _, tableName := range dr.ts.Tables() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s",
source.GetPrimary().Keyspace, source.GetPrimary().Shard, source.GetPrimary().DbName(), source.GetPrimary().Alias.Uid, tableName))
}
Expand All @@ -239,7 +239,7 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType wo
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("%s these tables from the database and removing them from the vschema for keyspace %s:",
action, dr.ts.sourceKeyspace))
action, dr.ts.SourceKeyspaceName()))
dr.drLog.LogSlice(logs)
}
return nil
Expand All @@ -248,8 +248,8 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType wo
func (dr *switcherDryRun) dropSourceShards(ctx context.Context) error {
logs := make([]string, 0)
tabletsList := make(map[string][]string)
for _, si := range dr.ts.sourceShards() {
tabletAliases, err := dr.ts.wr.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName())
for _, si := range dr.ts.SourceShards() {
tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName())
if err != nil {
return err
}
Expand All @@ -276,9 +276,9 @@ func (dr *switcherDryRun) validateWorkflowHasCompleted(ctx context.Context) erro
func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) error {
dr.drLog.Log("Delete vreplication streams on target:")
logs := make([]string, 0)
for _, t := range dr.ts.targets {
for _, t := range dr.ts.Targets() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d",
t.GetShard().Keyspace(), t.GetShard().ShardName(), dr.ts.workflow, t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid))
t.GetShard().Keyspace(), t.GetShard().ShardName(), dr.ts.WorkflowName(), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid))
}
dr.drLog.LogSlice(logs)
return nil
Expand All @@ -287,19 +287,19 @@ func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) err
func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Context) error {
dr.drLog.Log("Delete reverse vreplication streams on source:")
logs := make([]string, 0)
for _, t := range dr.ts.sources {
for _, t := range dr.ts.Sources() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d",
t.GetShard().Keyspace(), t.GetShard().ShardName(), workflow.ReverseWorkflowName(dr.ts.workflow), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid))
t.GetShard().Keyspace(), t.GetShard().ShardName(), workflow.ReverseWorkflowName(dr.ts.WorkflowName()), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid))
}
dr.drLog.LogSlice(logs)
return nil
}

func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error {
logs := make([]string, 0)
for _, target := range dr.ts.targets {
for _, target := range dr.ts.Targets() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s, Tablet %d, Workflow %s, DbName %s",
target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().Alias.Uid, dr.ts.workflow, target.GetPrimary().DbName()))
target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().Alias.Uid, dr.ts.WorkflowName(), target.GetPrimary().DbName()))
}
if len(logs) > 0 {
dr.drLog.Log("Mark vreplication streams frozen on:")
Expand All @@ -310,11 +310,11 @@ func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error {

func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.sourceShards() {
for _, si := range dr.ts.SourceShards() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Tablet %d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Denied tables [%s] will be removed from:", strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Denied tables [%s] will be removed from:", strings.Join(dr.ts.Tables(), ",")))
dr.drLog.LogSlice(logs)
}
return nil
Expand All @@ -326,15 +326,15 @@ func (dr *switcherDryRun) logs() *[]string {

func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error {
logs := make([]string, 0)
for _, target := range dr.ts.targets {
for _, tableName := range dr.ts.tables {
for _, target := range dr.ts.Targets() {
for _, tableName := range dr.ts.Tables() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s",
target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().DbName(), target.GetPrimary().Alias.Uid, tableName))
}
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Dropping these tables from the database and removing from the vschema for keyspace %s:",
dr.ts.targetKeyspace))
dr.ts.TargetKeyspaceName()))
dr.drLog.LogSlice(logs)
}
return nil
Expand All @@ -343,8 +343,8 @@ func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error {
func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error {
logs := make([]string, 0)
tabletsList := make(map[string][]string)
for _, si := range dr.ts.targetShards() {
tabletAliases, err := dr.ts.wr.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName())
for _, si := range dr.ts.TargetShards() {
tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName())
if err != nil {
return err
}
Expand Down
Loading