diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index acff538b78d..d3e9adfeded 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -365,6 +365,18 @@ func (fhc *FakeHealthCheck) GetAllTablets() map[string]*topodatapb.Tablet { return res } +// BroadcastAll broadcasts all the tablets' healthchecks +func (fhc *FakeHealthCheck) BroadcastAll() { + if fhc.ch == nil { + return + } + fhc.mu.Lock() + defer fhc.mu.Unlock() + for _, item := range fhc.items { + fhc.ch <- simpleCopy(item.ts) + } +} + func simpleCopy(th *TabletHealth) *TabletHealth { return &TabletHealth{ Conn: th.Conn, diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 7f4c2dcfe97..de8c9aac5d4 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -130,7 +130,7 @@ func init() { func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) { cell := "aa" - hc := discovery.NewFakeHealthCheck(nil) + hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth)) s := createSandbox(KsTestSharded) s.VSchema = executorVSchema serv := newSandboxForCells([]string{cell}) @@ -162,7 +162,6 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn _ = hc.AddTestTablet(cell, "c0-e0", 1, "TestExecutor", "c0-e0", topodatapb.TabletType_PRIMARY, true, 1, nil) _ = hc.AddTestTablet(cell, "e0-", 1, "TestExecutor", "e0-", topodatapb.TabletType_PRIMARY, true, 1, nil) // Below is needed so that SendAnyWherePlan doesn't fail - _ = hc.AddTestTablet(cell, "random", 1, "TestXBadVSchema", "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) createSandbox(KsTestUnsharded) sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index b874d2a7107..1c1fe42161c 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/discovery" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -2063,6 +2064,49 @@ func TestExecutorClearsWarnings(t *testing.T) { require.Empty(t, session.Warnings) } +// TestServingKeyspaces tests that the dual queries are routed to the correct keyspaces from the list of serving keyspaces. +func TestServingKeyspaces(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + executor.pv = querypb.ExecuteOptions_Gen4 + gw, ok := executor.resolver.resolver.GetGateway().(*TabletGateway) + require.True(t, ok) + hc := gw.hc.(*discovery.FakeHealthCheck) + + // We broadcast twice because we want to ensure the keyspace event watcher has processed all the healthcheck updates + // from the first broadcast. Since we use a channel for broadcasting, it is blocking and hence the second call ensures + // all the updates (specifically the last one) has been processed by the keyspace-event-watcher. + hc.BroadcastAll() + hc.BroadcastAll() + + sbc1.SetResults([]*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestExecutor"), + }) + sbclookup.SetResults([]*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestUnsharded"), + }) + + require.ElementsMatch(t, []string{"TestExecutor", "TestUnsharded"}, gw.GetServingKeyspaces()) + result, err := executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil) + require.NoError(t, err) + require.Equal(t, `[[VARCHAR("TestExecutor")]]`, fmt.Sprintf("%v", result.Rows)) + + for _, tablet := range hc.GetAllTablets() { + if tablet.Keyspace == "TestExecutor" { + hc.SetServing(tablet, false) + } + } + // Two broadcast calls for the same reason as above. + hc.BroadcastAll() + hc.BroadcastAll() + + // Clear plan cache, to force re-planning of the query. + executor.plans.Clear() + require.ElementsMatch(t, []string{"TestUnsharded"}, gw.GetServingKeyspaces()) + result, err = executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil) + require.NoError(t, err) + require.Equal(t, `[[VARCHAR("TestUnsharded")]]`, fmt.Sprintf("%v", result.Rows)) +} + func TestExecutorOtherRead(t *testing.T) { executor, sbc1, sbc2, sbclookup := createExecutorEnv() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 127fd0a9a07..b7f9735bb62 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -360,10 +360,6 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) { return keyspaces[i].Name < keyspaces[j].Name }) - sort.Slice(keyspaces, func(i, j int) bool { - return keyspaces[i].Name < keyspaces[j].Name - }) - // Look for any sharded keyspace if present, otherwise take the first keyspace, // sorted alphabetically for _, ks := range keyspaces { diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index c26d77a19d0..d5f6e497339 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -45,6 +45,9 @@ type SandboxConn struct { // These errors work for all functions. MustFailCodes map[vtrpcpb.Code]int + // ServingKeyspaces is a list of serving keyspaces + ServingKeyspaces []string + // These errors are triggered only for specific functions. // For now these are just for the 2PC functions. MustFailPrepare int @@ -511,7 +514,7 @@ func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *queryp // GetServingKeyspaces returns list of serving keyspaces. func (sbc *SandboxConn) GetServingKeyspaces() []string { - return nil + return sbc.ServingKeyspaces } // HandlePanic is part of the QueryService interface.