Skip to content

Commit

Permalink
feat(dataobj): tenant partition consumer (#16065)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Feb 3, 2025
1 parent f2bff20 commit 4399775
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 24 deletions.
4 changes: 0 additions & 4 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -808,10 +808,6 @@ dataobj_consumer:
# CLI flag: -dataobj-consumer.buffer-size
[buffer_size: <int> | default = 16MiB]

# The tenant ID to use for the data object builder.
# CLI flag: -dataobj-consumer.tenant-id
[tenant_id: <string> | default = "fake"]

# The prefix to use for the storage bucket.
# CLI flag: -dataobj-consumer.storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]
Expand Down
1 change: 1 addition & 0 deletions pkg/dataobj/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func (b *Builder) Reset() {
// reg must contain additional labels to differentiate between them.
func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)

return b.metrics.Register(reg)
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/dataobj/consumer/config.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
package consumer

import (
"errors"
"flag"

"github.com/grafana/loki/v3/pkg/dataobj"
)

type Config struct {
dataobj.BuilderConfig
TenantID string `yaml:"tenant_id"`
// StorageBucketPrefix is the prefix to use for the storage bucket.
StorageBucketPrefix string `yaml:"storage_bucket_prefix"`
}

func (cfg *Config) Validate() error {
if cfg.TenantID == "" {
return errors.New("tenantID is required")
}
return cfg.BuilderConfig.Validate()
}

Expand All @@ -27,6 +22,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
f.StringVar(&cfg.TenantID, prefix+"tenant-id", "fake", "The tenant ID to use for the data object builder.")
f.StringVar(&cfg.StorageBucketPrefix, prefix+"storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.")
}
29 changes: 25 additions & 4 deletions pkg/dataobj/consumer/partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ type partitionProcessor struct {
logger log.Logger
}

func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, bucket objstore.Bucket, tenantID string, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor {
func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor {
ctx, cancel := context.WithCancel(ctx)
decoder, err := kafka.NewDecoder()
if err != nil {
panic(err)
}
reg = prometheus.WrapRegistererWith(prometheus.Labels{
"shard": strconv.Itoa(int(virtualShard)),
"partition": strconv.Itoa(int(partition)),
"topic": topic,
}, reg)

metrics := newPartitionOffsetMetrics()
Expand All @@ -70,7 +72,7 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d

return &partitionProcessor{
client: client,
logger: log.With(logger, "topic", topic, "partition", partition),
logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID),
topic: topic,
partition: partition,
records: make(chan *kgo.Record, 1000),
Expand All @@ -90,15 +92,18 @@ func (p *partitionProcessor) start() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer close(p.records)

level.Info(p.logger).Log("msg", "started partition processor")
for {
select {
case <-p.ctx.Done():
level.Info(p.logger).Log("msg", "stopping partition processor")
return
case record := <-p.records:
case record, ok := <-p.records:
if !ok {
// Channel was closed
return
}
p.processRecord(record)
}
}
Expand All @@ -114,6 +119,21 @@ func (p *partitionProcessor) stop() {
p.metrics.unregister(p.reg)
}

// Drops records from the channel if the processor is stopped.
// Returns false if the processor is stopped, true otherwise.
func (p *partitionProcessor) Append(records []*kgo.Record) bool {
for _, record := range records {
select {
// must check per-record in order to not block on a full channel
// after receiver has been stopped.
case <-p.ctx.Done():
return false
case p.records <- record:
}
}
return true
}

func (p *partitionProcessor) initBuilder() error {
var initErr error
p.builderOnce.Do(func() {
Expand Down Expand Up @@ -146,6 +166,7 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {

// todo: handle multi-tenant
if !bytes.Equal(record.Key, p.tenantID) {
level.Error(p.logger).Log("msg", "record key does not match tenant ID", "key", record.Key, "tenant_id", p.tenantID)
return
}
stream, err := p.decoder.DecodeWithoutLabels(record.Value)
Expand Down
23 changes: 13 additions & 10 deletions pkg/dataobj/consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/kafka/partitionring/consumer"
Expand All @@ -33,20 +34,22 @@ type Service struct {

cfg Config
bucket objstore.Bucket
codec distributor.TenantPrefixCodec

// Partition management
partitionMtx sync.RWMutex
partitionHandlers map[string]map[int32]*partitionProcessor
}

func New(kafkaCfg kafka.Config, cfg Config, bucket objstore.Bucket, instanceID string, partitionRing ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) *Service {
func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore.Bucket, instanceID string, partitionRing ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) *Service {
if cfg.StorageBucketPrefix != "" {
bucket = objstore.NewPrefixedBucket(bucket, cfg.StorageBucketPrefix)
}
s := &Service{
logger: log.With(logger, "component", groupName),
cfg: cfg,
bucket: bucket,
codec: distributor.TenantPrefixCodec(topicPrefix),
partitionHandlers: make(map[string]map[int32]*partitionProcessor),
reg: reg,
}
Expand Down Expand Up @@ -80,12 +83,19 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie
defer s.partitionMtx.Unlock()

for topic, parts := range partitions {
tenant, virtualShard, err := s.codec.Decode(topic)
// TODO: should propage more effectively
if err != nil {
level.Error(s.logger).Log("msg", "failed to decode topic", "topic", topic, "err", err)
continue
}

if _, ok := s.partitionHandlers[topic]; !ok {
s.partitionHandlers[topic] = make(map[int32]*partitionProcessor)
}

for _, partition := range parts {
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.bucket, s.cfg.TenantID, topic, partition, s.logger, s.reg)
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg)
s.partitionHandlers[topic][partition] = processor
processor.start()
}
Expand Down Expand Up @@ -155,14 +165,7 @@ func (s *Service) run(ctx context.Context) error {
return
}

for _, record := range records {
select {
case <-processor.ctx.Done():
return
case processor.records <- record:
// Record sent successfully
}
}
_ = processor.Append(records)
})
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kafka/partitionring/consumer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Client struct {
func NewGroupClient(kafkaCfg kafka.Config, partitionRing ring.PartitionRingReader, groupName string, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*Client, error) {
defaultOpts := []kgo.Opt{
kgo.ConsumerGroup(groupName),
kgo.ConsumeRegex(),
kgo.ConsumeTopics(kafkaCfg.Topic),
kgo.Balancers(NewCooperativeActiveStickyBalancer(partitionRing)),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,7 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) {
t.dataObjConsumer = consumer.New(
t.Cfg.KafkaConfig,
t.Cfg.DataObjConsumer,
t.Cfg.Distributor.TenantTopic.TopicPrefix,
store,
t.Cfg.Ingester.LifecyclerConfig.ID,
t.partitionRing,
Expand Down

0 comments on commit 4399775

Please sign in to comment.