Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 14, 2025
1 parent 73f1c85 commit 369dd87
Showing 1 changed file with 8 additions and 12 deletions.
20 changes: 8 additions & 12 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
}
log.Infof("cancelMigration (%v): starting", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
// We create a new context while canceling the migration, so that we are independent of the original
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
Expand All @@ -1169,53 +1169,49 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
if !ts.IsMultiTenantMigration() {
log.Infof("cancelMigration (%v): switching denied tables to target", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%v): switching denied tables to target", ts.WorkflowName())
err = ts.switchDeniedTables(cmCtx, true /* revert */)
} else {
log.Infof("cancelMigration (%v): multi-tenant, not switching denied tables to target", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%v): multi-tenant, not switching denied tables to target", ts.WorkflowName())
}
} else {
log.Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
log.Infof("Cancel migration failed for %v: could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}

if err := sm.CancelStreamMigrations(cmCtx); err != nil {
log.Infof("Cancel migration failed for %v: could not cancel stream migrations: %v", ts.WorkflowName(), err)
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
}

log.Infof("cancelMigration (%s): restarting vreplication workflows", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%s): restarting vreplication workflows", ts.WorkflowName())
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
_, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query)
return err
})
if err != nil {
log.Infof("Cancel migration failed for %v: could not restart vreplication: %v", ts.WorkflowName(), err)
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}

log.Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
log.Infof("Cancel migration failed for %v: could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
}

if cancelErrs.HasErrors() {
log.Infof("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}

log.Infof("cancelMigration (%v): completed", ts.WorkflowName())
ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
return nil
}

Expand Down

0 comments on commit 369dd87

Please sign in to comment.