Skip to content

Commit

Permalink
feat: use keyspace event watcher for getting serving keyspaces
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Jul 11, 2023
1 parent 13edc4a commit cd19e9c
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 5 deletions.
26 changes: 26 additions & 0 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, new
return true
}

// isServing returns whether a keyspace has at least one serving shard or not.
func (kss *keyspaceState) isServing() bool {
kss.mu.Lock()
defer kss.mu.Unlock()
for _, state := range kss.shards {
if state.serving {
return true
}
}
return false
}

// newKeyspaceState allocates the internal state required to keep track of availability incidents
// in this keyspace, and starts up a SrvKeyspace watcher on our topology server which will update
// our keyspaceState with any topology changes in real time.
Expand Down Expand Up @@ -471,3 +483,17 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) (*top
}
return nil, false
}

// GetServingKeyspaces gets the serving keyspaces from the keyspace event watcher.
func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
kew.mu.Lock()
defer kew.mu.Unlock()

var servingKeyspaces []string
for ksName, state := range kew.keyspaces {
if state.isServing() {
servingKeyspaces = append(servingKeyspaces, ksName)
}
}
return servingKeyspaces
}
6 changes: 4 additions & 2 deletions go/vt/srvtopo/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package srvtopo

import (
"context"
"sort"

"vitess.io/vitess/go/sqltypes"

"context"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/key"
Expand All @@ -43,6 +42,9 @@ type Gateway interface {

// QueryServiceByAlias returns a QueryService
QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error)

// GetServingKeyspaces returns list of serving keyspaces.
GetServingKeyspaces() []string
}

// A Resolver can resolve keyspace ids and key ranges into ResolvedShard*
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, targ
return queryservice.Wrap(qs, gw.withShardError), NewShardError(err, target)
}

// GetServingKeyspaces returns list of serving keyspaces.
func (gw *TabletGateway) GetServingKeyspaces() []string {
if gw.kev == nil {
return nil
}
return gw.kev.GetServingKeyspaces()
}

// RegisterStats registers the stats to export the lag since the last refresh
// and the checksum of the topology
func (gw *TabletGateway) RegisterStats() {
Expand Down
20 changes: 17 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,28 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
return nil, errNoDbAvailable
}

var keyspaces = make([]*vindexes.Keyspace, 0, len(vc.vschema.Keyspaces))
for _, ks := range vc.vschema.Keyspaces {
keyspaces = append(keyspaces, ks.Keyspace)
keyspaceNames := vc.resolver.GetGateway().GetServingKeyspaces()

var keyspaces []*vindexes.Keyspace
for _, ksName := range keyspaceNames {
ks, exists := vc.vschema.Keyspaces[ksName]
if exists {
keyspaces = append(keyspaces, ks.Keyspace)
}
}
if len(keyspaces) == 0 {
for _, ks := range vc.vschema.Keyspaces {
keyspaces = append(keyspaces, ks.Keyspace)
}
}
sort.Slice(keyspaces, func(i, j int) bool {
return keyspaces[i].Name < keyspaces[j].Name
})

sort.Slice(keyspaces, func(i, j int) bool {
return keyspaces[i].Name < keyspaces[j].Name
})

// Look for any sharded keyspace if present, otherwise take the first keyspace,
// sorted alphabetically
for _, ks := range keyspaces {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *queryp
return sbc, nil
}

// GetServingKeyspaces returns list of serving keyspaces.
func (sbc *SandboxConn) GetServingKeyspaces() []string {
return nil
}

// HandlePanic is part of the QueryService interface.
func (sbc *SandboxConn) HandlePanic(err *error) {
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,11 @@ func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *que
panic("not implemented")
}

// GetServingKeyspaces returns list of serving keyspaces.
func (f *FakeQueryService) GetServingKeyspaces() []string {
panic("not implemented")
}

// ReserveBeginExecute satisfies the Gateway interface
func (f *FakeQueryService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
panic("implement me")
Expand Down

0 comments on commit cd19e9c

Please sign in to comment.