Skip to content

Commit

Permalink
Merge pull request #8190 from tinyspeck/am_more_workflow_extraction
Browse files Browse the repository at this point in the history
[workflow] Migrate `getCellsWith{Shard,Table}ReadsSwitched`, `TrafficSwitchDirection` and `TableRemovalType` to package workflow
  • Loading branch information
rafael authored Jun 2, 2021
2 parents 06f4f20 + 87b3614 commit 9b3c055
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 244 deletions.
25 changes: 13 additions & 12 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/wrangler"

Expand Down Expand Up @@ -1967,7 +1968,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
action := subFlags.Arg(0)
ksWorkflow := subFlags.Arg(1)
target, workflow, err := splitKeyspaceWorkflow(ksWorkflow)
target, workflowName, err := splitKeyspaceWorkflow(ksWorkflow)
if err != nil {
return err
}
Expand All @@ -1979,19 +1980,19 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla

vrwp := &wrangler.VReplicationWorkflowParams{
TargetKeyspace: target,
Workflow: workflow,
Workflow: workflowName,
DryRun: *dryRun,
AutoStart: *autoStart,
StopAfterCopy: *stopAfterCopy,
}

printDetails := func() error {
s := ""
res, err := wr.ShowWorkflow(ctx, workflow, target)
res, err := wr.ShowWorkflow(ctx, workflowName, target)
if err != nil {
return err
}
s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflow)
s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflowName)
for ksShard := range res.ShardStatuses {
statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
for _, st := range statuses {
Expand Down Expand Up @@ -2227,7 +2228,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
}
case vReplicationWorkflowActionSwitchTraffic:
dryRunResults, err = wf.SwitchTraffic(wrangler.DirectionForward)
dryRunResults, err = wf.SwitchTraffic(workflow.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
dryRunResults, err = wf.ReverseTraffic()
case vReplicationWorkflowActionComplete:
Expand Down Expand Up @@ -2442,18 +2443,18 @@ func commandDropSources(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if subFlags.NArg() != 1 {
return fmt.Errorf("<keyspace.workflow> is required")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
keyspace, workflowName, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}

removalType := wrangler.DropTable
removalType := workflow.DropTable
if *renameTables {
removalType = wrangler.RenameTable
removalType = workflow.RenameTable
}

_, _, _ = dryRun, keyspace, workflow
dryRunResults, err := wr.DropSources(ctx, keyspace, workflow, removalType, *keepData, false, *dryRun)
_, _, _ = dryRun, keyspace, workflowName
dryRunResults, err := wr.DropSources(ctx, keyspace, workflowName, removalType, *keepData, false, *dryRun)
if err != nil {
return err
}
Expand Down Expand Up @@ -2495,9 +2496,9 @@ func commandSwitchReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if *cellsStr != "" {
cells = strings.Split(*cellsStr, ",")
}
direction := wrangler.DirectionForward
direction := workflow.DirectionForward
if *reverse {
direction = wrangler.DirectionBackward
direction = workflow.DirectionBackward
}
if subFlags.NArg() != 1 {
return fmt.Errorf("<keyspace.workflow> is required")
Expand Down
150 changes: 150 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
"vitess.io/vitess/go/vt/vtgate/evalengine"
Expand Down Expand Up @@ -104,6 +106,154 @@ func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, table
return &journal, exists, nil
}

// GetCellsWithShardReadsSwitched returns the topo cells partitioned into two
// slices: one with the cells where shard reads have been switched for the given
// tablet type and one with the cells where shard reads have not been switched
// for the given tablet type.
//
// This function is for use in Reshard, and "switched reads" is defined as if
// any one of the source shards has the query service disabled in its tablet
// control record.
func (s *Server) GetCellsWithShardReadsSwitched(
ctx context.Context,
keyspace string,
si *topo.ShardInfo,
tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error) {
cells, err := s.ts.GetCellInfoNames(ctx)
if err != nil {
return nil, nil, err
}

for _, cell := range cells {
srvks, err := s.ts.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return nil, nil, err
}

// Checking one shard is enough.
var (
shardServedTypes []string
found bool
noControls bool
)

for _, partition := range srvks.GetPartitions() {
if tabletType != partition.GetServedType() {
continue
}

// If reads and writes are both switched it is possible that the
// shard is not in the partition table.
for _, shardReference := range partition.GetShardReferences() {
if key.KeyRangeEqual(shardReference.GetKeyRange(), si.GetKeyRange()) {
found = true
break
}
}

// It is possible that there are no tablet controls if the target
// shards are not yet serving, or once reads and writes are both
// switched.
if len(partition.GetShardTabletControls()) == 0 {
noControls = true
break
}

for _, tabletControl := range partition.GetShardTabletControls() {
if key.KeyRangeEqual(tabletControl.GetKeyRange(), si.GetKeyRange()) {
if !tabletControl.GetQueryServiceDisabled() {
shardServedTypes = append(shardServedTypes, si.ShardName())
}

break
}
}
}

if found && (len(shardServedTypes) > 0 || noControls) {
cellsNotSwitched = append(cellsNotSwitched, cell)
} else {
cellsSwitched = append(cellsSwitched, cell)
}
}

return cellsSwitched, cellsNotSwitched, nil
}

// GetCellsWithTableReadsSwitched returns the topo cells partitioned into two
// slices: one with the cells where table reads have been switched for the given
// tablet type and one with the cells where table reads have not been switched
// for the given tablet type.
//
// This function is for use in MoveTables, and "switched reads" is defined as if
// the routing rule for a (table, tablet_type) is pointing to the target
// keyspace.
func (s *Server) GetCellsWithTableReadsSwitched(
ctx context.Context,
keyspace string,
table string,
tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error) {
cells, err := s.ts.GetCellInfoNames(ctx)
if err != nil {
return nil, nil, err
}

getKeyspace := func(ruleTarget string) (string, error) {
arr := strings.Split(ruleTarget, ".")
if len(arr) != 2 {
return "", fmt.Errorf("rule target is not correctly formatted: %s", ruleTarget)
}

return arr[0], nil
}

for _, cell := range cells {
srvVSchema, err := s.ts.GetSrvVSchema(ctx, cell)
if err != nil {
return nil, nil, err
}

var (
found bool
switched bool
)

for _, rule := range srvVSchema.RoutingRules.Rules {
ruleName := fmt.Sprintf("%s.%s@%s", keyspace, table, strings.ToLower(tabletType.String()))
if rule.FromTable == ruleName {
found = true

for _, to := range rule.ToTables {
ks, err := getKeyspace(to)
if err != nil {
log.Errorf(err.Error())
return nil, nil, err
}

if ks == keyspace {
switched = true
break // if one table in the workflow switched, we are done.
}
}
}

if found {
break
}
}

if switched {
cellsSwitched = append(cellsSwitched, cell)
} else {
cellsNotSwitched = append(cellsNotSwitched, cell)
}
}

return cellsSwitched, cellsNotSwitched, nil
}

// GetWorkflows returns a list of all workflows that exist in a given keyspace,
// with some additional filtering depending on the request parameters (for
// example, ActiveOnly=true restricts the search to only workflows that are
Expand Down
33 changes: 33 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,39 @@ var (
ErrNoStreams = errors.New("no streams found")
)

// TrafficSwitchDirection specifies the switching direction.
type TrafficSwitchDirection int

// The following constants define the switching direction.
const (
DirectionForward = TrafficSwitchDirection(iota)
DirectionBackward
)

// TableRemovalType specifies the way the a table will be removed during a
// DropSource for a MoveTables workflow.
type TableRemovalType int

// The following consts define if DropSource will drop or rename the table.
const (
DropTable = TableRemovalType(iota)
RenameTable
)

var tableRemovalTypeStrs = [...]string{
"DROP TABLE",
"RENAME TABLE",
}

// String returns a string representation of a TableRemovalType
func (trt TableRemovalType) String() string {
if trt < DropTable || trt > RenameTable {
return "Unknown"
}

return tableRemovalTypeStrs[trt]
}

// ITrafficSwitcher is a temporary hack to allow us to move streamMigrater out
// of package wrangler without also needing to move trafficSwitcher in the same
// changeset.
Expand Down
Loading

0 comments on commit 9b3c055

Please sign in to comment.