Skip to content

Commit

Permalink
Revert "updated to the latest version of dskit" (#9953)
Browse files Browse the repository at this point in the history
Reverts #9920
  • Loading branch information
vlad-diachenko authored Jul 17, 2023
1 parent 23e59fa commit be1a9c4
Show file tree
Hide file tree
Showing 43 changed files with 202 additions and 1,149 deletions.
16 changes: 2 additions & 14 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3607,7 +3607,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t
# CLI flag: -<prefix>.grpc-client-rate-limit-burst
[rate_limit_burst: <int> | default = 0]
# Enable backoff and retry when we hit rate limits.
# Enable backoff and retry when we hit ratelimits.
# CLI flag: -<prefix>.backoff-on-ratelimits
[backoff_on_ratelimits: <boolean> | default = false]
Expand All @@ -3624,19 +3624,7 @@ backoff_config:
# CLI flag: -<prefix>.backoff-retries
[max_retries: <int> | default = 10]
# Initial stream window size. Values less than the default are not supported and
# are ignored. Setting this to a value other than the default disables the BDP
# estimator.
# CLI flag: -<prefix>.initial-stream-window-size
[initial_stream_window_size: <int> | default = 63KiB1023B]
# Initial connection window size. Values less than the default are not supported
# and are ignored. Setting this to a value other than the default disables the
# BDP estimator.
# CLI flag: -<prefix>.initial-connection-window-size
[initial_connection_window_size: <int> | default = 63KiB1023B]
# Enable TLS in the gRPC client. This flag needs to be enabled when any other
# Enable TLS in the GRPC client. This flag needs to be enabled when any other
# TLS flag is set. If set to false, insecure connection to gRPC server will be
# used.
# CLI flag: -<prefix>.tls-enabled
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1079,8 +1079,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6 h1:/19OPOCKP95g9hKLn1mN2dR/qBE4+oEY2F9XZ7G1xJM=
github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6/go.mod h1:M03k2fzuQ2n9TVE1xfVKTESibxsXdw0wYfWT3+9Owp4=
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e h1:ODjv+9dmklDS33O2B4zPgIDKdnji18o9ofD9qWA+mAs=
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e/go.mod h1:M03k2fzuQ2n9TVE1xfVKTESibxsXdw0wYfWT3+9Owp4=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
Expand Down
17 changes: 4 additions & 13 deletions pkg/distributor/instance_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/kv/consul"
"github.com/stretchr/testify/assert"

"github.com/grafana/dskit/ring"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
Expand Down Expand Up @@ -95,18 +91,13 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) {

sentry1 := map[string]int{}
sentry2 := map[string]int{}
store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var delegate ring.BasicLifecyclerDelegate
delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, 1 /* tokenCount */)
delegate = &sentryDelegate{BasicLifecyclerDelegate: delegate, calls: sentry1} // sentry delegate BEFORE newHealthyInstancesDelegate
delegate = newHealthyInstanceDelegate(counter, time.Second, delegate)
delegate = &sentryDelegate{BasicLifecyclerDelegate: delegate, calls: sentry2} // sentry delegate AFTER newHealthyInstancesDelegate

lifecycler, err := ring.NewBasicLifecycler(ring.BasicLifecyclerConfig{}, "test-ring", "test-ring-key", store, delegate, log.NewNopLogger(), nil)
require.NoError(t, err)

ingesters := ring.NewDesc()
ingesters.AddIngester("ingester-0", "ingester-0:3100", "zone-a", []uint32{1}, ring.ACTIVE, time.Now())

Expand All @@ -120,19 +111,19 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) {
require.Equal(t, 0, sentry1["Tokens"])
require.Equal(t, 0, sentry2["Tokens"])

delegate.OnRingInstanceHeartbeat(lifecycler, ingesters, nil)
delegate.OnRingInstanceHeartbeat(nil, ingesters, nil)
require.Equal(t, 1, sentry1["Heartbeat"])
require.Equal(t, 1, sentry2["Heartbeat"])

delegate.OnRingInstanceRegister(lifecycler, *ingesters, true, "ingester-0", ring.InstanceDesc{})
delegate.OnRingInstanceRegister(nil, *ingesters, true, "ingester-0", ring.InstanceDesc{})
require.Equal(t, 1, sentry1["Register"])
require.Equal(t, 1, sentry2["Register"])

delegate.OnRingInstanceStopping(lifecycler)
delegate.OnRingInstanceStopping(nil)
require.Equal(t, 1, sentry1["Stopping"])
require.Equal(t, 1, sentry2["Stopping"])

delegate.OnRingInstanceTokens(lifecycler, ring.Tokens{})
delegate.OnRingInstanceTokens(nil, ring.Tokens{})
require.Equal(t, 1, sentry1["Stopping"])
require.Equal(t, 1, sentry2["Stopping"])
}
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (c *Config) Validate() error {
if err := c.LimitsConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.Worker.Validate(); err != nil {
if err := c.Worker.Validate(util_log.Logger); err != nil {
return errors.Wrap(err, "invalid frontend-worker config")
}
if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
reg := prometheus.DefaultRegisterer

t.Cfg.MemberlistKV.MetricsNamespace = "loki"
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
analytics.JSONCodec,
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
}

func (cfg *Config) Validate() error {
func (cfg *Config) Validate(log log.Logger) error {
if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" {
return errors.New("frontend address and scheduler address are mutually exclusive, please use only one")
}
return cfg.GRPCClientConfig.Validate()
return cfg.GRPCClientConfig.Validate(log)
}

// Handler for HTTP requests wrapped in protobuf messages.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/base/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/grafana/dskit/ring"
)

func (r *Ruler) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (r *Ruler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the ruler instance in the ring we want to start from
// a clean situation, so whatever is the state we set it ACTIVE, while we keep existing
// tokens (if any).
Expand All @@ -14,7 +14,7 @@ func (r *Ruler) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.De
}

takenTokens := ringDesc.GetTokens()
newTokens := l.GetTokenGenerator().GenerateTokens(r.cfg.Ring.NumTokens-len(tokens), takenTokens)
newTokens := ring.GenerateTokens(r.cfg.Ring.NumTokens-len(tokens), takenTokens)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/base/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
}

func generateSortedTokens(numTokens int) ring.Tokens {
tokens := ring.NewRandomTokenGenerator().GenerateTokens(numTokens, nil)
tokens := ring.GenerateTokens(numTokens, nil)

// Ensure generated tokens are sorted.
sort.Slice(tokens, func(i, j int) bool {
return tokens[i] < tokens[j]
})

return tokens
return ring.Tokens(tokens)
}

// numTokens determines the number of tokens owned by the specified
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/grafana/dskit/ring"
)

func (rm *RingManager) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the scheduler instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
Expand All @@ -14,7 +14,7 @@ func (rm *RingManager) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc
}

takenTokens := ringDesc.GetTokens()
newTokens := l.GetTokenGenerator().GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/chunk/client/gcp/bigtable_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"cloud.google.com/go/bigtable"
"github.com/go-kit/log"
"github.com/grafana/dskit/grpcclient"
ot "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -56,8 +57,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f)
}

func (cfg *Config) Validate() error {
return cfg.GRPCClientConfig.Validate()
func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// storageClientColumnKey implements chunk.storageClient for GCP.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (cfg *Config) Validate() error {
if err := cfg.CassandraStorageConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid Cassandra Storage config")
}
if err := cfg.GCPStorageConfig.Validate(); err != nil {
if err := cfg.GCPStorageConfig.Validate(util_log.Logger); err != nil {
return errors.Wrap(err, "invalid GCP Storage Storage config")
}
if err := cfg.Swift.Validate(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/indexshipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime
return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
}

func (c *Compactor) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the compactor instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
Expand All @@ -767,7 +767,7 @@ func (c *Compactor) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc rin
}

takenTokens := ringDesc.GetTokens()
newTokens := l.GetTokenGenerator().GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/indexgateway/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/grafana/dskit/ring"
)

func (rm *RingManager) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the index gateway instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
Expand All @@ -14,7 +14,7 @@ func (rm *RingManager) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc
}

takenTokens := ringDesc.GetTokens()
newTokens := l.GetTokenGenerator().GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/grafana/dskit/dns/miekgdns/resolver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 11 additions & 20 deletions vendor/github.com/grafana/dskit/flagext/bytes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 5 additions & 25 deletions vendor/github.com/grafana/dskit/grpcclient/grpcclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit be1a9c4

Please sign in to comment.