Skip to content

Commit

Permalink
Add copyright. Remove dead code
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 21, 2025
1 parent 6cdd137 commit 5273121
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 200 deletions.
229 changes: 29 additions & 200 deletions go/vt/vtctl/workflow/sequences.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
/*
Copyright 2025 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"context"
"fmt"
"sort"
"strings"
"sync"

Expand All @@ -12,7 +27,6 @@ import (
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
Expand Down Expand Up @@ -272,6 +286,19 @@ func (ts *trafficSwitcher) updateSequenceValue(ctx context.Context, seq *sequenc
return nil
}

// initializeTargetSequences initializes the backing sequence tables
// using a map keyed by the backing sequence table name.
//
// The backing tables must have already been created, unless a default
// global keyspace exists for the trafficSwitcher -- in which case we
// will create the backing table there if needed.

// This function will then ensure that the next value is set to a value
// greater than any currently stored in the using table on the target
// keyspace. If the backing table is updated to a new higher value then
// it will also tell the primary tablet serving the sequence to
// refresh/reset its cache to be sure that it does not provide a value
// that is less than the current max.
func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error {
maxValues, err := ts.getMaxSequenceValues(ctx, sequencesByBackingTable)
if err != nil {
Expand Down Expand Up @@ -602,204 +629,6 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
return sequencesByBackingTable, allFullyQualified, nil
}

// initializeTargetSequences initializes the backing sequence tables
// using a map keyed by the backing sequence table name.
//
// The backing tables must have already been created, unless a default
// global keyspace exists for the trafficSwitcher -- in which case we
// will create the backing table there if needed.

// This function will then ensure that the next value is set to a value
// greater than any currently stored in the using table on the target
// keyspace. If the backing table is updated to a new higher value then
// it will also tell the primary tablet serving the sequence to
// refresh/reset its cache to be sure that it does not provide a value
// that is less than the current max.
func (ts *trafficSwitcher) initializeTargetSequences2(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error {
initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error {
// Now we need to run this query on the target shards in order
// to get the max value and set the next id for the sequence to
// a higher value.
shardResults := make([]int64, 0, len(ts.TargetShards()))
srMu := sync.Mutex{}
ierr := ts.ForAllTargets(func(target *MigrationTarget) error {
primary := target.GetPrimary()
if primary == nil || primary.GetAlias() == nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target shard %s/%s",
ts.targetKeyspace, target.GetShard().ShardName())
}
usingCol, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDefinition.AutoIncrement.Column)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid column name %s specified for sequence in table %s: %v",
sequenceMetadata.usingTableDefinition.AutoIncrement.Column, sequenceMetadata.usingTableName, err)
}
usingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDBName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s specified for sequence in table %s: %v",
sequenceMetadata.usingTableDBName, sequenceMetadata.usingTableName, err)
}
usingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name specified for sequence in table %s: %v",
sequenceMetadata.usingTableName, err)
}
query := sqlparser.BuildParsedQuery(sqlGetMaxSequenceVal,
usingCol,
usingDB,
usingTable,
)
qr, terr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primary.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{
Query: []byte(query.Query),
MaxRows: 1,
})
if terr != nil || len(qr.Rows) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr)
}
rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0]
maxID := int64(0)
if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max
maxID, terr = rawVal.ToInt64()
if terr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr)
}
}
srMu.Lock()
defer srMu.Unlock()
shardResults = append(shardResults, maxID)
return nil
})
if ierr != nil {
return ierr
}
select {
case <-ictx.Done():
return ictx.Err()
default:
}
if len(shardResults) == 0 { // This should never happen
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table",
ts.targetKeyspace, sequenceMetadata.usingTableName)
}
// Sort the values to find the max value across all shards.
sort.Slice(shardResults, func(i, j int) bool {
return shardResults[i] < shardResults[j]
})
nextVal := shardResults[len(shardResults)-1] + 1
// Now we need to update the sequence table, if needed, in order to
// ensure that that the next value it provides is > the current max.
sequenceShard, ierr := ts.TopoServer().GetOnlyShard(ictx, sequenceMetadata.backingTableKeyspace)
if ierr != nil || sequenceShard == nil || sequenceShard.PrimaryAlias == nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v",
sequenceMetadata.backingTableKeyspace, ierr)
}
sequenceTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias)
if ierr != nil || sequenceTablet == nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v",
sequenceMetadata.backingTableKeyspace, ierr)
}
select {
case <-ictx.Done():
return ictx.Err()
default:
}
if sequenceTablet.DbNameOverride != "" {
sequenceMetadata.backingTableDBName = sequenceTablet.DbNameOverride
}
backingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableDBName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s in sequence backing table %s: %v",
sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, err)
}
backingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v",
sequenceMetadata.backingTableName, err)
}
query := sqlparser.BuildParsedQuery(sqlInitSequenceTable,
backingDB,
backingTable,
nextVal,
nextVal,
nextVal,
)
// Now execute this on the primary tablet of the unsharded keyspace
// housing the backing table.
initialize:
qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{
Query: []byte(query.Query),
MaxRows: 1,
})
if ierr != nil {
vterr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to initialize the backing sequence table %s.%s: %v",
sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, ierr)
// If the sequence table doesn't exist, let's try and create it, otherwise
// return the error.
if sqlErr, ok := sqlerror.NewSQLErrorFromError(ierr).(*sqlerror.SQLError); !ok ||
(sqlErr.Num != sqlerror.ERNoSuchTable && sqlErr.Num != sqlerror.ERBadTable) {
return vterr
}
stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, backingTable)
_, ierr = ts.ws.tmc.ApplySchema(ctx, sequenceTablet.Tablet, &tmutils.SchemaChange{
SQL: stmt.Query,
Force: false,
AllowReplication: true,
SQLMode: vreplication.SQLMode,
DisableForeignKeyChecks: true,
})
if ierr != nil {
return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", err)
}
select {
case <-ctx.Done():
return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", ctx.Err())
default:
goto initialize
}
}
// If we actually updated the backing sequence table, then we need
// to tell the primary tablet managing the sequence to refresh/reset
// its cache for the table.
if qr.RowsAffected == 0 {
return nil
}
select {
case <-ictx.Done():
return ictx.Err()
default:
}
ts.Logger().Infof("Resetting sequence cache for backing table %s on shard %s/%s using tablet %s",
sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias)
ti, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias)
if ierr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get primary tablet for keyspace %s: %v",
sequenceMetadata.backingTableKeyspace, ierr)
}
// ResetSequences interfaces with the schema engine and the actual
// table identifiers DO NOT contain the backticks. So we have to
// ensure that the table name is unescaped.
unescapedBackingTable, err := sqlescape.UnescapeID(backingTable)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v", backingTable, err)
}
ierr = ts.TabletManagerClient().ResetSequences(ictx, ti.Tablet, []string{unescapedBackingTable})
if ierr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to reset the sequence cache for backing table %s on shard %s/%s using tablet %s: %v",
sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr)
}
return nil
}

initGroup, gctx := errgroup.WithContext(ctx)
for _, sequenceMetadata := range sequencesByBackingTable {
initGroup.Go(func() error {
return initSequenceTable(gctx, sequenceMetadata)
})
}
return initGroup.Wait()
}

func (ts *trafficSwitcher) mustResetSequences(ctx context.Context) (bool, error) {
switch ts.workflowType {
case binlogdatapb.VReplicationWorkflowType_Migrate,
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vtctl/workflow/sequences_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2025 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
Expand Down

0 comments on commit 5273121

Please sign in to comment.