Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically send anonymous usage stats report #2662

Merged
merged 3 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,9 @@ func (t *Mimir) Run() error {
level.Warn(util_log.Logger).Log("msg", "skipped registration of custom process metrics collector", "err", err)
}

// Update the usage stats before we initialize modules.
usagestats.SetTarget(t.Cfg.Target.String())

for _, module := range t.Cfg.Target {
if !t.ModuleManager.IsUserVisibleModule(module) {
level.Warn(util_log.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func (t *Mimir) initUsageStats() (services.Service, error) {
return nil, err
}

t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger)
t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger, prometheus.DefaultRegisterer)
return t.UsageStatsReporter, nil
}

Expand Down
112 changes: 112 additions & 0 deletions pkg/usagestats/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-License-Identifier: AGPL-3.0-only

package usagestats

import (
"expvar"
"runtime"
"time"

prom "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/mimir/pkg/util/version"
)

// Report is the JSON object sent to the stats server
type Report struct {
// ClusterID is the unique Mimir cluster ID.
ClusterID string `json:"clusterID"`

// CreatedAt is when the cluster was created.
CreatedAt time.Time `json:"createdAt"`

// Interval is when the report was created (value is aligned across all replicas of the same Mimir cluster).
Interval time.Time `json:"interval"`
colega marked this conversation as resolved.
Show resolved Hide resolved

// IntervalPeriod is how frequently the report is sent, in seconds.
IntervalPeriod float64 `json:"intervalPeriod"`

// Target used to run Mimir.
Target string `json:"target"`

// Version holds information about the Mimir version.
Version prom.PrometheusVersion `json:"version"`

// Os is the operating system where Mimir is running.
Os string `json:"os"`

// Arch is the architecture where Mimir is running.
Arch string `json:"arch"`

// Edition is the Mimir edition ("oss" or "enterprise").
Edition string `json:"edition"`

// Metrics holds custom metrics tracked by Mimir. Can contain nested objects.
Metrics map[string]interface{} `json:"metrics"`
}

// buildReport builds the report to be sent to the stats server.
func buildReport(seed ClusterSeed, reportAt time.Time, reportInterval time.Duration) Report {
var (
targetName string
editionName string
)
if target := expvar.Get(statsPrefix + targetKey); target != nil {
if target, ok := target.(*expvar.String); ok {
targetName = target.Value()
}
}
if edition := expvar.Get(statsPrefix + editionKey); edition != nil {
if edition, ok := edition.(*expvar.String); ok {
editionName = edition.Value()
}
}
colega marked this conversation as resolved.
Show resolved Hide resolved

return Report{
ClusterID: seed.UID,
CreatedAt: seed.CreatedAt,
Version: buildVersion(),
Interval: reportAt,
IntervalPeriod: reportInterval.Seconds(),
Os: runtime.GOOS,
Arch: runtime.GOARCH,
Target: targetName,
Edition: editionName,
Metrics: buildMetrics(),
}
}

// buildMetrics assembles the "metrics" section of the report: the memory statistics returned by
// buildMemstats, the number of CPUs and the number of goroutines reported by the Go runtime.
func buildMetrics() map[string]interface{} {
	metrics := make(map[string]interface{}, 3)
	metrics["memstats"] = buildMemstats()
	metrics["num_cpu"] = runtime.NumCPU()
	metrics["num_goroutine"] = runtime.NumGoroutine()
	return metrics
}

// buildMemstats reads the Go runtime memory statistics and returns a selection of them as a
// map[string]interface{} keyed by snake_case names.
func buildMemstats() interface{} {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)

	selected := make(map[string]interface{}, 9)
	selected["alloc"] = ms.Alloc
	selected["total_alloc"] = ms.TotalAlloc
	selected["sys"] = ms.Sys
	selected["heap_alloc"] = ms.HeapAlloc
	selected["heap_inuse"] = ms.HeapInuse
	selected["stack_inuse"] = ms.StackInuse
	selected["pause_total_ns"] = ms.PauseTotalNs
	selected["num_gc"] = ms.NumGC
	selected["gc_cpu_fraction"] = ms.GCCPUFraction
	return selected
}

// buildVersion returns the version section of the report, filled with the version, revision,
// branch and Go version exposed by the version package.
func buildVersion() prom.PrometheusVersion {
	var info prom.PrometheusVersion
	info.Version = version.Version
	info.Revision = version.Revision
	info.Branch = version.Branch
	info.GoVersion = version.GoVersion
	return info
}
42 changes: 42 additions & 0 deletions pkg/usagestats/report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: AGPL-3.0-only

package usagestats

import (
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/grafana/mimir/pkg/util/version"
)

// TestBuildReport checks that buildReport fills every field of the report from the seed, the
// report time/interval, the configured target and the version package variables.
func TestBuildReport(t *testing.T) {
	var (
		clusterCreatedAt = time.Now().Add(-time.Hour)
		seed             = ClusterSeed{UID: "test", CreatedAt: clusterCreatedAt}
		reportAt         = time.Now()
		reportInterval   = time.Hour
	)

	// The version is read from package-level variables: restore them when the test is done,
	// so that the overrides below don't leak into other tests running in the same process.
	origVersion, origBranch, origRevision := version.Version, version.Branch, version.Revision
	t.Cleanup(func() {
		version.Version, version.Branch, version.Revision = origVersion, origBranch, origRevision
	})

	SetTarget("all")
	version.Version = "dev-version"
	version.Branch = "dev-branch"
	version.Revision = "dev-revision"

	report := buildReport(seed, reportAt, reportInterval)
	assert.Equal(t, "test", report.ClusterID)
	assert.Equal(t, clusterCreatedAt, report.CreatedAt)
	assert.Equal(t, reportAt, report.Interval)
	assert.Equal(t, reportInterval.Seconds(), report.IntervalPeriod)
	assert.Equal(t, "all", report.Target)
	assert.Equal(t, runtime.GOOS, report.Os)
	assert.Equal(t, runtime.GOARCH, report.Arch)
	assert.Equal(t, "oss", report.Edition)
	assert.NotNil(t, report.Metrics["memstats"])
	assert.Equal(t, "dev-version", report.Version.Version)
	assert.Equal(t, "dev-branch", report.Version.Branch)
	assert.Equal(t, "dev-revision", report.Version.Revision)
	assert.Equal(t, runtime.Version(), report.Version.GoVersion)
}
153 changes: 147 additions & 6 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,34 @@
package usagestats

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"math"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/grafana/dskit/services"

"github.com/grafana/mimir/pkg/storage/bucket"
)

const (
	// defaultReportCheckInterval is the value NewReporter assigns to Reporter.reportCheckInterval.
	defaultReportCheckInterval = time.Minute

	// defaultReportSendInterval is the value NewReporter assigns to Reporter.reportSendInterval.
	defaultReportSendInterval = 4 * time.Hour

	// defaultStatsServerURL is the value NewReporter assigns to Reporter.serverURL.
	//
	// NOTE(review): per the change author (pracucci, Aug 8, 2022), the server-side API endpoint
	// did not exist yet when this was written; the support in Mimir was built following the
	// Loki specs. TODO confirm the endpoint is live.
	defaultStatsServerURL = "https://stats.grafana.org/mimir-usage-report"
)

// Config holds the usage stats configuration.
type Config struct {
// Enabled is loaded from the "enabled" YAML key and is tagged as an experimental option.
// NOTE(review): presumably toggles the anonymous usage stats reporting; confirm where Config is consumed.
Enabled bool `yaml:"enabled" category:"experimental"`
}
Expand All @@ -28,24 +44,60 @@ type Reporter struct {
logger log.Logger
bucket objstore.InstrumentedBucket

// How frequently to check whether there's a report to send.
reportCheckInterval time.Duration

// How frequently to send a new report.
reportSendInterval time.Duration

// How long to wait for a cluster seed file creation before using it.
seedFileMinStability time.Duration

client http.Client
serverURL string

services.Service

// Metrics.
requestsTotal prometheus.Counter
requestsFailedTotal prometheus.Counter
requestsLatency prometheus.Histogram
}

func NewReporter(bucketClient objstore.InstrumentedBucket, logger log.Logger) *Reporter {
func NewReporter(bucketClient objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) *Reporter {
// The cluster seed file is stored in a prefix dedicated to Mimir internals.
bucketClient = bucket.NewPrefixedBucketClient(bucketClient, bucket.MimirInternalsPrefix)

r := &Reporter{
logger: logger,
bucket: bucketClient,
logger: logger,
bucket: bucketClient,
client: http.Client{Timeout: 5 * time.Second},
serverURL: defaultStatsServerURL,
reportCheckInterval: defaultReportCheckInterval,
reportSendInterval: defaultReportSendInterval,
seedFileMinStability: clusterSeedFileMinStability,

requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_usage_stats_report_sends_total",
Help: "The total number of attempted send requests.",
}),
requestsFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_usage_stats_report_sends_failed_total",
Help: "The total number of failed send requests.",
}),
requestsLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_usage_stats_report_sends_latency_seconds",
Help: "The latency of report send requests in seconds (include both succeeded and failed requests).",
Buckets: prometheus.DefBuckets,
}),
}
r.Service = services.NewBasicService(nil, r.running, nil)
return r
}

func (r *Reporter) running(ctx context.Context) error {
// Init or get the cluster seed.
seed, err := initSeedFile(ctx, r.bucket, clusterSeedFileMinStability, r.logger)
seed, err := initSeedFile(ctx, r.bucket, r.seedFileMinStability, r.logger)
if errors.Is(err, context.Canceled) {
return nil
}
Expand All @@ -55,7 +107,96 @@ func (r *Reporter) running(ctx context.Context) error {

level.Info(r.logger).Log("msg", "usage stats reporter initialized", "cluster_id", seed.UID)

// TODO Periodically send usage report.
// Find when to send the next report. We want all instances of the same Mimir cluster computing the same value.
nextReportAt := nextReport(r.reportSendInterval, seed.CreatedAt, time.Now())

ticker := time.NewTicker(r.reportCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if time.Now().Before(nextReportAt) {
continue
}

// If sending has been failing for a long time and the report is falling behind,
// we'll skip this one and try to send the next one.
if time.Since(nextReportAt) >= r.reportSendInterval {
nextReportAt = nextReport(r.reportSendInterval, seed.CreatedAt, time.Now())
level.Info(r.logger).Log("msg", "failed to send anonymous usage stats report for too long, skipping to next report", "next_report_at", nextReportAt.String())
continue
}

level.Debug(r.logger).Log("msg", "sending anonymous usage stats report")
if err := r.sendReport(ctx, buildReport(seed, nextReportAt, r.reportSendInterval)); err != nil {
level.Info(r.logger).Log("msg", "failed to send anonymous usage stats report", "err", err)

// We'll try at the next check interval.
continue
}
colega marked this conversation as resolved.
Show resolved Hide resolved

nextReportAt = nextReport(r.reportSendInterval, seed.CreatedAt, time.Now())
case <-ctx.Done():
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offtopic: I feel like this should be done in the BasicService, not in all the service implementers.

return err
}
return nil
}
}
}

// sendReport marshals the report to JSON and sends it to the stats server with an HTTP POST.
//
// Every call increments the requests counter and observes its duration in the latency
// histogram; a call returning an error also increments the failed requests counter.
// A response with a non-2xx status code is returned as an error carrying the status and at
// most the first 128 bytes of the response body.
func (r *Reporter) sendReport(ctx context.Context, report Report) (returnErr error) {
	startTime := time.Now()
	r.requestsTotal.Inc()

	// returnErr is a named result so that the deferred function sees the final outcome.
	defer func() {
		r.requestsLatency.Observe(time.Since(startTime).Seconds())
		if returnErr != nil {
			r.requestsFailedTotal.Inc()
		}
	}()

	data, err := json.Marshal(report)
	if err != nil {
		return errors.Wrap(err, "marshal the report")
	}

	// Bind the context when building the request, instead of shallow-copying it afterwards.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.serverURL, bytes.NewReader(data))
	if err != nil {
		return errors.Wrap(err, "create the request")
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := r.client.Do(req)
	if err != nil {
		return errors.Wrap(err, "send the report to the stats server")
	}

	// Ensure the body reader is always closed.
	defer resp.Body.Close()

	// Consume all the response.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return errors.Wrap(err, "read the response from the stats server")
	}

	if resp.StatusCode/100 != 2 {
		// Limit the body response that we log.
		const maxBodyLength = 128
		if len(body) > maxBodyLength {
			body = body[:maxBodyLength]
		}
		return fmt.Errorf("received status code: %s and body: %q", resp.Status, string(body))
	}

	return nil
}

// nextReport computes the next report time based on the interval.
// The interval is based off the creation of the cluster seed to avoid all clusters reporting at the same time.
//
// The returned time is the first createdAt + (x * interval), with x an integer, which is not
// before now. When now falls exactly on such a time, now itself is returned.
func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
	// Estimate the smallest x such that createdAt + (x * interval) >= now.
	periods := time.Duration(math.Ceil(float64(now.Sub(createdAt)) / float64(interval)))
	next := createdAt.Add(periods * interval)

	// float64 can't represent every nanosecond count above 2^53 (about 104 days), so for a
	// long-lived cluster the estimate may be off by one period around an interval boundary.
	// Correct it with exact time arithmetic. A non-positive interval defines no schedule, so
	// the estimate is left untouched in that case.
	if interval > 0 {
		switch {
		case next.Before(now):
			next = next.Add(interval)
		case !next.Add(-interval).Before(now):
			next = next.Add(-interval)
		}
	}

	return next
}
Loading