From 819481ef946b883f96a7a7846cbe641f1a3094c6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jul 2023 13:20:58 -0400 Subject: [PATCH 01/15] Don't include unhealthy tablets in candidate list Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 83 ++++++++++--------- .../tabletmanager/vreplication/engine.go | 2 +- .../vttablet/tabletserver/health_streamer.go | 1 + 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 20d4126831a..6af883deb89 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -26,8 +26,10 @@ import ( "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/topo/topoproto" + querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -284,9 +286,8 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - rand.Seed(time.Now().UnixNano()) - // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found - // or the context is canceled + // Keep trying at intervals (tabletPickerRetryDelay) until a tablet + // is found or the context is cancelled. for { select { case <-ctx.Done(): @@ -319,15 +320,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } else if tp.inOrder { candidates = tp.orderByTabletType(candidates) } else { - // Randomize candidates + // Randomize candidates. rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) } if len(candidates) == 0 { - // if no candidates were found, sleep and try again + // If no candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds", + log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { @@ -336,60 +337,54 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") case <-timer.C: } + // Got here? Means we iterated all tablets and did not find a + // healthy viable candidate. + tp.incNoTabletFoundStat() continue } - for _, ti := range candidates { - // try to connect to tablet - if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil { - // OK to use ctx here because it is not actually used by the underlying Close implementation - _ = conn.Close(ctx) - log.Infof("tablet picker found tablet %s", ti.Tablet.String()) - return ti.Tablet, nil - } - // err found - log.Warningf("unable to connect to tablet for alias %v", ti.Alias) - } - // Got here? Means we iterated all tablets and did not find a healthy one - tp.incNoTabletFoundStat() + log.Infof("Tablet picker found healhty tablet for streaming: %s", candidates[0].Tablet.String()) + return candidates[0].Tablet, nil } } -// GetMatchingTablets returns a list of TabletInfo for tablets -// that match the cells, keyspace, shard and tabletTypes for this TabletPicker +// GetMatchingTablets returns a list of TabletInfo for healthy +// tablets that match the cells, keyspace, shard and tabletTypes +// for this TabletPicker. func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { - // Special handling for PRIMARY tablet type - // Since there is only one primary, we ignore cell and find the primary + // Special handling for PRIMARY tablet type: since there is only + // one primary per shard, we ignore cell and find the primary. aliases := make([]*topodatapb.TabletAlias, 0) if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { - log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) + log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } aliases = append(aliases, si.PrimaryAlias) } else { actualCells := make([]string, 0) for _, cell := range tp.cells { - // check if cell is actually an alias - // non-blocking read so that this is fast + // Check if cell is actually an alias; using a + // non-blocking read so that this is fast. shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() _, err := tp.ts.GetCellInfo(shortCtx, cell, false) if err != nil { - // not a valid cell, check whether it is a cell alias + // Not a valid cell, check whether it is a cell alias... shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false) - // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue + // If we get an error, either cellAlias doesn't exist or + // it isn't a cell alias at all; ignore and continue. if err == nil { actualCells = append(actualCells, alias.Cells...) } else { log.Infof("Unable to resolve cell %s, ignoring", cell) } } else { - // valid cell, add it to our list + // Valid cell, add it to our list. actualCells = append(actualCells, cell) } } @@ -397,12 +392,11 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - // match cell, keyspace and shard + // Match cell, keyspace, and shard. sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard) if err != nil { continue } - for _, node := range sri.Nodes { aliases = append(aliases, node.TabletAlias) } @@ -412,25 +406,39 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if len(aliases) == 0 { return nil } + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) if err != nil { - log.Warningf("error fetching tablets from topo: %v", err) - // If we get a partial result we can still use it, otherwise return + log.Warningf("Error fetching tablets from topo: %v", err) + // If we get a partial result we can still use it, otherwise return. if len(tabletMap) == 0 { return nil } } + tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { - // Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores - // topo.ErrNoNode). Just log a warning - log.Warningf("failed to load tablet %v", tabletAlias) + // Either tablet disappeared on us, or we got a partial result + // (GetTabletMap ignores topo.ErrNoNode); just log a warning. + log.Warningf("Tablet picker failed to load tablet %v", tabletAlias) } else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) { - tablets = append(tablets, tabletInfo) + // Try to connect to the tablet and confirm that it's usable. + if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { + // Ensure that the tablet is healthy and serving. + if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { + if shr.RealtimeStats.HealthError == "" && shr.Serving { + return nil + } + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving") + }); err == nil { + _ = conn.Close(ctx) + tablets = append(tablets, tabletInfo) + } + } } } return tablets @@ -438,7 +446,6 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn func init() { // TODO(sougou): consolidate this call to be once per process. - rand.Seed(time.Now().UnixNano()) globalTPStats = newTabletPickerStats() } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 52f9c072d49..755066d9273 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -791,7 +791,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int32, pos string) error { return fmt.Errorf("unexpected result: %v", qr) } - // When err is not nil then we got a retryable error and will loop again + // When err is not nil then we got a retryable error and will loop again. if err == nil { current, dcerr := binlogplayer.DecodePosition(qr.Rows[0][0].ToString()) if dcerr != nil { diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index ad9acf495d8..e8de6301616 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -181,6 +181,7 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str } return err } + return nil } } } From daa70f1635110d25ebe6988b2d4d77f289598656 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jul 2023 14:17:37 -0400 Subject: [PATCH 02/15] Check source tablet health on error and pick new one if unhealthy Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 2 +- .../tabletmanager/vreplication/controller.go | 94 ++++++++++++++----- .../tabletmanager/vreplication/stats.go | 7 +- .../tabletmanager/vreplication/stats_test.go | 16 +++- 4 files changed, 91 insertions(+), 28 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 6af883deb89..d1f7b679c9c 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -342,7 +342,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table tp.incNoTabletFoundStat() continue } - log.Infof("Tablet picker found healhty tablet for streaming: %s", candidates[0].Tablet.String()) + log.Infof("Tablet picker found healthy tablet for streaming: %s", candidates[0].Tablet.String()) return candidates[0].Tablet, nil } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 34e28147ff1..b575ae86052 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -27,7 +27,9 @@ import ( "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -36,7 +38,9 @@ import ( "vitess.io/vitess/go/vt/topo" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -85,7 +89,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor done: make(chan struct{}), source: &binlogdatapb.BinlogSource{}, } - ct.sourceTablet.Store("") + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id @@ -180,7 +184,7 @@ func (ct *controller) run(ctx context.Context) { func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { - ct.sourceTablet.Store("") + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) if x := recover(); x != nil { log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) err = fmt.Errorf("panic: %v", x) @@ -199,25 +203,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { } defer dbClient.Close() - var tablet *topodatapb.Tablet - if ct.source.GetExternalMysql() == "" { - log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) - tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) - defer tpCancel() - tablet, err = ct.tabletPicker.PickForStreaming(tpCtx) - if err != nil { - select { - case <-ctx.Done(): - default: - ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) - ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) - } - return err - } - ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) - log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String()) - ct.sourceTablet.Store(tablet.Alias.String()) + tablet, err := ct.pickSourceTablet(ctx, dbClient) + if err != nil { + return err } + switch { case len(ct.source.Tables) > 0: // Table names can have search patterns. Resolve them against the schema. @@ -266,8 +256,17 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre) err = vr.Replicate(ctx) ct.lastWorkflowError.Record(err) + + // If the source tablet is now unhealthy, then pick a new one. + if ct.sourceTabletIsUnhealthy() { + if _, err = ct.pickSourceTablet(ctx, dbClient); err != nil { + return err + } + } + // If this is a mysql error that we know needs manual intervention OR - // we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError) + // we cannot identify this as non-recoverable, but it has persisted + // beyond the retry limit (maxTimeToRetryError). if isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil { @@ -293,6 +292,59 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) } return nil } + +// pickSourceTablet picks a healthy tablet to source for the +// vreplication stream. If the source is marked as external, it +// returns nil. +func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (tablet *topodatapb.Tablet, err error) { + if ct.source.GetExternalMysql() == "" { + log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", + ct.id, ct.workflow) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err = ct.tabletPicker.PickForStreaming(tpCtx) + if err != nil { + select { + case <-ctx.Done(): + default: + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) + ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) + } + return tablet, err + } + ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) + log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", + tablet.Alias.String(), ct.id, ct.workflow) + ct.sourceTablet.Store(tablet.Alias) + } + return tablet, err +} + +func (ct *controller) sourceTabletIsUnhealthy() bool { + ctx, cancel := context.WithTimeout(ct.vre.ctx, topo.RemoteOperationTimeout) + defer cancel() + tabletAlias := ct.sourceTablet.Load().(*topodatapb.TabletAlias) + tabletInfo, err := ct.vre.ts.GetTablet(ctx, tabletAlias) + if err != nil { + log.Warningf("Unable to get tablet info for %v when checking the source tablet's health for the vreplication controller %d for workflow %s: %v", + tabletAlias, ct.id, ct.workflow, err) + return false + } + if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { + // Ensure that the tablet is healthy and serving. + if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { + if shr.RealtimeStats.HealthError == "" && shr.Serving { + return nil + } + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving") + }); err == nil { + _ = conn.Close(ctx) + return true + } + } + return false +} + func (ct *controller) Stop() { ct.cancel() <-ct.done diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index fbf53fa7da4..909137cf5cf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -24,6 +24,7 @@ import ( "time" "vitess.io/vitess/go/vt/binlog/binlogplayer" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/servenv" @@ -152,7 +153,7 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Load().(string) + result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Load().(*topodatapb.TabletAlias).String() } return result })) @@ -468,7 +469,7 @@ func (st *vrStats) status() *EngineStatus { ReplicationLagSeconds: ct.blpStats.ReplicationLagSeconds.Load(), Counts: ct.blpStats.Timings.Counts(), Rates: ct.blpStats.Rates.Get(), - SourceTablet: ct.sourceTablet.Load().(string), + SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), @@ -506,7 +507,7 @@ type ControllerStatus struct { Counts map[string]int64 Rates map[string][]float64 State string - SourceTablet string + SourceTablet *topodatapb.TabletAlias Messages []string QueryCounts map[string]int64 PhaseTimings map[string]int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 2a05a726b5d..db8a07f38cb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var wantOut = ` @@ -107,8 +108,14 @@ func TestStatusHtml(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Store("src1") - testStats.controllers[2].sourceTablet.Store("src2") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) + testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 02, + }) close(testStats.controllers[2].done) tpl := template.Must(template.New("test").Parse(vreplicationTemplate)) @@ -135,7 +142,10 @@ func TestVReplicationStats(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Store("src1") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) sleepTime := 1 * time.Millisecond record := func(phase string) { From a003580d8eb929994a0c49d2aae11929ad421852 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 21 Jul 2023 12:25:47 -0400 Subject: [PATCH 03/15] Leverage retry in controller.run() with tablet selection in runBlp() Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/controller.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index b575ae86052..e8a0f18664f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -257,11 +257,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { err = vr.Replicate(ctx) ct.lastWorkflowError.Record(err) - // If the source tablet is now unhealthy, then pick a new one. + // If the source tablet has become unhealthy then we need to wait + // and re-run the binlog player again, thus picking a new source + // tablet. if ct.sourceTabletIsUnhealthy() { - if _, err = ct.pickSourceTablet(ctx, dbClient); err != nil { - return err - } + return vterrors.NewErrorf(vtrpcpb.Code_INTERNAL, vterrors.ServerNotAvailable, + "source tablet %s is unhealthy", ct.sourceTablet.Load().(*topodatapb.TabletAlias).String()) } // If this is a mysql error that we know needs manual intervention OR @@ -270,7 +271,8 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil { - log.Errorf("INTERNAL: unable to setState() in controller. Attempting to set error text: [%v]; setState() error is: %v", err, errSetState) + log.Errorf("INTERNAL: unable to setState() in controller. Attempting to set error text: [%v]; setState() error is: %v", + err, errSetState) return err // yes, err and not errSetState. } return nil // this will cause vreplicate to quit the workflow From d5499e4643891bf860288adca5f8fc072c14a6e1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 21 Jul 2023 15:46:42 -0400 Subject: [PATCH 04/15] Fix existing unit tests Signed-off-by: Matt Lord --- go/vt/wrangler/fake_tablet_test.go | 26 ++++++++++-- go/vt/wrangler/traffic_switcher_env_test.go | 44 +++++++++++++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 8ffae2d7328..fde7e44e2c5 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -32,6 +32,8 @@ import ( "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -48,6 +50,12 @@ import ( _ "vitess.io/vitess/go/vt/vttablet/grpctabletconn" ) +func init() { + // enforce we will use the right protocol (gRPC) in all unit tests + tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") + tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +} + // This file was copied from testlib. All tests from testlib should be moved // to the current directory. In order to move tests from there, we have to // remove the circular dependency it causes (through vtctl dependence). @@ -81,6 +89,8 @@ type fakeTablet struct { StartHTTPServer bool HTTPListener net.Listener HTTPServer *http.Server + + queryservice.QueryService } // TabletOption is an interface for changing tablet parameters. @@ -141,6 +151,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy Tablet: tablet, FakeMysqlDaemon: fakeMysqlDaemon, RPCServer: grpc.NewServer(), + QueryService: fakes.ErrorQueryService, } } @@ -238,8 +249,15 @@ func (ft *fakeTablet) Target() querypb.Target { } } -func init() { - // enforce we will use the right protocol (gRPC) in all unit tests - tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") - tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: ft.Tablet.Keyspace, + Shard: ft.Tablet.Shard, + Cell: ft.Tablet.Alias.Cell, + TabletType: ft.Tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) } diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index f493e7b5939..609b4c60a6c 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -19,6 +19,7 @@ package wrangler import ( "context" "fmt" + "math/rand" "sync" "testing" "time" @@ -30,6 +31,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -40,6 +42,9 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -139,6 +144,19 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -307,6 +325,19 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -452,6 +483,19 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschema.Vindex{ From ca220b07297731b84ddf9f6a78b85ffe3085f3bf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 21 Jul 2023 17:05:33 -0400 Subject: [PATCH 05/15] Correct health streamer behavior Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 7 ++++--- go/vt/vttablet/tabletmanager/vreplication/controller.go | 7 ++++--- go/vt/vttablet/tabletserver/health_streamer.go | 1 - go/vt/wrangler/fake_tablet_test.go | 1 - go/vt/wrangler/traffic_switcher_env_test.go | 6 +++--- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index d1f7b679c9c..a89c782fa4a 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -18,6 +18,7 @@ package discovery import ( "fmt" + "io" "math/rand" "sort" "strings" @@ -431,13 +432,13 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn // Ensure that the tablet is healthy and serving. if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { if shr.RealtimeStats.HealthError == "" && shr.Serving { - return nil + return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving") - }); err == nil { - _ = conn.Close(ctx) + }); err == nil || err == io.EOF { tablets = append(tablets, tabletInfo) } + _ = conn.Close(ctx) } } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index e8a0f18664f..efdb6ae684b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -19,6 +19,7 @@ package vreplication import ( "context" "fmt" + "io" "strconv" "strings" "sync/atomic" @@ -336,13 +337,13 @@ func (ct *controller) sourceTabletIsUnhealthy() bool { // Ensure that the tablet is healthy and serving. if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { if shr.RealtimeStats.HealthError == "" && shr.Serving { - return nil + return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving") - }); err == nil { - _ = conn.Close(ctx) + }); err == nil || err == io.EOF { return true } + _ = conn.Close(ctx) } return false } diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index e8de6301616..ad9acf495d8 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -181,7 +181,6 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str } return err } - return nil } } } diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index fde7e44e2c5..b0cbd6b8c0f 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -255,7 +255,6 @@ func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.S Target: &querypb.Target{ Keyspace: ft.Tablet.Keyspace, Shard: ft.Tablet.Shard, - Cell: ft.Tablet.Alias.Cell, TabletType: ft.Tablet.Type, }, RealtimeStats: &querypb.RealtimeStats{}, diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 609b4c60a6c..6da0e386487 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -155,7 +155,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, } return nil, nil }) - tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) vs := &vschemapb.Keyspace{ Sharded: true, @@ -336,7 +336,7 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar } return nil, nil }) - tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) vs := &vschemapb.Keyspace{ Sharded: true, @@ -494,7 +494,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe } return nil, nil }) - tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) vs := &vschemapb.Keyspace{ Sharded: true, From 8eac121f5e70cf49d889c0f2bc910160ed53d794 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 10:56:51 -0400 Subject: [PATCH 06/15] Simplify/minimize changes Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/controller.go | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index efdb6ae684b..88152af57dd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -19,7 +19,6 @@ package vreplication import ( "context" "fmt" - "io" "strconv" "strings" "sync/atomic" @@ -28,9 +27,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/vt/discovery" - "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -39,9 +36,7 @@ import ( "vitess.io/vitess/go/vt/topo" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -258,14 +253,6 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { err = vr.Replicate(ctx) ct.lastWorkflowError.Record(err) - // If the source tablet has become unhealthy then we need to wait - // and re-run the binlog player again, thus picking a new source - // tablet. - if ct.sourceTabletIsUnhealthy() { - return vterrors.NewErrorf(vtrpcpb.Code_INTERNAL, vterrors.ServerNotAvailable, - "source tablet %s is unhealthy", ct.sourceTablet.Load().(*topodatapb.TabletAlias).String()) - } - // If this is a mysql error that we know needs manual intervention OR // we cannot identify this as non-recoverable, but it has persisted // beyond the retry limit (maxTimeToRetryError). @@ -323,31 +310,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye return tablet, err } -func (ct *controller) sourceTabletIsUnhealthy() bool { - ctx, cancel := context.WithTimeout(ct.vre.ctx, topo.RemoteOperationTimeout) - defer cancel() - tabletAlias := ct.sourceTablet.Load().(*topodatapb.TabletAlias) - tabletInfo, err := ct.vre.ts.GetTablet(ctx, tabletAlias) - if err != nil { - log.Warningf("Unable to get tablet info for %v when checking the source tablet's health for the vreplication controller %d for workflow %s: %v", - tabletAlias, ct.id, ct.workflow, err) - return false - } - if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { - // Ensure that the tablet is healthy and serving. - if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { - if shr.RealtimeStats.HealthError == "" && shr.Serving { - return io.EOF // End the stream - } - return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving") - }); err == nil || err == io.EOF { - return true - } - _ = conn.Close(ctx) - } - return false -} - func (ct *controller) Stop() { ct.cancel() <-ct.done From 2fed0455adc5a0e73bbb9e03e054ea69af32df8f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 13:12:14 -0400 Subject: [PATCH 07/15] Unit test covering new behavior Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker_test.go | 60 +++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 88368c02a60..45973eee328 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -479,6 +479,45 @@ func TestPickErrorOnlySpecified(t *testing.T) { require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } +// TestPickFallbackType tests that when providing a list of tablet types to +// pick from, with the list in preference order, that when the primary/first +// type has no available healthy tablets that we select a healthy tablet from +// the secondary/second type. +func TestPickFallbackType(t *testing.T) { + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,primary" + options := TabletPickerOptions{ + TabletOrder: "InOrder", + } + te := newPickerTestEnv(t, cells) + + // This one should be selected even though it's the secondary type + // as it is healthy and serving. + primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Replica tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -527,18 +566,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell err := te.topoServ.CreateTablet(context.Background(), tablet) require.NoError(te.t, err) + shr := &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"}, + } if healthy { - _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ - Serving: serving, - Target: &querypb.Target{ - Keyspace: te.keyspace, - Shard: te.shard, - TabletType: tabletType, - }, - RealtimeStats: &querypb.RealtimeStats{HealthError: ""}, - }) + shr.RealtimeStats.HealthError = "" } + _ = createFixedHealthConn(tablet, shr) + return tablet } From 5ef32fe88c5628fb943dbf8c9474231a5940cfc5 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 14:04:49 -0400 Subject: [PATCH 08/15] Minor tweaks after self review Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 21 +++++++++---------- go/vt/discovery/tablet_picker_test.go | 9 ++++---- .../tabletmanager/vreplication/controller.go | 4 ++-- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index a89c782fa4a..6f62937c803 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -283,12 +283,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } -// PickForStreaming picks an available tablet. +// PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - // Keep trying at intervals (tabletPickerRetryDelay) until a tablet - // is found or the context is cancelled. + // Keep trying at intervals (tabletPickerRetryDelay) until a healthy + // serving tablet is found or the context is cancelled. for { select { case <-ctx.Done(): @@ -327,9 +327,9 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table }) } if len(candidates) == 0 { - // If no candidates were found, sleep and try again. + // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { @@ -339,18 +339,18 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table case <-timer.C: } // Got here? Means we iterated all tablets and did not find a - // healthy viable candidate. + // viable candidate. tp.incNoTabletFoundStat() continue } - log.Infof("Tablet picker found healthy tablet for streaming: %s", candidates[0].Tablet.String()) + log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) return candidates[0].Tablet, nil } } // GetMatchingTablets returns a list of TabletInfo for healthy -// tablets that match the cells, keyspace, shard and tabletTypes -// for this TabletPicker. +// serving tablets that match the cells, keyspace, shard and +// tabletTypes for this TabletPicker. func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { // Special handling for PRIMARY tablet type: since there is only // one primary per shard, we ignore cell and find the primary. @@ -434,7 +434,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if shr.RealtimeStats.HealthError == "" && shr.Serving { return io.EOF // End the stream } - return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving") + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { tablets = append(tablets, tabletInfo) } @@ -446,7 +446,6 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } func init() { - // TODO(sougou): consolidate this call to be once per process. globalTPStats = newTabletPickerStats() } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 45973eee328..a6d0e8f4c42 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestPickPrimary(t *testing.T) { @@ -481,8 +482,8 @@ func TestPickErrorOnlySpecified(t *testing.T) { // TestPickFallbackType tests that when providing a list of tablet types to // pick from, with the list in preference order, that when the primary/first -// type has no available healthy tablets that we select a healthy tablet from -// the secondary/second type. +// type has no available healthy serving tablets that we select a healthy +// serving tablet from the secondary/second type. func TestPickFallbackType(t *testing.T) { cells := []string{"cell1", "cell2"} localCell := cells[0] diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 88152af57dd..b80e9dd70c1 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -283,8 +283,8 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) return nil } -// pickSourceTablet picks a healthy tablet to source for the -// vreplication stream. If the source is marked as external, it +// pickSourceTablet picks a healthy serving tablet to source for +// the vreplication stream. If the source is marked as external, it // returns nil. func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (tablet *topodatapb.Tablet, err error) { if ct.source.GetExternalMysql() == "" { From 7c2da14b35457c3fa99617282137a2bb4c4f406c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 15:08:27 -0400 Subject: [PATCH 09/15] Fix vstream unit tests Signed-off-by: Matt Lord --- go/vt/vttablet/sandboxconn/sandboxconn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index e51636ec195..a654d18cea5 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -418,9 +418,9 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target, // SandboxSQRowCount is the default number of fake splits returned. var SandboxSQRowCount = int64(10) -// StreamHealth is not implemented. +// StreamHealth always mocks a "healthy" result. func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { - return fmt.Errorf("not implemented in test") + return nil } // ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos. From da6e0babe57614f2a17aa391b874c5fb307214e5 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 16:04:49 -0400 Subject: [PATCH 10/15] Try simplifying error handling Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 6f62937c803..e6da64df394 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -431,7 +431,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { // Ensure that the tablet is healthy and serving. if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { - if shr.RealtimeStats.HealthError == "" && shr.Serving { + if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") From e3a18d25b931446e6c8be7e6fce84d1cddb01d67 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 18:24:26 -0400 Subject: [PATCH 11/15] Remove errant second tp.incNoTabletFoundStat() Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index e6da64df394..199001c8326 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -338,9 +338,6 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") case <-timer.C: } - // Got here? Means we iterated all tablets and did not find a - // viable candidate. - tp.incNoTabletFoundStat() continue } log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) From cabc08ed0ab0f3a220b82cf2288f943e3ca19477 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 19:13:17 -0400 Subject: [PATCH 12/15] Minor changes after final self review Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/controller.go | 39 ++++++++++--------- .../tabletmanager/vreplication/stats.go | 7 +++- .../tabletmanager/vreplication/stats_test.go | 1 + go/vt/wrangler/fake_tablet_test.go | 2 +- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index b80e9dd70c1..737e702a04c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -286,27 +286,28 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) // pickSourceTablet picks a healthy serving tablet to source for // the vreplication stream. If the source is marked as external, it // returns nil. -func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (tablet *topodatapb.Tablet, err error) { - if ct.source.GetExternalMysql() == "" { - log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", - ct.id, ct.workflow) - tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) - defer tpCancel() - tablet, err = ct.tabletPicker.PickForStreaming(tpCtx) - if err != nil { - select { - case <-ctx.Done(): - default: - ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) - ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) - } - return tablet, err +func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (*topodatapb.Tablet, error) { + if ct.source.GetExternalMysql() != "" { + return nil, nil + } + log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", + ct.id, ct.workflow) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err := ct.tabletPicker.PickForStreaming(tpCtx) + if err != nil { + select { + case <-ctx.Done(): + default: + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) + ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) } - ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) - log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", - tablet.Alias.String(), ct.id, ct.workflow) - ct.sourceTablet.Store(tablet.Alias) + return tablet, err } + ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) + log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", + tablet.Alias.String(), ct.id, ct.workflow) + ct.sourceTablet.Store(tablet.Alias) return tablet, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 909137cf5cf..fe59ec27197 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -24,10 +24,11 @@ import ( "time" "vitess.io/vitess/go/vt/binlog/binlogplayer" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/servenv" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -153,7 +154,9 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Load().(*topodatapb.TabletAlias).String() + if ta := ct.sourceTablet.Load().(*topodatapb.TabletAlias); ta != nil { + result[fmt.Sprintf("%v", ct.id)] = ta.String() + } } return result })) diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index db8a07f38cb..3c9ae4f283c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index b0cbd6b8c0f..687ce93db47 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -51,7 +51,7 @@ import ( ) func init() { - // enforce we will use the right protocol (gRPC) in all unit tests + // Ensure we will use the right protocol (gRPC) in all unit tests. tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") } From 5f97ecbd1674ba9c7163739af4fba2b0885041fe Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 19:26:22 -0400 Subject: [PATCH 13/15] Pedantically arrange imports Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 199001c8326..aff7b5a4461 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,6 +17,7 @@ limitations under the License. package discovery import ( + "context" "fmt" "io" "math/rand" @@ -26,22 +27,16 @@ import ( "time" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/log" - - "context" - + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vterrors" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type TabletPickerCellPreference int From 44d7372c58e601144a9e42204bd1a4b60c4f52d7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 24 Jul 2023 20:59:05 -0400 Subject: [PATCH 14/15] Try to deflake wrangler race tests Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/stats.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index fe59ec27197..6379a9ba04f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -154,8 +154,9 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - if ta := ct.sourceTablet.Load().(*topodatapb.TabletAlias); ta != nil { - result[fmt.Sprintf("%v", ct.id)] = ta.String() + ta := ct.sourceTablet.Load() + if ta != nil { + result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() } } return result From c16f747839f02c4f0afbe2e1985aaa175136fa5c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 25 Jul 2023 15:22:06 -0400 Subject: [PATCH 15/15] Use short timeout when checking tablet health Otherwise we can block until the long running engine context is cancelled. Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index aff7b5a4461..e8d1525b073 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -422,7 +422,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn // Try to connect to the tablet and confirm that it's usable. if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { // Ensure that the tablet is healthy and serving. - if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { return io.EOF // End the stream }