Skip to content

Commit

Permalink
Allow to specify a list of brokers to create topic on
Browse files Browse the repository at this point in the history
If broker tag is also passed, takes the intersection of tagged brokers
and specified broker ids.
  • Loading branch information
dopuskh3 committed Apr 22, 2022
1 parent 0b23df9 commit 77e5775
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
11 changes: 11 additions & 0 deletions registry/server/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,23 @@ func TestCreateTaggedTopic(t *testing.T) {
Topic: &pb.Topic{Name: "many_partitions", Partitions: 24, Replication: 3},
TargetBrokerTags: []string{"key:value"},
},
3: {
Topic: &pb.Topic{Name: "new_topic4", Partitions: 1, Replication: 1},
TargetBrokerTags: []string{"key:value"},
TargetBrokerIds: []uint32{1001},
},
4: {
Topic: &pb.Topic{Name: "new_topic5", Partitions: 1, Replication: 1},
TargetBrokerIds: []uint32{1001},
},
}

expectedErrors := map[int]error{
0: nil,
1: ErrInsufficientBrokers,
2: ErrInsufficientBrokers,
3: nil,
4: nil,
}

for i := 0; i < len(tests); i++ {
Expand Down
42 changes: 36 additions & 6 deletions registry/server/api_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
ErrTopicAlreadyExists = errors.New("topic already exists")
// ErrInsufficientBrokers error.
ErrInsufficientBrokers = errors.New("insufficient number of brokers")
// ErrInvalidBrokerId error.
ErrInvalidBrokerId = errors.New("invalid broker id")
// Misc.
allTopicsRegex = regexp.MustCompile(".*")
)
Expand Down Expand Up @@ -185,25 +187,53 @@ func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (*
// If we're targeting a specific set of brokers by tag, build
// a replica assignment.
var assignment kafkaadmin.ReplicaAssignment
if req.TargetBrokerTags != nil {
if req.TargetBrokerTags != nil || req.TargetBrokerIds != nil {
// Create a stub map with the provided request dimensions.
opts := kafkazk.Populate(
req.Topic.Name,
int(req.Topic.Partitions),
int(req.Topic.Replication),
)
pMap := kafkazk.NewPartitionMap(opts)

// Fetch brokers by tag.
// Fetch brokers by tag. If no tag was specified, this will return all the brokers:
reqParams := &pb.BrokerRequest{Tag: req.TargetBrokerTags}
resp, err := s.ListBrokers(ctx, reqParams)
if err != nil {
return empty, err
}
type Empty struct{}
selectedBrokerIds := make(map[uint32]Empty)
existingBrokers := make(map[uint32]Empty)

// Populate a map of existing brokers returned by the ListBrokers call:
for _, id := range resp.Ids {
existingBrokers[id] = Empty{}
}

targetBrokerIDs := make([]int, len(resp.Ids))
for i := range resp.Ids {
targetBrokerIDs[i] = int(resp.Ids[i])
for _, id := range req.TargetBrokerIds {
selectedBrokerIds[id] = Empty{}
}
// Validate that the passed broker ids actually exists or were part of the returned brokers.
// If that's not the case that's either because the specified broker id does not exist or does not
// contain passed broker tags. In this case return ErrInvalidBrokerId error.
if len(selectedBrokerIds) > 0 {
for id := range selectedBrokerIds {
if _, ok := existingBrokers[id]; !ok {
return nil, ErrInvalidBrokerId
}
}
}
var targetBrokerIDs []int
for brokerId := range existingBrokers {
// If we have specified a list of selected brokers, only consider that broker if it's in that list:
if len(selectedBrokerIds) > 0 {
if _, ok := selectedBrokerIds[brokerId]; ok {
targetBrokerIDs = append(targetBrokerIDs, int(brokerId))
}
} else {
// We haven't specified explicit broker ids. Select returned broker in that case:
targetBrokerIDs = append(targetBrokerIDs, int(brokerId))
}
}

if len(targetBrokerIDs) < int(req.Topic.Replication) {
Expand Down

0 comments on commit 77e5775

Please sign in to comment.