Skip to content

Commit

Permalink
Periodically send anonymous usage stats report (#2662)
Browse files Browse the repository at this point in the history
* Periodically send anonymous usage stats report

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Aug 8, 2022
1 parent 823b74c commit dd7ff02
Show file tree
Hide file tree
Showing 8 changed files with 655 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,9 @@ func (t *Mimir) Run() error {
level.Warn(util_log.Logger).Log("msg", "skipped registration of custom process metrics collector", "err", err)
}

// Update the usage stats before we initialize modules.
usagestats.SetTarget(t.Cfg.Target.String())

for _, module := range t.Cfg.Target {
if !t.ModuleManager.IsUserVisibleModule(module) {
level.Warn(util_log.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func (t *Mimir) initUsageStats() (services.Service, error) {
return nil, err
}

t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger)
t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger, prometheus.DefaultRegisterer)
return t.UsageStatsReporter, nil
}

Expand Down
112 changes: 112 additions & 0 deletions pkg/usagestats/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-License-Identifier: AGPL-3.0-only

package usagestats

import (
"expvar"
"runtime"
"time"

prom "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/mimir/pkg/util/version"
)

// Report is the JSON object sent to the stats server.
// Field names on the wire are fixed by the json struct tags below.
type Report struct {
	// ClusterID is the unique Mimir cluster ID.
	ClusterID string `json:"clusterID"`

	// CreatedAt is when the cluster was created.
	CreatedAt time.Time `json:"createdAt"`

	// Interval is when the report was created (value is aligned across all replicas of the same Mimir cluster).
	Interval time.Time `json:"interval"`

	// IntervalPeriod is how frequently the report is sent, in seconds.
	IntervalPeriod float64 `json:"intervalPeriod"`

	// Target used to run Mimir.
	Target string `json:"target"`

	// Version holds information about the Mimir version.
	// The Prometheus API version struct is reused as the container.
	Version prom.PrometheusVersion `json:"version"`

	// Os is the operating system where Mimir is running.
	Os string `json:"os"`

	// Arch is the architecture where Mimir is running.
	Arch string `json:"arch"`

	// Edition is the Mimir edition ("oss" or "enterprise").
	Edition string `json:"edition"`

	// Metrics holds custom metrics tracked by Mimir. Can contain nested objects.
	Metrics map[string]interface{} `json:"metrics"`
}

// buildReport assembles the report to be sent to the stats server for the
// given cluster seed. reportAt is stored as the report's interval timestamp and
// reportInterval is stored in seconds. Target and edition are read from the
// expvar registry and are left empty when they are missing or not strings.
func buildReport(seed ClusterSeed, reportAt time.Time, reportInterval time.Duration) Report {
	// lookup returns the expvar string published under statsPrefix+key,
	// or "" when no such variable exists or it has a different type.
	lookup := func(key string) string {
		published, isString := expvar.Get(statsPrefix + key).(*expvar.String)
		if !isString {
			return ""
		}
		return published.Value()
	}

	runningTarget := lookup(targetKey)
	runningEdition := lookup(editionKey)

	return Report{
		ClusterID:      seed.UID,
		CreatedAt:      seed.CreatedAt,
		Interval:       reportAt,
		IntervalPeriod: reportInterval.Seconds(),
		Target:         runningTarget,
		Version:        buildVersion(),
		Os:             runtime.GOOS,
		Arch:           runtime.GOARCH,
		Edition:        runningEdition,
		Metrics:        buildMetrics(),
	}
}

// buildMetrics collects the metrics section of the report: a memory stats
// snapshot plus the current CPU and goroutine counts.
func buildMetrics() map[string]interface{} {
	metrics := make(map[string]interface{}, 3)
	metrics["memstats"] = buildMemstats()
	metrics["num_cpu"] = runtime.NumCPU()
	metrics["num_goroutine"] = runtime.NumGoroutine()
	return metrics
}

// buildMemstats takes a snapshot of the Go runtime memory statistics and
// returns the subset included in the report as a map[string]interface{}.
func buildMemstats() interface{} {
	var snapshot runtime.MemStats
	runtime.ReadMemStats(&snapshot)

	memstats := make(map[string]interface{}, 9)
	memstats["alloc"] = snapshot.Alloc
	memstats["total_alloc"] = snapshot.TotalAlloc
	memstats["sys"] = snapshot.Sys
	memstats["heap_alloc"] = snapshot.HeapAlloc
	memstats["heap_inuse"] = snapshot.HeapInuse
	memstats["stack_inuse"] = snapshot.StackInuse
	memstats["pause_total_ns"] = snapshot.PauseTotalNs
	memstats["num_gc"] = snapshot.NumGC
	memstats["gc_cpu_fraction"] = snapshot.GCCPUFraction
	return memstats
}

// buildVersion copies the build information exposed by the version package
// (version, revision, branch and Go version) into the report's version struct.
func buildVersion() prom.PrometheusVersion {
	var info prom.PrometheusVersion
	info.Version = version.Version
	info.Revision = version.Revision
	info.Branch = version.Branch
	info.GoVersion = version.GoVersion
	return info
}
42 changes: 42 additions & 0 deletions pkg/usagestats/report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: AGPL-3.0-only

package usagestats

import (
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/grafana/mimir/pkg/util/version"
)

// TestBuildReport checks that buildReport copies the seed, timing, target,
// platform, edition and version information into the generated report.
func TestBuildReport(t *testing.T) {
	var (
		clusterCreatedAt = time.Now().Add(-time.Hour)
		seed             = ClusterSeed{UID: "test", CreatedAt: clusterCreatedAt}
		reportAt         = time.Now()
		reportInterval   = time.Hour
	)

	// The version variables are package-level state shared with every other test
	// in this binary: restore them once this test is done.
	origVersion, origBranch, origRevision := version.Version, version.Branch, version.Revision
	t.Cleanup(func() {
		version.Version, version.Branch, version.Revision = origVersion, origBranch, origRevision
	})

	SetTarget("all")
	version.Version = "dev-version"
	version.Branch = "dev-branch"
	version.Revision = "dev-revision"

	report := buildReport(seed, reportAt, reportInterval)
	assert.Equal(t, "test", report.ClusterID)
	assert.Equal(t, clusterCreatedAt, report.CreatedAt)
	assert.Equal(t, reportAt, report.Interval)
	assert.Equal(t, reportInterval.Seconds(), report.IntervalPeriod)
	assert.Equal(t, "all", report.Target)
	assert.Equal(t, runtime.GOOS, report.Os)
	assert.Equal(t, runtime.GOARCH, report.Arch)
	assert.Equal(t, "oss", report.Edition)
	assert.NotNil(t, report.Metrics["memstats"])
	assert.Equal(t, "dev-version", report.Version.Version)
	assert.Equal(t, "dev-branch", report.Version.Branch)
	assert.Equal(t, "dev-revision", report.Version.Revision)
	assert.Equal(t, runtime.Version(), report.Version.GoVersion)
}
153 changes: 147 additions & 6 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,34 @@
package usagestats

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"math"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/grafana/dskit/services"

"github.com/grafana/mimir/pkg/storage/bucket"
)

const (
defaultReportCheckInterval = time.Minute
defaultReportSendInterval = 4 * time.Hour
defaultStatsServerURL = "https://stats.grafana.org/mimir-usage-report"
)

// Config holds the usage stats configuration.
type Config struct {
	// Enabled is exposed as the "enabled" YAML option and is categorized as experimental.
	// NOTE(review): presumably toggles the usage stats reporter; confirm against where Config is consumed.
	Enabled bool `yaml:"enabled" category:"experimental"`
}
Expand All @@ -28,24 +44,60 @@ type Reporter struct {
logger log.Logger
bucket objstore.InstrumentedBucket

// How frequently to check whether there's a report to send.
reportCheckInterval time.Duration

// How frequently to send a new report.
reportSendInterval time.Duration

// How long to wait for a cluster seed file creation before using it.
seedFileMinStability time.Duration

client http.Client
serverURL string

services.Service

// Metrics.
requestsTotal prometheus.Counter
requestsFailedTotal prometheus.Counter
requestsLatency prometheus.Histogram
}

func NewReporter(bucketClient objstore.InstrumentedBucket, logger log.Logger) *Reporter {
func NewReporter(bucketClient objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) *Reporter {
// The cluster seed file is stored in a prefix dedicated to Mimir internals.
bucketClient = bucket.NewPrefixedBucketClient(bucketClient, bucket.MimirInternalsPrefix)

r := &Reporter{
logger: logger,
bucket: bucketClient,
logger: logger,
bucket: bucketClient,
client: http.Client{Timeout: 5 * time.Second},
serverURL: defaultStatsServerURL,
reportCheckInterval: defaultReportCheckInterval,
reportSendInterval: defaultReportSendInterval,
seedFileMinStability: clusterSeedFileMinStability,

requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_usage_stats_report_sends_total",
Help: "The total number of attempted send requests.",
}),
requestsFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_usage_stats_report_sends_failed_total",
Help: "The total number of failed send requests.",
}),
requestsLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_usage_stats_report_sends_latency_seconds",
Help: "The latency of report send requests in seconds (include both succeeded and failed requests).",
Buckets: prometheus.DefBuckets,
}),
}
r.Service = services.NewBasicService(nil, r.running, nil)
return r
}

func (r *Reporter) running(ctx context.Context) error {
// Init or get the cluster seed.
seed, err := initSeedFile(ctx, r.bucket, clusterSeedFileMinStability, r.logger)
seed, err := initSeedFile(ctx, r.bucket, r.seedFileMinStability, r.logger)
if errors.Is(err, context.Canceled) {
return nil
}
Expand All @@ -55,7 +107,96 @@ func (r *Reporter) running(ctx context.Context) error {

level.Info(r.logger).Log("msg", "usage stats reporter initialized", "cluster_id", seed.UID)

// TODO Periodically send usage report.
// Find when to send the next report. We want all instances of the same Mimir cluster computing the same value.
nextReportAt := nextReport(r.reportSendInterval, seed.CreatedAt, time.Now())

ticker := time.NewTicker(r.reportCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if time.Now().Before(nextReportAt) {
continue
}

// If the send has been failing for a long time and the report is falling behind,
// we'll skip this one and try to send the next one.
if time.Since(nextReportAt) >= r.reportSendInterval {
nextReportAt = nextReport(r.reportSendInterval, seed.CreatedAt, time.Now())
level.Info(r.logger).Log("msg", "failed to send anonymous usage stats report for too long, skipping to next report", "next_report_at", nextReportAt.String())
continue
}

level.Debug(r.logger).Log("msg", "sending anonymous usage stats report")
if err := r.sendReport(ctx, buildReport(seed, nextReportAt, r.reportSendInterval)); err != nil {
level.Info(r.logger).Log("msg", "failed to send anonymous usage stats report", "err", err)

// We'll try at the next check interval.
continue
}

nextReportAt = nextReport(r.reportSendInterval, seed.CreatedAt, time.Now())
case <-ctx.Done():
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
return err
}
return nil
}
}
}

// sendReport sends the report to the stats server as a JSON-encoded HTTP POST.
//
// Every call increments the total requests counter and observes the request
// latency; the failed requests counter is incremented whenever a non-nil error
// is returned. An error is returned when the report can't be marshalled, the
// request can't be created or sent, the response can't be read, or the response
// status code is not 2xx.
func (r *Reporter) sendReport(ctx context.Context, report Report) (returnErr error) {
	// Limit the response body that we keep in memory and log.
	const maxBodyLength = 128

	startTime := time.Now()
	r.requestsTotal.Inc()

	// returnErr is a named result so that the deferred function can see the final outcome.
	defer func() {
		r.requestsLatency.Observe(time.Since(startTime).Seconds())
		if returnErr != nil {
			r.requestsFailedTotal.Inc()
		}
	}()

	data, err := json.Marshal(report)
	if err != nil {
		return errors.Wrap(err, "marshal the report")
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.serverURL, bytes.NewReader(data))
	if err != nil {
		return errors.Wrap(err, "create the request")
	}

	req.Header.Set("Content-Type", "application/json")
	resp, err := r.client.Do(req)
	if err != nil {
		return errors.Wrap(err, "send the report to the stats server")
	}

	// Ensure the body reader is always closed.
	defer resp.Body.Close()

	// Only buffer the part of the response we may log, so that an unexpectedly
	// large response can't make us allocate an arbitrary amount of memory.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxBodyLength))
	if err != nil {
		return errors.Wrap(err, "read the response from the stats server")
	}

	// Consume the rest of the response without buffering it, so that the
	// underlying connection can be reused.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return errors.Wrap(err, "read the response from the stats server")
	}

	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("received status code: %s and body: %q", resp.Status, string(body))
	}

	return nil
}

// nextReport computes the next report time based on the interval.
// The interval is based off the creation of the cluster seed to avoid all clusters reporting at the same time.
//
// The returned time is createdAt plus a whole number of intervals. For a positive
// interval it is the earliest such time which is not before now. A zero interval
// returns createdAt.
func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
	if interval == 0 {
		// Avoid a division by zero: with no interval every period boundary is createdAt.
		return createdAt
	}

	// Find the smallest x such that: createdAt + (x * interval) >= now
	periods := time.Duration(math.Ceil(float64(now.Sub(createdAt)) / float64(interval)))
	next := createdAt.Add(periods * interval)

	// float64 can't represent every nanosecond count (53-bit mantissa), so when now is
	// just past a period boundary the division may round down to a whole number and the
	// computed time falls right before now. The error is far smaller than one period,
	// so a single step is enough to correct it.
	if interval > 0 && next.Before(now) {
		next = next.Add(interval)
	}

	return next
}
Loading

0 comments on commit dd7ff02

Please sign in to comment.