Skip to content

Commit

Permalink
Fixes memberlist usage report (#5369)
Browse files Browse the repository at this point in the history
* Fixes memberlist usage report

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the linter and improve comment

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Feb 11, 2022
1 parent b19ce62 commit d44b689
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 8 deletions.
7 changes: 7 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (t *Loki) setupModuleManager() error {
// Add dependencies
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
UsageReport: {},
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
Expand Down Expand Up @@ -540,6 +541,12 @@ func (t *Loki) setupModuleManager() error {
t.deps = deps
t.ModuleManager = mm

if t.isModuleActive(Ingester) {
if err := mm.AddDependency(UsageReport, Ring); err != nil {
return err
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
usagestats.JSONCodec,
}

dnsProviderReg := prometheus.WrapRegistererWithPrefix(
Expand Down
54 changes: 49 additions & 5 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ const (

var (
reportCheckInterval = time.Minute
reportInterval = 1 * time.Hour
reportInterval = 4 * time.Hour

stabilityCheckInterval = 5 * time.Second
stabilityMinimunRequired = 6
)

type Config struct {
Expand Down Expand Up @@ -80,11 +83,12 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
return nil
}
// Try to become leader via the kv client
for backoff := backoff.New(ctx, backoff.Config{
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: time.Minute,
MaxRetries: 0,
}); ; backoff.Ongoing() {
})
for backoff.Ongoing() {
// create a new cluster seed
seed := ClusterSeed{
UID: uuid.NewString(),
Expand All @@ -94,16 +98,19 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
// The key is already set, so we don't need to do anything
if in != nil {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
seed = *kvSeed
return nil, false, nil
}
}
return seed, true, nil
return &seed, true, nil
}); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
// ensure stability of the cluster seed
stableSeed := ensureStableKey(ctx, kvClient, rep.logger)
seed = *stableSeed
// Fetch the remote cluster seed.
remoteSeed, err := rep.fetchSeed(ctx,
func(err error) bool {
Expand All @@ -115,14 +122,50 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
// we are the leader and we need to save the file.
if err := rep.writeSeedFile(ctx, seed); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
backoff.Wait()
continue
}
return &seed
}
backoff.Wait()
continue
}
return remoteSeed
}
return nil
}

// ensureStableKey ensures that the cluster seed is stable for at least 30seconds.
// This is required when using gossiping kv client like memberlist which will never have the same seed
// but will converge eventually.
func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger) *ClusterSeed {
var (
previous *ClusterSeed
stableCount int
)
for {
time.Sleep(stabilityCheckInterval)
value, err := kvClient.Get(ctx, seedKey)
if err != nil {
level.Debug(logger).Log("msg", "failed to get cluster seed key for stability check", "err", err)
continue
}
if seed, ok := value.(*ClusterSeed); ok && seed != nil {
if previous == nil {
previous = seed
continue
}
if previous.UID != seed.UID {
previous = seed
stableCount = 0
continue
}
stableCount++
if stableCount > stabilityMinimunRequired {
return seed
}
}
}
}

func (rep *Reporter) init(ctx context.Context) {
Expand Down Expand Up @@ -161,6 +204,7 @@ func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) b
readingErr = 0
}
if continueFn == nil || continueFn(err) {
backoff.Wait()
continue
}
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions pkg/usagestats/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
var metrics = storage.NewClientMetrics()

func Test_LeaderElection(t *testing.T) {
stabilityCheckInterval = 100 * time.Millisecond

result := make(chan *ClusterSeed, 10)
objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
Expand Down Expand Up @@ -71,6 +73,7 @@ func Test_ReportLoop(t *testing.T) {
// stub
reportCheckInterval = 100 * time.Millisecond
reportInterval = time.Second
stabilityCheckInterval = 100 * time.Millisecond

totalReport := 0
clusterIDs := []string{}
Expand All @@ -94,12 +97,11 @@ func Test_ReportLoop(t *testing.T) {
Store: "inmemory",
}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)

r.initLeader(context.Background())
ctx, cancel := context.WithCancel(context.Background())
r.initLeader(ctx)

go func() {
<-time.After(6 * time.Second)
<-time.After(6*time.Second + (stabilityCheckInterval * time.Duration(stabilityMinimunRequired+1)))
cancel()
}()
require.Equal(t, context.Canceled, r.running(ctx))
Expand Down
51 changes: 51 additions & 0 deletions pkg/usagestats/seed.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,69 @@
package usagestats

import (
"fmt"
"time"

jsoniter "github.com/json-iterator/go"
prom "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/dskit/kv/memberlist"
)

// ClusterSeed is the seed for the usage stats.
// A unique ID is generated for each cluster.
type ClusterSeed struct {
UID string `json:"UID"`
CreatedAt time.Time `json:"created_at"`
prom.PrometheusVersion `json:"version"`
}

// Merge implements the memberlist.Mergeable interface.
// It allow to merge the content of two different seeds.
func (c *ClusterSeed) Merge(mergeable memberlist.Mergeable, localCAS bool) (change memberlist.Mergeable, error error) {
if mergeable == nil {
return nil, nil
}
other, ok := mergeable.(*ClusterSeed)
if !ok {
return nil, fmt.Errorf("expected *usagestats.ClusterSeed, got %T", mergeable)
}
if other == nil {
return nil, nil
}
// if we already have (c) the oldest key, then should not request change.
if c.CreatedAt.Before(other.CreatedAt) {
return nil, nil
}
if c.CreatedAt == other.CreatedAt {
// if we have the exact same creation date but the key is different
// we take the smallest UID using string alphabetical comparison to ensure stability.
if c.UID > other.UID {
*c = *other
return other, nil
}
return nil, nil
}
// if our seed is not the oldest, then we should request a change.
*c = *other
return other, nil
}

// MergeContent tells if the content of the two seeds are the same.
func (c *ClusterSeed) MergeContent() []string {
return []string{c.UID}
}

// RemoveTombstones is not required for usagestats
func (c *ClusterSeed) RemoveTombstones(limit time.Time) (total, removed int) {
return 0, 0
}

func (c *ClusterSeed) Clone() memberlist.Mergeable {
new := *c
return &new
}

var JSONCodec = jsonCodec{}

type jsonCodec struct{}
Expand Down
103 changes: 103 additions & 0 deletions pkg/usagestats/seed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package usagestats

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/storage"
)

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func createMemberlist(t *testing.T, port, memberID int) *memberlist.KV {
t.Helper()
var cfg memberlist.KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: []string{"localhost"},
BindPort: 0,
}
cfg.GossipInterval = 100 * time.Millisecond
cfg.GossipNodes = 3
cfg.PushPullInterval = 5 * time.Second
cfg.NodeName = fmt.Sprintf("Member-%d", memberID)
cfg.Codecs = []codec.Codec{JSONCodec}

mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
if port != 0 {
_, err := mkv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", port)})
require.NoError(t, err, "%s failed to join the cluster: %v", memberID, err)
}
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.TODO(), mkv)
})
return mkv
}

func Test_Memberlist(t *testing.T) {
stabilityCheckInterval = time.Second

objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
Directory: t.TempDir(),
},
}, metrics)
require.NoError(t, err)
result := make(chan *ClusterSeed, 10)

// create a first memberlist to get a valid listening port.
initMKV := createMemberlist(t, 0, -1)

for i := 0; i < 10; i++ {
go func(i int) {
leader, err := NewReporter(Config{
Leader: true,
}, kv.Config{
Store: "memberlist",
StoreConfig: kv.StoreConfig{
MemberlistKV: func() (*memberlist.KV, error) {
return createMemberlist(t, initMKV.GetListeningPort(), i), nil
},
},
}, objectClient, log.NewLogfmtLogger(os.Stdout), nil)
require.NoError(t, err)
leader.init(context.Background())
result <- leader.cluster
}(i)
}

var UID []string
for i := 0; i < 10; i++ {
cluster := <-result
require.NotNil(t, cluster)
UID = append(UID, cluster.UID)
}
first := UID[0]
for _, uid := range UID {
require.Equal(t, first, uid)
}
}
2 changes: 2 additions & 0 deletions pkg/usagestats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Report struct {
ClusterID string `json:"clusterID"`
CreatedAt time.Time `json:"createdAt"`
Interval time.Time `json:"interval"`
IntervalPeriod float64 `json:"intervalPeriod"`
Target string `json:"target"`
prom.PrometheusVersion `json:"version"`
Os string `json:"os"`
Expand Down Expand Up @@ -92,6 +93,7 @@ func buildReport(seed *ClusterSeed, interval time.Time) Report {
PrometheusVersion: build.GetVersion(),
CreatedAt: seed.CreatedAt,
Interval: interval,
IntervalPeriod: reportInterval.Seconds(),
Os: runtime.GOOS,
Arch: runtime.GOARCH,
Target: targetName,
Expand Down

0 comments on commit d44b689

Please sign in to comment.