Skip to content

Commit

Permalink
VReplication: Update singular workflow in traffic switcher (vitessio#…
Browse files Browse the repository at this point in the history
…14826)

Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
mattlord authored and ejortegau committed Jan 24, 2024
1 parent 805fd9a commit 0fefe22
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string,

func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error {
return ts.ForAllSources(func(source *MigrationSource) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.GetPrimary().DbName()))
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(source.GetPrimary().DbName()), encodeString(ts.ReverseWorkflowName()))
_, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query)
return err
})
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,8 @@ func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error {

func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error {
return ts.ForAllSources(func(source *workflow.MigrationSource) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.GetPrimary().DbName()))
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(source.GetPrimary().DbName()), encodeString(ts.ReverseWorkflowName()))
_, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query)
return err
})
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ func (tme *testShardMigraterEnv) expectStartReverseVReplication() {
// NOTE: this is not a faithful reproduction of what should happen.
// The ids returned are not accurate.
for _, dbclient := range tme.dbSourceClients {
dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
dbclient.addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
dbclient.addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
dbclient.addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
Expand Down
20 changes: 10 additions & 10 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,11 @@ func TestTableMigrateMainflow(t *testing.T) {
createJournals()

startReverseVReplication := func() {
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
Expand Down Expand Up @@ -731,11 +731,11 @@ func TestShardMigrateMainflow(t *testing.T) {
createJournals()

startReverseVReplication := func() {
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
Expand Down Expand Up @@ -1233,11 +1233,11 @@ func TestTableMigrateJournalExists(t *testing.T) {
tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil)

// mi.startReverseVReplication
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
Expand Down Expand Up @@ -1312,11 +1312,11 @@ func TestShardMigrateJournalExists(t *testing.T) {
tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil)

// mi.startReverseVReplication
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
Expand Down Expand Up @@ -2043,11 +2043,11 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) {
createJournals()

startReverseVReplication := func() {
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil)
tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
Expand Down

0 comments on commit 0fefe22

Please sign in to comment.