Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtorc: improve handling of partial cell topo results #17718

Merged
merged 6 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,26 @@ func NewFakeTopoFactory() *FakeFactory {
mu: sync.Mutex{},
cells: map[string][]*FakeConn{},
}
factory.cells[topo.GlobalCell] = []*FakeConn{newFakeConnection()}
factory.cells[topo.GlobalCell] = []*FakeConn{NewFakeConnection()}
return factory
}

// AddCell is used to add a cell to the factory. It returns the fake connection created. This connection can then be used to set get and update errors
func (f *FakeFactory) AddCell(cell string) *FakeConn {
conn := newFakeConnection()
f.mu.Lock()
defer f.mu.Unlock()
conn := NewFakeConnection()
f.cells[cell] = []*FakeConn{conn}
return conn
}

// SetCell is used to set a cell in the factory.
func (f *FakeFactory) SetCell(cell string, fakeConn *FakeConn) {
f.mu.Lock()
defer f.mu.Unlock()
f.cells[cell] = []*FakeConn{fakeConn}
}

// HasGlobalReadOnlyCell implements the Factory interface
func (f *FakeFactory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
return false
Expand All @@ -70,7 +79,7 @@ func (f *FakeFactory) Create(cell, serverAddr, root string) (topo.Conn, error) {
if !ok || len(connections) == 0 {
return nil, topo.NewError(topo.NoNode, cell)
}
// pick the first connection and remove it from the list
// pick the first connection and remove it from the list.
conn := connections[0]
f.cells[cell] = connections[1:]

Expand All @@ -84,15 +93,19 @@ type FakeConn struct {
cell string
serverAddr string

// mutex to protect all the operations
// mutex to protect all the operations.
mu sync.Mutex

// getResultMap is a map storing the results for each filepath
// getResultMap is a map storing the results for each filepath.
getResultMap map[string]result
// updateErrors stores whether update function call should error or not
// listResultMap is a map storing the resuls for each filepath prefix.
listResultMap map[string][]topo.KVInfo
// updateErrors stores whether update function call should error or not.
updateErrors []updateError
// getErrors stores whether the get function call should error or not
// getErrors stores whether the get function call should error or not.
getErrors []bool
// listErrors stores whether the list function call should error or not.
listErrors []bool

// watches is a map of all watches for this connection to the cell keyed by the filepath.
watches map[string][]chan *topo.WatchData
Expand All @@ -105,13 +118,15 @@ type updateError struct {
writePersists bool
}

// newFakeConnection creates a new fake connection
func newFakeConnection() *FakeConn {
// NewFakeConnection creates a new fake connection
func NewFakeConnection() *FakeConn {
return &FakeConn{
getResultMap: map[string]result{},
watches: map[string][]chan *topo.WatchData{},
getErrors: []bool{},
updateErrors: []updateError{},
getResultMap: map[string]result{},
listResultMap: map[string][]topo.KVInfo{},
watches: map[string][]chan *topo.WatchData{},
getErrors: []bool{},
listErrors: []bool{},
updateErrors: []updateError{},
}
}

Expand All @@ -122,6 +137,20 @@ func (f *FakeConn) AddGetError(shouldErr bool) {
f.getErrors = append(f.getErrors, shouldErr)
}

// AddListError is used to add a list error to the fake connection
func (f *FakeConn) AddListError(shouldErr bool) {
f.mu.Lock()
defer f.mu.Unlock()
f.listErrors = append(f.listErrors, shouldErr)
}

// AddListResult is used to add a list result to the fake connection
func (f *FakeConn) AddListResult(filePathPrefix string, result []topo.KVInfo) {
f.mu.Lock()
defer f.mu.Unlock()
f.listResultMap[filePathPrefix] = result
}

// AddUpdateError is used to add an update error to the fake connection
func (f *FakeConn) AddUpdateError(shouldErr bool, writePersists bool) {
f.mu.Lock()
Expand Down Expand Up @@ -261,7 +290,20 @@ func (f *FakeConn) GetVersion(ctx context.Context, filePath string, version int6

// List is part of the topo.Conn interface.
func (f *FakeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in fake topo")
f.mu.Lock()
defer f.mu.Unlock()
if len(f.listErrors) > 0 {
shouldErr := f.listErrors[0]
f.listErrors = f.listErrors[1:]
if shouldErr {
return nil, topo.NewError(topo.Timeout, filePathPrefix)
}
}
kvInfos, isPresent := f.listResultMap[filePathPrefix]
if !isPresent {
return nil, topo.NewError(topo.NoNode, filePathPrefix)
}
return kvInfos, nil
}

// Delete implements the Conn interface
Expand Down
67 changes: 40 additions & 27 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,30 @@ func OpenTabletDiscovery() <-chan time.Time {
return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// getAllTablets gets all tablets from all cells using a goroutine per cell.
func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo {
var tabletsMu sync.Mutex
tablets := make([]*topo.TabletInfo, 0)
// getAllTablets gets all tablets from all cells using a goroutine per cell. It returns a map of
// cells (string) to slices of tablets (as topo.TabletInfo) and a slice of cells (string) that
// failed to return a result.
func getAllTablets(ctx context.Context, cells []string) (tabletsByCell map[string][]*topo.TabletInfo, failedCells []string) {
var mu sync.Mutex
failedCells = make([]string, 0, len(cells))
tabletsByCell = make(map[string][]*topo.TabletInfo, len(cells))
eg, ctx := errgroup.WithContext(ctx)
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, nil)
tablets, err := ts.GetTabletsByCell(ctx, cell, nil)
mu.Lock()
defer mu.Unlock()
if err != nil {
log.Errorf("Failed to load tablets from cell %s: %+v", cell, err)
return nil
failedCells = append(failedCells, cell)
} else {
tabletsByCell[cell] = tablets
}
tabletsMu.Lock()
defer tabletsMu.Unlock()
tablets = append(tablets, t...)
return nil
})
}
_ = eg.Wait() // always nil
return tablets
return tabletsByCell, failedCells
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
Expand All @@ -182,35 +186,44 @@ func refreshAllTablets(ctx context.Context) error {
// refreshTabletsUsing refreshes tablets using a provided loader.
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
// Get all cells.
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cellsCancel()
cells, err := ts.GetKnownCells(cellsCtx)
if err != nil {
return err
}

// Get all tablets from all cells.
getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getTabletsCancel()
tablets := getAllTablets(getTabletsCtx, cells)
if len(tablets) == 0 {
log.Error("Found no tablets")
tabletsByCell, failedCells := getAllTablets(getTabletsCtx, cells)
if len(tabletsByCell) == 0 {
log.Error("Found no cells with tablets")
return nil
}
if len(failedCells) > 0 {
log.Errorf("Got partial topo result. Failed cells: %s", strings.Join(failedCells, ", "))
}

// Filter tablets that should not be watched using shardsToWatch map.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
for _, t := range tablets {
if shouldWatchTablet(t.Tablet) {
matchedTablets = append(matchedTablets, t)
// Update each cell that provided a response. This ensures only cells that provided a
// response are updated in the backend and are considered for forgetting stale tablets.
for cell, tablets := range tabletsByCell {
// Filter tablets that should not be watched using func shouldWatchTablet.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
for _, t := range tablets {
if shouldWatchTablet(t.Tablet) {
matchedTablets = append(matchedTablets, t)
}
}
}
}()
}()

// Refresh the filtered tablets and forget stale tablets.
query := "select alias from vitess_tablet where cell = ?"
args := sqlutils.Args(cell)
refreshTablets(matchedTablets, query, args, loader, forceRefresh, nil)
}

// Refresh the filtered tablets.
query := "select alias from vitess_tablet"
refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil)
return nil
}

Expand Down
60 changes: 60 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package logic
import (
"context"
"fmt"
"slices"
"strings"
"sync/atomic"
"testing"
Expand All @@ -34,6 +35,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/faketopo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"
Expand Down Expand Up @@ -840,3 +842,61 @@ func TestSetReplicationSource(t *testing.T) {
})
}
}

func TestGetAllTablets(t *testing.T) {
tablet := &topodatapb.Tablet{
Hostname: t.Name(),
}
tabletProto, _ := tablet.MarshalVT()

factory := faketopo.NewFakeTopoFactory()

// zone1 (success)
goodCell1 := faketopo.NewFakeConnection()
goodCell1.AddListResult("tablets", []topo.KVInfo{
{
Key: []byte("zone1-00000001"),
Value: tabletProto,
},
})
factory.SetCell("zone1", goodCell1)

// zone2 (success)
goodCell2 := faketopo.NewFakeConnection()
goodCell2.AddListResult("tablets", []topo.KVInfo{
{
Key: []byte("zone2-00000002"),
Value: tabletProto,
},
})
factory.SetCell("zone2", goodCell2)

// zone3 (fail)
badCell1 := faketopo.NewFakeConnection()
badCell1.AddListError(true)
factory.SetCell("zone3", badCell1)

// zone4 (fail)
badCell2 := faketopo.NewFakeConnection()
badCell2.AddListError(true)
factory.SetCell("zone4", badCell2)

oldTs := ts
defer func() {
ts = oldTs
}()
ctx := context.Background()
ts = faketopo.NewFakeTopoServer(ctx, factory)

// confirm zone1 + zone2 succeeded and zone3 + zone4 failed
tabletsByCell, failedCells := getAllTablets(ctx, []string{"zone1", "zone2", "zone3", "zone4"})
require.Len(t, tabletsByCell, 2)
slices.Sort(failedCells)
require.Equal(t, []string{"zone3", "zone4"}, failedCells)
for _, tablets := range tabletsByCell {
require.Len(t, tablets, 1)
for _, tablet := range tablets {
require.Equal(t, t.Name(), tablet.Tablet.GetHostname())
}
}
}
Loading