Skip to content

Commit

Permalink
Return SourceAlias in FullStatus RPC, use to optimize VTOrc
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Feb 21, 2025
1 parent c9c227d commit b3a8bf4
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 138 deletions.
290 changes: 159 additions & 131 deletions go/vt/proto/replicationdata/replicationdata.pb.go

Large diffs are not rendered by default.

107 changes: 107 additions & 0 deletions go/vt/proto/replicationdata/replicationdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ CREATE TABLE database_instance (
log_replica_updates tinyint NOT NULL,
binary_log_file varchar(128) NOT NULL,
binary_log_pos bigint NOT NULL,
source_alias varchar(256) NOT NULL,
source_host varchar(128) NOT NULL,
source_port smallint NOT NULL,
replica_net_timeout int NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtorc/inst/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Instance struct {
LogBinEnabled bool
LogReplicationUpdatesEnabled bool
SelfBinlogCoordinates BinlogCoordinates
SourceAlias string
SourceHost string
SourcePort int
SourceUUID string
Expand Down
29 changes: 23 additions & 6 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named

instance.SourceHost = fs.ReplicationStatus.SourceHost
instance.SourcePort = int(fs.ReplicationStatus.SourcePort)
instance.SourceAlias = topoproto.TabletAliasString(fs.SourceAlias)

if fs.ReplicationStatus.ReplicationLagUnknown {
instance.SecondsBehindPrimary.Valid = false
Expand Down Expand Up @@ -478,21 +479,34 @@ func ReadInstanceClusterAttributes(instance *Instance) (err error) {
var primaryExecutedGtidSet string
primaryDataFound := false

query := `SELECT
primaryAlias := instance.SourceAlias
primaryHostname := instance.SourceHost
primaryPort := instance.SourcePort

whereCond := `alias = ?`
args := sqlutils.Args(primaryAlias)

// Fallback to hostname + port query if no primaryAlias defined (added in v22)
if primaryAlias == "" {
whereCond = `hostname = ? AND port = ?`
args = sqlutils.Args(primaryHostname, primaryPort)
}

query := fmt.Sprintf(`SELECT
replication_depth,
source_host,
source_port,
ancestry_uuid,
executed_gtid_set
FROM database_instance
WHERE
hostname = ?
AND port = ?`
primaryHostname := instance.SourceHost
primaryPort := instance.SourcePort
args := sqlutils.Args(primaryHostname, primaryPort)
%s`,
whereCond,
)

err = db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error {
primaryReplicationDepth = m.GetUint("replication_depth")
primaryAlias = m.GetString("alias")
primaryHostname = m.GetString("source_host")
primaryPort = m.GetInt("source_port")
ancestryUUID = m.GetString("ancestry_uuid")
Expand Down Expand Up @@ -536,6 +550,7 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
instance.BinlogRowImage = m.GetString("binlog_row_image")
instance.LogBinEnabled = m.GetBool("log_bin")
instance.LogReplicationUpdatesEnabled = m.GetBool("log_replica_updates")
instance.SourceAlias = m.GetString("source_alias")
instance.SourceHost = m.GetString("source_host")
instance.SourcePort = m.GetInt("source_port")
instance.ReplicaNetTimeout = m.GetInt32("replica_net_timeout")
Expand Down Expand Up @@ -836,6 +851,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
"log_replica_updates",
"binary_log_file",
"binary_log_pos",
"source_alias",
"source_host",
"source_port",
"replica_net_timeout",
Expand Down Expand Up @@ -916,6 +932,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
args = append(args, instance.LogReplicationUpdatesEnabled)
args = append(args, instance.SelfBinlogCoordinates.LogFile)
args = append(args, instance.SelfBinlogCoordinates.LogPos)
args = append(args, instance.SourceAlias)
args = append(args, instance.SourceHost)
args = append(args, instance.SourcePort)
args = append(args, instance.ReplicaNetTimeout)
Expand Down
21 changes: 20 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,24 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
// Semi sync status - "show status like 'Rpl_semi_sync_%_status'"
primarySemiSyncStatus, replicaSemiSyncStatus := tm.MysqlDaemon.SemiSyncStatus(ctx)

// Semi sync clients count - "show status like 'semi_sync_source_clients'"
// Semi sync clients count - "show status like 'semi_sync_source_clients'"
semiSyncClients := tm.MysqlDaemon.SemiSyncClients(ctx)

// Semi sync settings - "show status like 'rpl_semi_sync_%'
semiSyncTimeout, semiSyncNumReplicas := tm.MysqlDaemon.SemiSyncSettings(ctx)

// Replication configuration
replConfiguration, err := tm.MysqlDaemon.ReplicationConfiguration(ctx)
if err != nil {
return nil, err
}

// Replication source tablet alias
sourceAlias, err := tm.getReplicationSourceAlias(ctx)
if err != nil {
return nil, err
}

return &replicationdatapb.FullStatus{
ServerId: serverID,
ServerUuid: serverUUID,
Expand All @@ -189,6 +196,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
SemiSyncWaitForReplicaCount: semiSyncNumReplicas,
SuperReadOnly: superReadOnly,
ReplicationConfiguration: replConfiguration,
SourceAlias: sourceAlias,
}, nil
}

Expand Down Expand Up @@ -731,6 +739,14 @@ func (tm *TabletManager) SetReplicationSource(ctx context.Context, parentAlias *
return tm.setReplicationSourceLocked(ctx, parentAlias, timeCreatedNS, waitPosition, forceStartReplication, semiSyncAction, heartbeatInterval)
}

func (tm *TabletManager) getReplicationSourceAlias(ctx context.Context) (*topodatapb.TabletAlias, error) {
if err := tm.lock(ctx); err != nil {
return nil, err
}
defer tm.unlock()
return tm.sourceAlias, nil
}

func (tm *TabletManager) setReplicationSourceSemiSyncNoAction(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error {
log.Infof("SetReplicationSource: parent: %v position: %v force: %v", parentAlias, waitPosition, forceStartReplication)
if err := tm.lock(ctx); err != nil {
Expand Down Expand Up @@ -883,6 +899,9 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
}
}

// Store the current primary alias
tm.sourceAlias = parentAlias

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type TabletManager struct {
// tabletAlias is saved away from tablet for read-only access
tabletAlias *topodatapb.TabletAlias

// sourceAlias is the current replication source.
sourceAlias *topodatapb.TabletAlias

// baseTabletType is the tablet type we revert back to
// when we transition back from something like PRIMARY.
baseTabletType topodatapb.TabletType
Expand Down

0 comments on commit b3a8bf4

Please sign in to comment.