Skip to content

Commit

Permalink
Replicas should be able to heal if replication is not initialised properly (#10943)
Browse files Browse the repository at this point in the history

* feat: add code to also reset replication parameters in setReplicationSourceLocked when required

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: fix tests to reflect the change

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 authored Aug 9, 2022
1 parent 522694e commit 7f25195
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 32 deletions.
1 change: 1 addition & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host strin
if stopReplicationBefore {
cmds = append(cmds, "STOP SLAVE")
}
cmds = append(cmds, "RESET SLAVE ALL")
cmds = append(cmds, "FAKE SET MASTER")
if startReplicationAfter {
cmds = append(cmds, "START SLAVE")
Expand Down
8 changes: 8 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
if replicationStopBefore {
cmds = append(cmds, conn.StopReplicationCommand())
}
// The reset-replication-parameters commands make the instance forget its source host and port.
// This is required because MySQL sometimes gets stuck due to improper initialization of the
// master info structure or related failures, and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters; otherwise START SLAVE fails.
// Therefore, we have elected to always reset the replication parameters whenever we try to set the source host and port,
// since there is no real overhead and it makes this function robust enough to also handle failures like these.
cmds = append(cmds, conn.ResetReplicationParametersCommands()...)
// If flag value is same as default, check deprecated flag value
if *replicationConnectRetry == 10*time.Second && *masterConnectRetry != *replicationConnectRetry {
*replicationConnectRetry = *masterConnectRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,25 @@ func TestInitShardPrimary(t *testing.T) {

tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// These come from InitShardPrimary
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
tablet2.FakeMysqlDaemon.SetReplicationSourceInputs = append(tablet2.FakeMysqlDaemon.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", tablet1.Tablet.Hostname, tablet1.Tablet.MysqlPort))

tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -127,6 +131,7 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -135,6 +140,7 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,14 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
}
host := parent.Tablet.MysqlHostname
port := int(parent.Tablet.MysqlPort)
if status.SourceHost != host || status.SourcePort != port {
// This handles both changing the address and starting replication.
// We want to reset the replication parameters and set the replication source again when forceStartReplication is provided,
// because MySQL sometimes gets stuck due to improper initialization of the master info structure or related failures, and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters; otherwise START SLAVE fails. So when this RPC
// is called from VTOrc or the replication manager with forceStartReplication to fix replication in these cases, we should also
// reset the replication parameters and set the source host and port information again.
if status.SourceHost != host || status.SourcePort != port || forceStartReplication {
// This handles resetting the replication parameters, changing the address, and then starting replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil {
if err := tm.handleRelayLogError(err); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func TestCheckPrimaryShip(t *testing.T) {
fakeMysql := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon)
fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort))
fakeMysql.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
25 changes: 19 additions & 6 deletions go/vt/wrangler/testlib/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
},
}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -202,6 +203,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -238,12 +240,14 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -296,6 +300,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -423,7 +428,8 @@ func TestBackupRestoreLagged(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -434,6 +440,7 @@ func TestBackupRestoreLagged(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -491,12 +498,14 @@ func TestBackupRestoreLagged(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -642,7 +651,8 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -653,6 +663,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -682,12 +693,14 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/testlib/copy_schema_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func copySchema(t *testing.T, useShardAsSource bool) {
sourceRdonly := NewFakeTablet(t, wr, "cell1", 1,
topodatapb.TabletType_RDONLY, sourceRdonlyDb, TabletKeyspaceShard(t, "ks", "-80"))
sourceRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
13 changes: 10 additions & 3 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ func TestEmergencyReparentShard(t *testing.T) {
goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition)
goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -167,9 +169,11 @@ func TestEmergencyReparentShard(t *testing.T) {
goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition)
goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
}
goodReplica2.StartActionLoop(t, wr)
Expand Down Expand Up @@ -232,6 +236,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
Expand Down Expand Up @@ -267,11 +272,13 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition)
newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition)
moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
14 changes: 11 additions & 3 deletions go/vt/wrangler/testlib/external_reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestTabletExternallyReparentedBasic(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand Down Expand Up @@ -170,6 +171,7 @@ func TestTabletExternallyReparentedToReplica(t *testing.T) {
// primary is still good to go.
oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand Down Expand Up @@ -248,6 +250,7 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -260,7 +263,8 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {
// TabletActionReplicaWasRestarted and point to the new mysql port
goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -335,6 +339,7 @@ func TestTabletExternallyReparentedContinueOnUnexpectedPrimary(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -347,7 +352,8 @@ func TestTabletExternallyReparentedContinueOnUnexpectedPrimary(t *testing.T) {
// TabletActionReplicaWasRestarted and point to a bad host
goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -418,6 +424,7 @@ func TestTabletExternallyReparentedRerun(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -430,7 +437,8 @@ func TestTabletExternallyReparentedRerun(t *testing.T) {
// On the good replica, we will respond to
// TabletActionReplicaWasRestarted.
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/testlib/permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ func TestPermissions(t *testing.T) {
}
replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
Loading

0 comments on commit 7f25195

Please sign in to comment.