Skip to content

Commit

Permalink
feat(stream-metadata-generator): Add support for static partitions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
periklis authored Feb 26, 2025
1 parent 3ee85fe commit 455f418
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion tools/stream-metadata-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {
}

type config struct {
NumPartitions int `yaml:"num_partitions"`
NumTenants int `yaml:"num_tenants"`
QPSPerTenant int `yaml:"qps_per_tenant"`
BatchSize int `yaml:"batch_size"`
Expand Down Expand Up @@ -116,6 +117,7 @@ func (s *streamLabelsFlag) Set(value string) error {
}

func (c *config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.IntVar(&c.NumPartitions, "partitions.total", 64, "Number of partitions to generate metadata for")
f.IntVar(&c.NumTenants, "tenants.total", 1, "Number of tenants to generate metadata for")
f.IntVar(&c.QPSPerTenant, "tenants.qps", 10, "Number of QPS per tenant")
f.IntVar(&c.BatchSize, "tenants.streams.batch-size", 100, "Number of streams to send to Kafka per tick")
Expand Down Expand Up @@ -392,7 +394,18 @@ func (s *generator) running(ctx context.Context) error {
func (s *generator) sendStreamsToKafka(ctx context.Context, streams []distributor.KeyedStream, tenant string, errCh chan error) {
for _, stream := range streams {
go func(stream distributor.KeyedStream) {
partitionID, err := s.partitionRing.PartitionRing().ActivePartitionForKey(stream.RingToken)
var (
partitionID int32
err error
)

switch {
case s.cfg.NumPartitions <= 0:
partitionID, err = s.partitionRing.PartitionRing().ActivePartitionForKey(stream.RingToken)
case s.cfg.NumPartitions > 0:
partitionID = int32(stream.HashNoShard % uint64(s.cfg.NumPartitions))
}

if err != nil {
errCh <- fmt.Errorf("failed to find active partition for stream: %w", err)
return
Expand Down

0 comments on commit 455f418

Please sign in to comment.