diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 0c88b95e3da..c49072c043f 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -45,17 +45,26 @@ func NewFakeTopoFactory() *FakeFactory { mu: sync.Mutex{}, cells: map[string][]*FakeConn{}, } - factory.cells[topo.GlobalCell] = []*FakeConn{newFakeConnection()} + factory.cells[topo.GlobalCell] = []*FakeConn{NewFakeConnection()} return factory } // AddCell is used to add a cell to the factory. It returns the fake connection created. This connection can then be used to set get and update errors func (f *FakeFactory) AddCell(cell string) *FakeConn { - conn := newFakeConnection() + f.mu.Lock() + defer f.mu.Unlock() + conn := NewFakeConnection() f.cells[cell] = []*FakeConn{conn} return conn } +// SetCell is used to set a cell in the factory. It replaces any connections previously registered for the cell with fakeConn. +func (f *FakeFactory) SetCell(cell string, fakeConn *FakeConn) { + f.mu.Lock() + defer f.mu.Unlock() + f.cells[cell] = []*FakeConn{fakeConn} +} + // HasGlobalReadOnlyCell implements the Factory interface func (f *FakeFactory) HasGlobalReadOnlyCell(serverAddr, root string) bool { return false @@ -70,7 +79,7 @@ func (f *FakeFactory) Create(cell, serverAddr, root string) (topo.Conn, error) { if !ok || len(connections) == 0 { return nil, topo.NewError(topo.NoNode, cell) } - // pick the first connection and remove it from the list + // pick the first connection and remove it from the list. conn := connections[0] f.cells[cell] = connections[1:] @@ -84,15 +93,19 @@ type FakeConn struct { cell string serverAddr string - // mutex to protect all the operations + // mutex to protect all the operations. mu sync.Mutex - // getResultMap is a map storing the results for each filepath + // getResultMap is a map storing the results for each filepath. getResultMap map[string]result - // updateErrors stores whether update function call should error or not + // listResultMap is a map storing the results for each filepath prefix. 
+ listResultMap map[string][]topo.KVInfo + // updateErrors stores whether update function call should error or not. updateErrors []updateError - // getErrors stores whether the get function call should error or not + // getErrors stores whether the get function call should error or not. getErrors []bool + // listErrors stores whether the list function call should error or not. Each call to List consumes the first entry. + listErrors []bool // watches is a map of all watches for this connection to the cell keyed by the filepath. watches map[string][]chan *topo.WatchData @@ -105,13 +118,15 @@ type updateError struct { writePersists bool } -// newFakeConnection creates a new fake connection -func newFakeConnection() *FakeConn { +// NewFakeConnection creates a new fake connection with empty result maps, watches and error queues. +func NewFakeConnection() *FakeConn { return &FakeConn{ - getResultMap: map[string]result{}, - watches: map[string][]chan *topo.WatchData{}, - getErrors: []bool{}, - updateErrors: []updateError{}, + getResultMap: map[string]result{}, + listResultMap: map[string][]topo.KVInfo{}, + watches: map[string][]chan *topo.WatchData{}, + getErrors: []bool{}, + listErrors: []bool{}, + updateErrors: []updateError{}, } } @@ -122,6 +137,20 @@ func (f *FakeConn) AddGetError(shouldErr bool) { f.getErrors = append(f.getErrors, shouldErr) } +// AddListError is used to add a list error to the fake connection. Entries are consumed in order, one per List call; a true value makes that call fail with a topo.Timeout error. +func (f *FakeConn) AddListError(shouldErr bool) { + f.mu.Lock() + defer f.mu.Unlock() + f.listErrors = append(f.listErrors, shouldErr) +} + +// AddListResult is used to add a list result to the fake connection. The result is returned by List for exactly this filePathPrefix and replaces any result added earlier for it. +func (f *FakeConn) AddListResult(filePathPrefix string, result []topo.KVInfo) { + f.mu.Lock() + defer f.mu.Unlock() + f.listResultMap[filePathPrefix] = result +} + // AddUpdateError is used to add an update error to the fake connection func (f *FakeConn) AddUpdateError(shouldErr bool, writePersists bool) { f.mu.Lock() @@ -261,7 +290,20 @@ func (f *FakeConn) GetVersion(ctx context.Context, filePath string, version int6 // List is part of the 
topo.Conn interface. func (f *FakeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) { - return nil, topo.NewError(topo.NoImplementation, "List not supported in fake topo") + f.mu.Lock() + defer f.mu.Unlock() + if len(f.listErrors) > 0 { + shouldErr := f.listErrors[0] + f.listErrors = f.listErrors[1:] + if shouldErr { + return nil, topo.NewError(topo.Timeout, filePathPrefix) + } + } + kvInfos, isPresent := f.listResultMap[filePathPrefix] + if !isPresent { + return nil, topo.NewError(topo.NoNode, filePathPrefix) + } + return kvInfos, nil } // Delete implements the Conn interface diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index c5c23df0cd0..d90247409aa 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -150,26 +150,30 @@ func OpenTabletDiscovery() <-chan time.Time { return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker } -// getAllTablets gets all tablets from all cells using a goroutine per cell. -func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo { - var tabletsMu sync.Mutex - tablets := make([]*topo.TabletInfo, 0) +// getAllTablets gets all tablets from all cells using a goroutine per cell. It returns a map of +// cells (string) to slices of tablets (as topo.TabletInfo) and a slice of cells (string) that +// failed to return a result. The order of the failed cells is not deterministic because cells are loaded concurrently. 
+func getAllTablets(ctx context.Context, cells []string) (tabletsByCell map[string][]*topo.TabletInfo, failedCells []string) { + var mu sync.Mutex + failedCells = make([]string, 0, len(cells)) + tabletsByCell = make(map[string][]*topo.TabletInfo, len(cells)) eg, ctx := errgroup.WithContext(ctx) for _, cell := range cells { eg.Go(func() error { - t, err := ts.GetTabletsByCell(ctx, cell, nil) + tablets, err := ts.GetTabletsByCell(ctx, cell, nil) + mu.Lock() + defer mu.Unlock() if err != nil { log.Errorf("Failed to load tablets from cell %s: %+v", cell, err) - return nil + failedCells = append(failedCells, cell) + } else { + tabletsByCell[cell] = tablets } - tabletsMu.Lock() - defer tabletsMu.Unlock() - tablets = append(tablets, t...) return nil }) } _ = eg.Wait() // always nil - return tablets + return tabletsByCell, failedCells } // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while @@ -182,9 +186,9 @@ func refreshAllTablets(ctx context.Context) error { // refreshTabletsUsing refreshes tablets using a provided loader. func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error { // Get all cells. - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - cells, err := ts.GetKnownCells(ctx) + cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cellsCancel() + cells, err := ts.GetKnownCells(cellsCtx) if err != nil { return err } @@ -192,25 +196,34 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f // Get all tablets from all cells. 
getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer getTabletsCancel() - tablets := getAllTablets(getTabletsCtx, cells) - if len(tablets) == 0 { - log.Error("Found no tablets") + tabletsByCell, failedCells := getAllTablets(getTabletsCtx, cells) + if len(tabletsByCell) == 0 { + log.Error("Found no cells with tablets") return nil } + if len(failedCells) > 0 { + log.Errorf("Got partial topo result. Failed cells: %s", strings.Join(failedCells, ", ")) + } - // Filter tablets that should not be watched using shardsToWatch map. - matchedTablets := make([]*topo.TabletInfo, 0, len(tablets)) - func() { - for _, t := range tablets { - if shouldWatchTablet(t.Tablet) { - matchedTablets = append(matchedTablets, t) + // Update each cell that provided a response. This ensures only cells that provided a + // response are updated in the backend and are considered for forgetting stale tablets. + for cell, tablets := range tabletsByCell { + // Filter tablets that should not be watched using func shouldWatchTablet. + matchedTablets := make([]*topo.TabletInfo, 0, len(tablets)) + func() { + for _, t := range tablets { + if shouldWatchTablet(t.Tablet) { + matchedTablets = append(matchedTablets, t) + } } - } - }() + }() + + // Refresh the filtered tablets and forget stale tablets. + query := "select alias from vitess_tablet where cell = ?" + args := sqlutils.Args(cell) + refreshTablets(matchedTablets, query, args, loader, forceRefresh, nil) + } - // Refresh the filtered tablets. 
- query := "select alias from vitess_tablet" - refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil) return nil } diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 4514ef81724..c351fb41c0a 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -19,6 +19,7 @@ package logic import ( "context" "fmt" + "slices" "strings" "sync/atomic" "testing" @@ -34,6 +35,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vttime" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/faketopo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil" @@ -840,3 +842,61 @@ func TestSetReplicationSource(t *testing.T) { }) } } + +func TestGetAllTablets(t *testing.T) { + tablet := &topodatapb.Tablet{ + Hostname: t.Name(), + } + tabletProto, _ := tablet.MarshalVT() + + factory := faketopo.NewFakeTopoFactory() + + // zone1 (success) + goodCell1 := faketopo.NewFakeConnection() + goodCell1.AddListResult("tablets", []topo.KVInfo{ + { + Key: []byte("zone1-00000001"), + Value: tabletProto, + }, + }) + factory.SetCell("zone1", goodCell1) + + // zone2 (success) + goodCell2 := faketopo.NewFakeConnection() + goodCell2.AddListResult("tablets", []topo.KVInfo{ + { + Key: []byte("zone2-00000002"), + Value: tabletProto, + }, + }) + factory.SetCell("zone2", goodCell2) + + // zone3 (fail): the queued list error makes the fake List call return an error + badCell1 := faketopo.NewFakeConnection() + badCell1.AddListError(true) + factory.SetCell("zone3", badCell1) + + // zone4 (fail): the queued list error makes the fake List call return an error + badCell2 := faketopo.NewFakeConnection() + badCell2.AddListError(true) + factory.SetCell("zone4", badCell2) + + oldTs := ts + defer func() { + ts = oldTs + }() + ctx := context.Background() + ts = faketopo.NewFakeTopoServer(ctx, factory) + + // confirm zone1 + zone2 succeeded and zone3 + zone4 failed + tabletsByCell, failedCells := 
getAllTablets(ctx, []string{"zone1", "zone2", "zone3", "zone4"}) + require.Len(t, tabletsByCell, 2) + slices.Sort(failedCells) + require.Equal(t, []string{"zone3", "zone4"}, failedCells) + for _, tablets := range tabletsByCell { + require.Len(t, tablets, 1) + for _, tablet := range tablets { + require.Equal(t, t.Name(), tablet.Tablet.GetHostname()) + } + } +}