Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rhythm] Changes to simplify operations #4389

Merged
merged 5 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (t *App) initGenerator() (services.Service, error) {

if t.cfg.Target == SingleBinary && len(t.cfg.Generator.AssignedPartitions) == 0 {
// In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups.
t.cfg.Generator.AssignedPartitions = append(t.cfg.Generator.AssignedPartitions, 0)
t.cfg.Generator.AssignedPartitions = map[string][]int32{t.cfg.Generator.InstanceID: {0}}
}

genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, t.partitionRing, t.store, log.Logger)
Expand Down Expand Up @@ -348,7 +348,7 @@ func (t *App) initBlockBuilder() (services.Service, error) {

if t.cfg.Target == SingleBinary && len(t.cfg.BlockBuilder.AssignedPartitions) == 0 {
// In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups.
t.cfg.BlockBuilder.AssignedPartitions = append(t.cfg.BlockBuilder.AssignedPartitions, 0)
t.cfg.BlockBuilder.AssignedPartitions = map[string][]int32{t.cfg.BlockBuilder.InstanceID: {0}}
}

t.blockBuilder = blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store)
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSecti
func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
activePartitionsCount := b.partitionRing.PartitionRing().ActivePartitionsCount()
assignedActivePartitions := make([]int32, 0, activePartitionsCount)
for _, partition := range b.cfg.AssignedPartitions {
for _, partition := range b.cfg.AssignedPartitions[b.cfg.InstanceID] {
if partition > int32(activePartitionsCount) {
break
}
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func blockbuilderConfig(_ *testing.T, address string) Config {
cfg.IngestStorageConfig.Kafka.Topic = testTopic
cfg.IngestStorageConfig.Kafka.ConsumerGroup = "test-consumer-group"

cfg.AssignedPartitions = []int32{0}
cfg.AssignedPartitions = map[string][]int32{cfg.InstanceID: {0}}
cfg.LookbackOnNoCommit = 15 * time.Second
cfg.ConsumeCycleDuration = 5 * time.Second

Expand Down
30 changes: 23 additions & 7 deletions modules/blockbuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"encoding/json"
"flag"
"fmt"
"os"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)
Expand All @@ -29,8 +32,9 @@ func (c *BlockConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagS
}

type Config struct {
AssignedPartitions []int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."`
ConsumeCycleDuration time.Duration `yaml:"consume_cycle_duration" doc:"Interval between consumption cycles."`
InstanceID string `yaml:"instance_id" doc:"Instance id."`
AssignedPartitions map[string][]int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."`
ConsumeCycleDuration time.Duration `yaml:"consume_cycle_duration" doc:"Interval between consumption cycles."`

LookbackOnNoCommit time.Duration `yaml:"lookback_on_no_commit" category:"advanced"`

Expand All @@ -49,6 +53,13 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
}

func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
hostname, err := os.Hostname()
if err != nil {
level.Error(log.Logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}

f.StringVar(&c.InstanceID, "block-builder.instance-id", hostname, "Instance id.")
f.Var(newPartitionAssignmentVar(&c.AssignedPartitions), prefix+".assigned-partitions", "List of partitions assigned to this block builder.")
f.DurationVar(&c.ConsumeCycleDuration, prefix+".consume-cycle-duration", 5*time.Minute, "Interval between consumption cycles.")
// TODO - Review default
Expand All @@ -58,19 +69,24 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
}

type partitionAssignmentVar struct {
p *[]int32
p *map[string][]int32
}

func newPartitionAssignmentVar(p *[]int32) *partitionAssignmentVar {
func newPartitionAssignmentVar(p *map[string][]int32) *partitionAssignmentVar {
return &partitionAssignmentVar{p}
}

func (p *partitionAssignmentVar) Set(s string) error {
var partitions []int32
if err := json.Unmarshal([]byte(s), &partitions); err != nil {
if s == "" {
return nil
}

val := make(map[string][]int32)
if err := json.Unmarshal([]byte(s), &val); err != nil {
return err
}
*p.p = partitions
*p.p = val

return nil
}

Expand Down
41 changes: 39 additions & 2 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package generator

import (
"encoding/json"
"flag"
"fmt"
"os"
"time"

"github.com/grafana/tempo/modules/generator/processor/localblocks"
Expand Down Expand Up @@ -39,8 +41,9 @@ type Config struct {
OverrideRingKey string `yaml:"override_ring_key"`

// This config is dynamically injected because defined outside the generator config.
Ingest ingest.Config `yaml:"-"`
AssignedPartitions []int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."`
Ingest ingest.Config `yaml:"-"`
AssignedPartitions map[string][]int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."`
InstanceID string `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
}

// RegisterFlagsAndApplyDefaults registers the flags.
Expand All @@ -56,6 +59,14 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.MetricsIngestionSlack = 30 * time.Second
cfg.QueryTimeout = 30 * time.Second
cfg.OverrideRingKey = generatorRingKey

hostname, err := os.Hostname()
if err != nil {
fmt.Printf("failed to get hostname: %v", err)
os.Exit(1)
}
f.StringVar(&cfg.InstanceID, prefix+".instance-id", hostname, "Instance id.")
f.Var(newPartitionAssignmentVar(&cfg.AssignedPartitions), prefix+".assigned-partitions", "List of partitions assigned to this metrics generator.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -158,3 +169,29 @@ func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userI

return copyCfg, nil
}

type partitionAssignmentVar struct {
p *map[string][]int32
}

func newPartitionAssignmentVar(p *map[string][]int32) *partitionAssignmentVar {
return &partitionAssignmentVar{p}
}

func (p *partitionAssignmentVar) Set(s string) error {
if s == "" {
return nil
}

val := make(map[string][]int32)
if err := json.Unmarshal([]byte(s), &val); err != nil {
return err
}
*p.p = val

return nil
}

func (p *partitionAssignmentVar) String() string {
return fmt.Sprintf("%v", *p.p)
}
2 changes: 1 addition & 1 deletion modules/generator/generator_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin
func (g *Generator) getAssignedActivePartitions() []int32 {
activePartitionsCount := g.partitionRing.PartitionRing().ActivePartitionsCount()
assignedActivePartitions := make([]int32, 0, activePartitionsCount)
for _, partition := range g.cfg.AssignedPartitions {
for _, partition := range g.cfg.AssignedPartitions[g.cfg.InstanceID] {
if partition > int32(activePartitionsCount) {
break
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)
Expand Down Expand Up @@ -44,6 +45,7 @@ var (
ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag")
ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
ErrInconsistentSASLCredentials = errors.New("the SASL username and password must be both configured to enable SASL authentication")
)

type Config struct {
Expand Down Expand Up @@ -71,6 +73,9 @@ type KafkaConfig struct {
DialTimeout time.Duration `yaml:"dial_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout"`

SASLUsername string `yaml:"sasl_username"`
SASLPassword flagext.Secret `yaml:"sasl_password"`

ConsumerGroup string `yaml:"consumer_group"`
ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

Expand All @@ -97,6 +102,9 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")

f.StringVar(&cfg.SASLUsername, prefix+".sasl-username", "", "The SASL username for authentication.")
f.Var(&cfg.SASLPassword, prefix+".sasl-password", "The SASL password for authentication.")

f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.")
f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.")

Expand Down Expand Up @@ -130,6 +138,10 @@ func (cfg *KafkaConfig) Validate() error {
return ErrInvalidMaxConsumerLagAtStartup
}

if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") {
return ErrInconsistentSASLCredentials
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/ingest/writer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/plugin/kotel"
"github.com/twmb/franz-go/plugin/kprom"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -141,6 +142,16 @@ func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger lo
opts = append(opts, kgo.AllowAutoTopicCreation())
}

// SASL plain auth.
if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" {
opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
return plain.Auth{
User: cfg.SASLUsername,
Pass: cfg.SASLPassword.String(),
}, nil
})))
}

tracer := kotel.NewTracer(
kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(onlySampledTraces{propagation.TraceContext{}})),
)
Expand Down
60 changes: 60 additions & 0 deletions vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ github.com/twmb/franz-go/pkg/kgo
github.com/twmb/franz-go/pkg/kgo/internal/sticky
github.com/twmb/franz-go/pkg/kversion
github.com/twmb/franz-go/pkg/sasl
github.com/twmb/franz-go/pkg/sasl/plain
# github.com/twmb/franz-go/pkg/kadm v1.13.0
## explicit; go 1.21
github.com/twmb/franz-go/pkg/kadm
Expand Down