Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes memberlist usage report #5369

Merged
merged 2 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (t *Loki) setupModuleManager() error {
// Add dependencies
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
UsageReport: {},
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
Expand Down Expand Up @@ -540,6 +541,12 @@ func (t *Loki) setupModuleManager() error {
t.deps = deps
t.ModuleManager = mm

if t.isModuleActive(Ingester) {
if err := mm.AddDependency(UsageReport, Ring); err != nil {
return err
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
usagestats.JSONCodec,
}

dnsProviderReg := prometheus.WrapRegistererWithPrefix(
Expand Down
54 changes: 49 additions & 5 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ const (

var (
reportCheckInterval = time.Minute
reportInterval = 1 * time.Hour
reportInterval = 4 * time.Hour

stabilityCheckInterval = 5 * time.Second
stabilityMinimunRequired = 6
)

type Config struct {
Expand Down Expand Up @@ -80,11 +83,12 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
return nil
}
// Try to become leader via the kv client
for backoff := backoff.New(ctx, backoff.Config{
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: time.Minute,
MaxRetries: 0,
}); ; backoff.Ongoing() {
})
for backoff.Ongoing() {
// create a new cluster seed
seed := ClusterSeed{
UID: uuid.NewString(),
Expand All @@ -94,16 +98,19 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
// The key is already set, so we don't need to do anything
if in != nil {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
seed = *kvSeed
return nil, false, nil
}
}
return seed, true, nil
return &seed, true, nil
}); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
// ensure stability of the cluster seed
stableSeed := ensureStableKey(ctx, kvClient, rep.logger)
seed = *stableSeed
// Fetch the remote cluster seed.
remoteSeed, err := rep.fetchSeed(ctx,
func(err error) bool {
Expand All @@ -115,14 +122,50 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
// we are the leader and we need to save the file.
if err := rep.writeSeedFile(ctx, seed); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
backoff.Wait()
continue
}
return &seed
}
backoff.Wait()
continue
}
return remoteSeed
}
return nil
}

// ensureStableKey ensures that the cluster seed is stable for at least 30seconds.
// This is required when using gossiping kv client like memberlist which will never have the same seed
// but will converge eventually.
func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger) *ClusterSeed {
var (
previous *ClusterSeed
stableCount int
)
for {
time.Sleep(stabilityCheckInterval)
value, err := kvClient.Get(ctx, seedKey)
if err != nil {
level.Debug(logger).Log("msg", "failed to get cluster seed key for stability check", "err", err)
continue
}
if seed, ok := value.(*ClusterSeed); ok && seed != nil {
if previous == nil {
previous = seed
continue
}
if previous.UID != seed.UID {
previous = seed
stableCount = 0
continue
}
stableCount++
if stableCount > stabilityMinimunRequired {
return seed
}
}
}
}

func (rep *Reporter) init(ctx context.Context) {
Expand Down Expand Up @@ -161,6 +204,7 @@ func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) b
readingErr = 0
}
if continueFn == nil || continueFn(err) {
backoff.Wait()
continue
}
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions pkg/usagestats/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
var metrics = storage.NewClientMetrics()

func Test_LeaderElection(t *testing.T) {
stabilityCheckInterval = 100 * time.Millisecond

result := make(chan *ClusterSeed, 10)
objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
Expand Down Expand Up @@ -71,6 +73,7 @@ func Test_ReportLoop(t *testing.T) {
// stub
reportCheckInterval = 100 * time.Millisecond
reportInterval = time.Second
stabilityCheckInterval = 100 * time.Millisecond

totalReport := 0
clusterIDs := []string{}
Expand All @@ -94,12 +97,11 @@ func Test_ReportLoop(t *testing.T) {
Store: "inmemory",
}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)

r.initLeader(context.Background())
ctx, cancel := context.WithCancel(context.Background())
r.initLeader(ctx)

go func() {
<-time.After(6 * time.Second)
<-time.After(6*time.Second + (stabilityCheckInterval * time.Duration(stabilityMinimunRequired+1)))
cancel()
}()
require.Equal(t, context.Canceled, r.running(ctx))
Expand Down
51 changes: 51 additions & 0 deletions pkg/usagestats/seed.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,69 @@
package usagestats

import (
"fmt"
"time"

jsoniter "github.com/json-iterator/go"
prom "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/dskit/kv/memberlist"
)

// ClusterSeed is the seed for the usage stats.
// A unique ID is generated for each cluster.
type ClusterSeed struct {
UID string `json:"UID"`
CreatedAt time.Time `json:"created_at"`
prom.PrometheusVersion `json:"version"`
}

// Merge implements the memberlist.Mergeable interface.
// It allow to merge the content of two different seeds.
func (c *ClusterSeed) Merge(mergeable memberlist.Mergeable, localCAS bool) (change memberlist.Mergeable, error error) {
if mergeable == nil {
return nil, nil
}
other, ok := mergeable.(*ClusterSeed)
if !ok {
return nil, fmt.Errorf("expected *usagestats.ClusterSeed, got %T", mergeable)
}
if other == nil {
return nil, nil
}
// if we already have (c) the oldest key, then should not request change.
if c.CreatedAt.Before(other.CreatedAt) {
return nil, nil
}
if c.CreatedAt == other.CreatedAt {
// if we have the exact same creation date but the key is different
// we take the smallest UID using string alphabetical comparison to ensure stability.
if c.UID > other.UID {
*c = *other
return other, nil
}
return nil, nil
}
// if our seed is not the oldest, then we should request a change.
*c = *other
return other, nil
}

// MergeContent tells if the content of the two seeds are the same.
func (c *ClusterSeed) MergeContent() []string {
return []string{c.UID}
}

// RemoveTombstones is not required for usagestats
func (c *ClusterSeed) RemoveTombstones(limit time.Time) (total, removed int) {
return 0, 0
}

func (c *ClusterSeed) Clone() memberlist.Mergeable {
new := *c
return &new
}

var JSONCodec = jsonCodec{}

type jsonCodec struct{}
Expand Down
103 changes: 103 additions & 0 deletions pkg/usagestats/seed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package usagestats

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/storage"
)

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func createMemberlist(t *testing.T, port, memberID int) *memberlist.KV {
t.Helper()
var cfg memberlist.KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: []string{"localhost"},
BindPort: 0,
}
cfg.GossipInterval = 100 * time.Millisecond
cfg.GossipNodes = 3
cfg.PushPullInterval = 5 * time.Second
cfg.NodeName = fmt.Sprintf("Member-%d", memberID)
cfg.Codecs = []codec.Codec{JSONCodec}

mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
if port != 0 {
_, err := mkv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", port)})
require.NoError(t, err, "%s failed to join the cluster: %v", memberID, err)
}
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.TODO(), mkv)
})
return mkv
}

func Test_Memberlist(t *testing.T) {
stabilityCheckInterval = time.Second

objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
Directory: t.TempDir(),
},
}, metrics)
require.NoError(t, err)
result := make(chan *ClusterSeed, 10)

// create a first memberlist to get a valid listening port.
initMKV := createMemberlist(t, 0, -1)

for i := 0; i < 10; i++ {
go func(i int) {
leader, err := NewReporter(Config{
Leader: true,
}, kv.Config{
Store: "memberlist",
StoreConfig: kv.StoreConfig{
MemberlistKV: func() (*memberlist.KV, error) {
return createMemberlist(t, initMKV.GetListeningPort(), i), nil
},
},
}, objectClient, log.NewLogfmtLogger(os.Stdout), nil)
require.NoError(t, err)
leader.init(context.Background())
result <- leader.cluster
}(i)
}

var UID []string
for i := 0; i < 10; i++ {
cluster := <-result
require.NotNil(t, cluster)
UID = append(UID, cluster.UID)
}
first := UID[0]
for _, uid := range UID {
require.Equal(t, first, uid)
}
}
2 changes: 2 additions & 0 deletions pkg/usagestats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Report struct {
ClusterID string `json:"clusterID"`
CreatedAt time.Time `json:"createdAt"`
Interval time.Time `json:"interval"`
IntervalPeriod float64 `json:"intervalPeriod"`
Target string `json:"target"`
prom.PrometheusVersion `json:"version"`
Os string `json:"os"`
Expand Down Expand Up @@ -92,6 +93,7 @@ func buildReport(seed *ClusterSeed, interval time.Time) Report {
PrometheusVersion: build.GetVersion(),
CreatedAt: seed.CreatedAt,
Interval: interval,
IntervalPeriod: reportInterval.Seconds(),
Os: runtime.GOOS,
Arch: runtime.GOARCH,
Target: targetName,
Expand Down