Skip to content

Commit

Permalink
Migrate TrafficSwitchDirection and TableRemovalType to package wo…
Browse files Browse the repository at this point in the history
…rkflow

This required some updates of local variables from `workflow` to
`workflowName` so that I could reference the `workflow` package name.

I also reformatted imports in all the files I touched as part of this
change.

Signed-off-by: Andrew Mason <amason@slack-corp.com>
  • Loading branch information
ajm188 committed May 27, 2021
1 parent 6d6f643 commit 15e9ce6
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 155 deletions.
25 changes: 13 additions & 12 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/wrangler"

Expand Down Expand Up @@ -1967,7 +1968,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
action := subFlags.Arg(0)
ksWorkflow := subFlags.Arg(1)
target, workflow, err := splitKeyspaceWorkflow(ksWorkflow)
target, workflowName, err := splitKeyspaceWorkflow(ksWorkflow)
if err != nil {
return err
}
Expand All @@ -1979,19 +1980,19 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla

vrwp := &wrangler.VReplicationWorkflowParams{
TargetKeyspace: target,
Workflow: workflow,
Workflow: workflowName,
DryRun: *dryRun,
AutoStart: *autoStart,
StopAfterCopy: *stopAfterCopy,
}

printDetails := func() error {
s := ""
res, err := wr.ShowWorkflow(ctx, workflow, target)
res, err := wr.ShowWorkflow(ctx, workflowName, target)
if err != nil {
return err
}
s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflow)
s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflowName)
for ksShard := range res.ShardStatuses {
statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
for _, st := range statuses {
Expand Down Expand Up @@ -2227,7 +2228,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
}
case vReplicationWorkflowActionSwitchTraffic:
dryRunResults, err = wf.SwitchTraffic(wrangler.DirectionForward)
dryRunResults, err = wf.SwitchTraffic(workflow.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
dryRunResults, err = wf.ReverseTraffic()
case vReplicationWorkflowActionComplete:
Expand Down Expand Up @@ -2442,18 +2443,18 @@ func commandDropSources(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if subFlags.NArg() != 1 {
return fmt.Errorf("<keyspace.workflow> is required")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
keyspace, workflowName, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}

removalType := wrangler.DropTable
removalType := workflow.DropTable
if *renameTables {
removalType = wrangler.RenameTable
removalType = workflow.RenameTable
}

_, _, _ = dryRun, keyspace, workflow
dryRunResults, err := wr.DropSources(ctx, keyspace, workflow, removalType, *keepData, false, *dryRun)
_, _, _ = dryRun, keyspace, workflowName
dryRunResults, err := wr.DropSources(ctx, keyspace, workflowName, removalType, *keepData, false, *dryRun)
if err != nil {
return err
}
Expand Down Expand Up @@ -2495,9 +2496,9 @@ func commandSwitchReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if *cellsStr != "" {
cells = strings.Split(*cellsStr, ",")
}
direction := wrangler.DirectionForward
direction := workflow.DirectionForward
if *reverse {
direction = wrangler.DirectionBackward
direction = workflow.DirectionBackward
}
if subFlags.NArg() != 1 {
return fmt.Errorf("<keyspace.workflow> is required")
Expand Down
33 changes: 33 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,39 @@ var (
ErrNoStreams = errors.New("no streams found")
)

// TrafficSwitchDirection specifies the switching direction.
type TrafficSwitchDirection int

// The following constants define the switching direction.
const (
DirectionForward = TrafficSwitchDirection(iota)
DirectionBackward
)

// TableRemovalType specifies the way the a table will be removed during a
// DropSource for a MoveTables workflow.
type TableRemovalType int

// The following consts define if DropSource will drop or rename the table.
const (
DropTable = TableRemovalType(iota)
RenameTable
)

var tableRemovalTypeStrs = [...]string{
"DROP TABLE",
"RENAME TABLE",
}

// String returns a string representation of a TableRemovalType
func (trt TableRemovalType) String() string {
if trt < DropTable || trt > RenameTable {
return "Unknown"
}

return tableRemovalTypeStrs[trt]
}

// ITrafficSwitcher is a temporary hack to allow us to move streamMigrater out
// of package wrangler without also needing to move trafficSwitcher in the same
// changeset.
Expand Down
59 changes: 30 additions & 29 deletions go/vt/wrangler/stream_migrater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package wrangler

import (
"context"
"fmt"
"strings"
"testing"
"time"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtctl/workflow"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand All @@ -42,13 +43,13 @@ func TestStreamMigrateMainflow(t *testing.T) {
tme.expectNoPreviousJournals()

// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}

tme.expectCheckJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestStreamMigrateMainflow(t *testing.T) {

tme.expectDeleteReverseVReplication()
tme.expectDeleteTargetVReplication()
if _, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", DropTable, false, false, false); err != nil {
if _, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", workflow.DropTable, false, false, false); err != nil {
t.Fatal(err)
}
verifyQueries(t, tme.allDBClients)
Expand All @@ -195,12 +196,12 @@ func TestStreamMigrateTwoStreams(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -364,12 +365,12 @@ func TestStreamMigrateOneToMany(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -497,12 +498,12 @@ func TestStreamMigrateManyToOne(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -632,12 +633,12 @@ func TestStreamMigrateSyncSuccess(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -824,12 +825,12 @@ func TestStreamMigrateSyncFail(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -946,12 +947,12 @@ func TestStreamMigrateCancel(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1051,12 +1052,12 @@ func TestStreamMigrateStoppedStreams(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1110,12 +1111,12 @@ func TestStreamMigrateCancelWithStoppedStreams(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1177,12 +1178,12 @@ func TestStreamMigrateStillCopying(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1240,12 +1241,12 @@ func TestStreamMigrateEmptyWorkflow(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1302,12 +1303,12 @@ func TestStreamMigrateDupWorkflow(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1365,12 +1366,12 @@ func TestStreamMigrateStreamsMismatch(t *testing.T) {

tme.expectNoPreviousJournals()
// Migrate reads
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, DirectionForward, false)
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", rdOnly, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
tme.expectNoPreviousJournals()
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, DirectionForward, false)
_, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", replica, nil, workflow.DirectionForward, false)
if err != nil {
t.Fatal(err)
}
Expand Down
10 changes: 5 additions & 5 deletions go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ limitations under the License.
package wrangler

import (
"context"
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtctl/workflow"

"context"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var _ iswitcher = (*switcher)(nil)
Expand All @@ -48,19 +48,19 @@ func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error {
return r.ts.validateWorkflowHasCompleted(ctx)
}

func (r *switcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error {
func (r *switcher) removeSourceTables(ctx context.Context, removalType workflow.TableRemovalType) error {
return r.ts.removeSourceTables(ctx, removalType)
}

func (r *switcher) dropSourceShards(ctx context.Context) error {
return r.ts.dropSourceShards(ctx)
}

func (r *switcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
func (r *switcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error {
return r.ts.switchShardReads(ctx, cells, servedTypes, direction)
}

func (r *switcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
func (r *switcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error {
return r.ts.switchTableReads(ctx, cells, servedTypes, direction)
}

Expand Down
Loading

0 comments on commit 15e9ce6

Please sign in to comment.