diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 2b9570a9b6e..42c403a2295 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -110,6 +110,14 @@ const ( ThrottleCheckSelf ) +// throttlerTopoService represents the functionality we expect from a TopoServer, abstracted so that +// it can be mocked in unit tests +type throttlerTopoService interface { + GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) + FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) + GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) +} + // Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data, // aggregates, reads inventory, provides information, etc. type Throttler struct { @@ -125,7 +133,7 @@ type Throttler struct { env tabletenv.Env pool *connpool.Pool tabletTypeFunc func() topodatapb.TabletType - ts *topo.Server + ts throttlerTopoService srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter @@ -602,10 +610,10 @@ func (throttler *Throttler) Operate(ctx context.Context) { recentCheckTicker := addTicker(time.Second) tmClient := tmclient.NewTabletManagerClient() - defer tmClient.Close() go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") + defer tmClient.Close() for _, t := range tickers { defer t.Stop() // since we just started the tickers now, speed up the ticks by forcing an immediate tick @@ -786,8 +794,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm var throttleMetricFunc func() *mysql.MySQLThrottleMetric if clusterName == selfStoreName { + // Throttler is probing its own tablet's metrics: throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe) } else { + // Throttler 
probing other tablets: throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, tmClient, clusterName, probe) } throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc) @@ -801,7 +811,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm // refreshMySQLInventory will re-structure the inventory based on reading config settings func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { - // distribute the query/threshold from the throttler down to the cluster settings and from there to the probes metricsQuery := throttler.GetMetricsQuery() metricsThreshold := throttler.MetricsThreshold.Load() @@ -844,13 +853,20 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { } if clusterName == selfStoreName { - // special case: just looking at this tablet's MySQL server + // special case: just looking at this tablet's MySQL server. // We will probe this "cluster" (of one server) is a special way. addInstanceKey(nil, "", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes) throttler.mysqlClusterProbesChan <- clusterProbes return } if !throttler.isLeader.Load() { + // This tablet may previously have been the primary, but it isn't now. It may have a recollection + // of previous clusters it used to probe. It may have a recollection of specific probes for such clusters. + // This now ensures any existing cluster probes are overridden with an empty list of probes. + // `clusterProbes` was created above as empty, and is identifiable via `clusterName`. This will in turn + // be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in + // updateMySQLClusterProbes(). + throttler.mysqlClusterProbesChan <- clusterProbes // not the leader (primary tablet)? Then no more work for us. 
return } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 2995e03b654..c47466df522 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -7,13 +7,57 @@ package throttle import ( + "context" + "fmt" + "sync/atomic" "testing" "time" "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +const ( + waitForProbesTimeout = 30 * time.Second ) +type FakeTopoServer struct { +} + +func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) { + tablet := &topo.TabletInfo{ + Tablet: &topodatapb.Tablet{ + Alias: alias, + Hostname: "127.0.0.1", + MysqlHostname: "127.0.0.1", + MysqlPort: 3306, + PortMap: map[string]int32{"vt": 5000}, + Type: topodatapb.TabletType_REPLICA, + }, + } + return tablet, nil +} + +func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) { + aliases := []*topodatapb.TabletAlias{ + {Cell: "zone1", Uid: 100}, + {Cell: "zone2", Uid: 101}, + } + return aliases, nil +} + +func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + ks := &topodatapb.SrvKeyspace{} + return ks, nil +} + type FakeHeartbeatWriter struct { } @@ -50,6 +94,7 @@ func TestIsAppThrottled(t *testing.T) { } func TestIsAppExempted(t *testing.T) { + throttler := Throttler{ throttledApps: cache.New(cache.NoExpiration, 0), heartbeatWriter: FakeHeartbeatWriter{}, @@ -75,3 +120,102 @@ func TestIsAppExempted(t *testing.T) { throttler.UnthrottleApp("schema-tracker") // 
meaningless. App is statically exempted assert.True(t, throttler.IsAppExempted("schema-tracker")) } + +// TestRefreshMySQLInventory tests the behavior of the throttler's refreshMySQLInventory() function, which +// is called periodically in the actual throttler. For a given cluster name, it generates a list of probes +// the throttler will use to check metrics. +// On a "self" cluster, that list is expected to probe the tablet itself. +// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a +// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty. +func TestRefreshMySQLInventory(t *testing.T) { + metricsQuery := "select 1" + config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{ + selfStoreName: {}, + "ks1": {}, + "ks2": {}, + } + clusters := config.Settings().Stores.MySQL.Clusters + for _, s := range clusters { + s.MetricQuery = metricsQuery + s.ThrottleThreshold = &atomic.Uint64{} + s.ThrottleThreshold.Store(1) + } + + throttler := &Throttler{ + mysqlClusterProbesChan: make(chan *mysql.ClusterProbes), + mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), + ts: &FakeTopoServer{}, + mysqlInventory: mysql.NewInventory(), + } + throttler.metricsQuery.Store(metricsQuery) + throttler.initThrottleTabletTypes() + + validateClusterProbes := func(t *testing.T, ctx context.Context) { + testName := fmt.Sprintf("leader=%t", throttler.isLeader.Load()) + t.Run(testName, func(t *testing.T) { + // validateProbesCount expects the number of probes according to cluster name and throttler's leadership status + validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) { + if clusterName == selfStoreName { + assert.Equal(t, 1, len(*probes)) + } else if throttler.isLeader.Load() { + assert.NotZero(t, len(*probes)) + } else { + assert.Empty(t, *probes) + } + } + t.Run("waiting for probes", func(t *testing.T) { + ctx, cancel := 
context.WithTimeout(ctx, waitForProbesTimeout) + defer cancel() + numClusterProbesResults := 0 + for { + select { + case probes := <-throttler.mysqlClusterProbesChan: + // Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does + // not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as + // read from it. We do not compete here with any other goroutine. + assert.NotNil(t, probes) + + throttler.updateMySQLClusterProbes(ctx, probes) + + numClusterProbesResults++ + validateProbesCount(t, probes.ClusterName, probes.InstanceProbes) + + if numClusterProbesResults == len(clusters) { + // Achieved our goal + return + } + case <-ctx.Done(): + assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters)) + } + } + }) + t.Run("validating probes", func(t *testing.T) { + for clusterName := range clusters { + probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName] + require.True(t, ok) + validateProbesCount(t, clusterName, probes) + } + }) + }) + } + // + ctx := context.Background() + + t.Run("initial, not leader", func(t *testing.T) { + throttler.isLeader.Store(false) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) + + t.Run("promote", func(t *testing.T) { + throttler.isLeader.Store(true) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) + + t.Run("demote, expect cleanup", func(t *testing.T) { + throttler.isLeader.Store(false) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) +}