Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track custom anonymous usage statistics from ingester and querier #2685

Merged
merged 3 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/grafana/mimir/pkg/querier"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/validation"
)
Expand Down Expand Up @@ -248,18 +249,28 @@ func NewQuerierHandler(
promRouter := route.New().WithPrefix(path.Join(prefix, "/api/v1"))
api.Register(promRouter)

// Track the requests count in the anonymous usage stats.
remoteReadStats := usagestats.NewRequestsMiddleware("querier_remote_read_requests")
instantQueryStats := usagestats.NewRequestsMiddleware("querier_instant_query_requests")
rangeQueryStats := usagestats.NewRequestsMiddleware("querier_range_query_requests")
exemplarsQueryStats := usagestats.NewRequestsMiddleware("querier_exemplars_query_requests")
labelsQueryStats := usagestats.NewRequestsMiddleware("querier_labels_query_requests")
seriesQueryStats := usagestats.NewRequestsMiddleware("querier_series_query_requests")
metadataQueryStats := usagestats.NewRequestsMiddleware("querier_metadata_query_requests")
cardinalityQueryStats := usagestats.NewRequestsMiddleware("querier_cardinality_query_requests")

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(querier.NewMetadataHandler(metadataSupplier))
router.Path(path.Join(prefix, "/api/v1/cardinality/label_names")).Methods("GET", "POST").Handler(querier.LabelNamesCardinalityHandler(distributor, limits))
router.Path(path.Join(prefix, "/api/v1/cardinality/label_values")).Methods("GET", "POST").Handler(querier.LabelValuesCardinalityHandler(distributor, limits))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(remoteReadStats.Wrap(querier.RemoteReadHandler(queryable, logger)))
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryStats.Wrap(promRouter))
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangeQueryStats.Wrap(promRouter))
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(exemplarsQueryStats.Wrap(promRouter))
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(labelsQueryStats.Wrap(promRouter))
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(labelsQueryStats.Wrap(promRouter))
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(seriesQueryStats.Wrap(promRouter))
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(metadataQueryStats.Wrap(querier.NewMetadataHandler(metadataSupplier)))
router.Path(path.Join(prefix, "/api/v1/cardinality/label_names")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelNamesCardinalityHandler(distributor, limits)))
router.Path(path.Join(prefix, "/api/v1/cardinality/label_values")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelValuesCardinalityHandler(distributor, limits)))

// Track execution time.
return stats.NewWallTimeMiddleware().Wrap(router)
Expand Down
60 changes: 60 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package ingester

import (
"context"
"expvar"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -55,6 +56,7 @@ import (
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
util_log "github.com/grafana/mimir/pkg/util/log"
Expand All @@ -74,6 +76,9 @@ const (
// Period at which to attempt purging metadata from memory.
metadataPurgePeriod = 5 * time.Minute

// How frequently update the usage statistics.
usageStatsUpdateInterval = usagestats.DefaultReportSendInterval / 10

// IngesterRingKey is the key under which we store the ingesters ring in the KVStore.
IngesterRingKey = "ring"

Expand All @@ -88,6 +93,12 @@ const (
sampleTooOld = "sample-too-old"
newValueForTimestamp = "new-value-for-timestamp"
sampleOutOfBounds = "sample-out-of-bounds"

replicationFactorStatsName = "ingester_replication_factor"
memorySeriesStatsName = "ingester_inmemory_series"
memoryTenantsStatsName = "ingester_inmemory_tenants"
appendedSamplesStatsName = "ingester_appended_samples"
appendedExemplarsStatsName = "ingester_appended_exemplars"
)

// BlocksUploader interface is used to have an easy way to mock it in tests.
Expand Down Expand Up @@ -225,6 +236,12 @@ type Ingester struct {
// Rate of pushed samples. Used to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64

// Anonymous usage statistics tracked by ingester.
memorySeriesStats *expvar.Int
memoryTenantsStats *expvar.Int
appendedSamplesStats *usagestats.Counter
appendedExemplarsStats *usagestats.Counter
}

func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
Expand All @@ -237,6 +254,9 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
return nil, errors.Wrap(err, "failed to create the bucket client")
}

// Track constant usage stats.
usagestats.GetInt(replicationFactorStatsName).Set(int64(cfg.IngesterRing.ReplicationFactor))

return &Ingester{
cfg: cfg,
limits: limits,
Expand All @@ -249,6 +269,11 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
forceCompactTrigger: make(chan requestWithUsersAndCallback),
shipTrigger: make(chan requestWithUsersAndCallback),
seriesHashCache: hashcache.NewSeriesHashCache(cfg.BlocksStorageConfig.TSDB.SeriesHashCacheMaxBytes),

memorySeriesStats: usagestats.GetAndResetInt(memorySeriesStatsName),
memoryTenantsStats: usagestats.GetAndResetInt(memoryTenantsStatsName),
appendedSamplesStats: usagestats.GetAndResetCounter(appendedSamplesStatsName),
appendedExemplarsStats: usagestats.GetAndResetCounter(appendedExemplarsStatsName),
}, nil
}

Expand Down Expand Up @@ -427,6 +452,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
metadataPurgeTicker := time.NewTicker(metadataPurgePeriod)
defer metadataPurgeTicker.Stop()

usageStatsUpdateTicker := time.NewTicker(util.DurationWithJitter(usageStatsUpdateInterval, 0.2))
pracucci marked this conversation as resolved.
Show resolved Hide resolved
defer usageStatsUpdateTicker.Stop()

for {
select {
case <-metadataPurgeTicker.C:
Expand All @@ -447,6 +475,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
case <-activeSeriesTickerChan:
i.updateActiveSeries(time.Now())

case <-usageStatsUpdateTicker.C:
i.updateUsageStats()

case <-ctx.Done():
return nil
case err := <-i.subservicesWatcher.Chan():
Expand Down Expand Up @@ -495,6 +526,30 @@ func (i *Ingester) updateActiveSeries(now time.Time) {
}
}

// updateUsageStats refreshes the anonymous usage statistics tracked by the
// ingester: the number of in-memory series and the number of tenants owning
// at least one in-memory series. It is expected to be called periodically.
func (i *Ingester) updateUsageStats() {
	var tenantsWithSeries, totalSeries int64

	for _, userID := range i.getTSDBUsers() {
		userDB := i.getTSDB(userID)
		if userDB == nil {
			continue
		}

		numSeries := userDB.Head().NumSeries()
		if numSeries == 0 {
			// Skip tenants without any in-memory series.
			continue
		}

		tenantsWithSeries++
		totalSeries += int64(numSeries)
	}

	// Export the computed values to the anonymous usage stats.
	i.memorySeriesStats.Set(totalSeries)
	i.memoryTenantsStats.Set(tenantsWithSeries)
}

// applyTSDBSettings goes through all tenants and applies
// * The current max-exemplars setting. If it changed, tsdb will resize the buffer; if it didn't change tsdb will return quickly.
// * The current out-of-order time window. If it changes from 0 to >0, then a new Write-Behind-Log gets created for that tenant.
Expand Down Expand Up @@ -797,6 +852,8 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
i.metrics.ingestedSamplesFail.WithLabelValues(userID).Add(float64(failedSamplesCount))
i.metrics.ingestedExemplars.Add(float64(succeededExemplarsCount))
i.metrics.ingestedExemplarsFail.Add(float64(failedExemplarsCount))
i.appendedSamplesStats.Inc(int64(succeededSamplesCount))
i.appendedExemplarsStats.Inc(int64(succeededExemplarsCount))

if sampleOutOfBoundsCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
Expand Down Expand Up @@ -1693,6 +1750,9 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
return err
}

// Update the usage statistics once all TSDBs have been opened.
i.updateUsageStats()

level.Info(i.logger).Log("msg", "successfully opened existing TSDBs")
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/chunkcompat"
util_math "github.com/grafana/mimir/pkg/util/math"
Expand Down Expand Up @@ -801,6 +802,27 @@ func TestIngester_Push(t *testing.T) {
// Check tracked Prometheus metrics
err = testutil.GatherAndCompare(registry, strings.NewReader(testData.expectedMetrics), mn...)
assert.NoError(t, err)

// Check anonymous usage stats.
expectedTenantsCount := 0
expectedSamplesCount := 0
expectedExemplarsCount := 0
if len(testData.expectedIngested) > 0 {
expectedTenantsCount = 1
}
for _, stream := range testData.expectedIngested {
expectedSamplesCount += len(stream.Values)
}
for _, series := range testData.expectedExemplarsIngested {
expectedExemplarsCount += len(series.Exemplars)
}

i.updateUsageStats()

assert.Equal(t, int64(len(testData.expectedIngested)), usagestats.GetInt(memorySeriesStatsName).Value())
assert.Equal(t, int64(expectedTenantsCount), usagestats.GetInt(memoryTenantsStatsName).Value())
assert.Equal(t, int64(expectedSamplesCount), usagestats.GetCounter(appendedSamplesStatsName).Total())
assert.Equal(t, int64(expectedExemplarsCount), usagestats.GetCounter(appendedExemplarsStatsName).Total())
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,9 @@ func (t *Mimir) initUsageStats() (services.Service, error) {
return nil, err
}

// Track anonymous usage statistics.
usagestats.GetString("blocks_storage_backend").Set(t.Cfg.BlocksStorage.Bucket.Backend)

t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger, prometheus.DefaultRegisterer)
return t.UsageStatsReporter, nil
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/usagestats/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-License-Identifier: AGPL-3.0-only

package usagestats

import (
"net/http"
)

// RequestsMiddleware is an HTTP middleware that tracks the number of served
// requests in an anonymous usage stats counter.
type RequestsMiddleware struct {
	counter *Counter // incremented once per request handled by the wrapped handler
}

// NewRequestsMiddleware makes a new RequestsMiddleware tracking requests
// under the usage stats counter registered with the given name.
func NewRequestsMiddleware(name string) *RequestsMiddleware {
	counter := GetCounter(name)
	return &RequestsMiddleware{counter: counter}
}

// Wrap implements middleware.Interface. The returned handler increments the
// requests counter and then delegates to next.
func (m *RequestsMiddleware) Wrap(next http.Handler) http.Handler {
	handler := func(w http.ResponseWriter, r *http.Request) {
		m.counter.Inc(1)
		next.ServeHTTP(w, r)
	}
	return http.HandlerFunc(handler)
}
28 changes: 27 additions & 1 deletion pkg/usagestats/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package usagestats
import (
"expvar"
"runtime"
"strings"
"time"

prom "github.com/prometheus/prometheus/web/api/v1"
Expand Down Expand Up @@ -78,11 +79,36 @@ func buildReport(seed ClusterSeed, reportAt time.Time, reportInterval time.Durat

// buildMetrics builds the metrics part of the report to be sent to the stats server.
func buildMetrics() map[string]interface{} {
return map[string]interface{}{
result := map[string]interface{}{
"memstats": buildMemstats(),
"num_cpu": runtime.NumCPU(),
"num_goroutine": runtime.NumGoroutine(),
}

expvar.Do(func(kv expvar.KeyValue) {
if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey {
return
}

var value interface{}
switch v := kv.Value.(type) {
case *expvar.Int:
value = v.Value()
case *expvar.String:
value = v.Value()
case *Counter:
v.updateRate()
value = v.Value()
v.reset()
default:
// Unsupported.
return
}

result[strings.TrimPrefix(kv.Key, statsPrefix)] = value
})

return result
}

func buildMemstats() interface{} {
Expand Down
10 changes: 10 additions & 0 deletions pkg/usagestats/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/util/version"
)
Expand All @@ -25,6 +26,10 @@ func TestBuildReport(t *testing.T) {
version.Branch = "dev-branch"
version.Revision = "dev-revision"

GetString("custom_string").Set("my_value")
GetInt("custom_int").Set(111)
GetCounter("custom_counter").Inc(222)

report := buildReport(seed, reportAt, reportInterval)
assert.Equal(t, "test", report.ClusterID)
assert.Equal(t, clusterCreatedAt, report.CreatedAt)
Expand All @@ -39,4 +44,9 @@ func TestBuildReport(t *testing.T) {
assert.Equal(t, "dev-branch", report.Version.Branch)
assert.Equal(t, "dev-revision", report.Version.Revision)
assert.Equal(t, runtime.Version(), report.Version.GoVersion)
assert.Equal(t, "my_value", report.Metrics["custom_string"])
assert.Equal(t, int64(111), report.Metrics["custom_int"])

require.IsType(t, map[string]interface{}{}, report.Metrics["custom_counter"])
assert.Equal(t, int64(222), report.Metrics["custom_counter"].(map[string]interface{})["total"])
}
6 changes: 4 additions & 2 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
)

const (
// DefaultReportSendInterval is the interval at which anonymous usage statistics are reported.
DefaultReportSendInterval = 4 * time.Hour

defaultReportCheckInterval = time.Minute
defaultReportSendInterval = 4 * time.Hour
defaultStatsServerURL = "https://stats.grafana.org/mimir-usage-report"
)

Expand Down Expand Up @@ -74,7 +76,7 @@ func NewReporter(bucketClient objstore.InstrumentedBucket, logger log.Logger, re
client: http.Client{Timeout: 5 * time.Second},
serverURL: defaultStatsServerURL,
reportCheckInterval: defaultReportCheckInterval,
reportSendInterval: defaultReportSendInterval,
reportSendInterval: DefaultReportSendInterval,
seedFileMinStability: clusterSeedFileMinStability,

requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand Down
Loading