diff --git a/registry/server/api_integration_test.go b/registry/server/api_integration_test.go index c70591c7..fe46af76 100644 --- a/registry/server/api_integration_test.go +++ b/registry/server/api_integration_test.go @@ -110,12 +110,23 @@ func TestCreateTaggedTopic(t *testing.T) { Topic: &pb.Topic{Name: "many_partitions", Partitions: 24, Replication: 3}, TargetBrokerTags: []string{"key:value"}, }, + 3: { + Topic: &pb.Topic{Name: "new_topic4", Partitions: 1, Replication: 1}, + TargetBrokerTags: []string{"key:value"}, + TargetBrokerIds: []uint32{1001}, + }, + 4: { + Topic: &pb.Topic{Name: "new_topic5", Partitions: 1, Replication: 1}, + TargetBrokerIds: []uint32{1001}, + }, } expectedErrors := map[int]error{ 0: nil, 1: ErrInsufficientBrokers, 2: ErrInsufficientBrokers, + 3: nil, + 4: nil, } for i := 0; i < len(tests); i++ { diff --git a/registry/server/api_topics.go b/registry/server/api_topics.go index 93ed0d21..0b9b507d 100644 --- a/registry/server/api_topics.go +++ b/registry/server/api_topics.go @@ -27,6 +27,8 @@ var ( ErrTopicAlreadyExists = errors.New("topic already exists") // ErrInsufficientBrokers error. ErrInsufficientBrokers = errors.New("insufficient number of brokers") + // ErrInvalidBrokerId error. + ErrInvalidBrokerId = errors.New("invalid broker id") // Misc. allTopicsRegex = regexp.MustCompile(".*") ) @@ -185,7 +187,7 @@ func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (* // If we're targeting a specific set of brokers by tag, build // a replica assignment. var assignment kafkaadmin.ReplicaAssignment - if req.TargetBrokerTags != nil { + if req.TargetBrokerTags != nil || req.TargetBrokerIds != nil { // Create a stub map with the provided request dimensions. opts := kafkazk.Populate( req.Topic.Name, @@ -193,17 +195,45 @@ func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (* int(req.Topic.Replication), ) pMap := kafkazk.NewPartitionMap(opts) - - // Fetch brokers by tag. + // Fetch brokers by tag. If no tag was specified, this will return all the brokers: reqParams := &pb.BrokerRequest{Tag: req.TargetBrokerTags} resp, err := s.ListBrokers(ctx, reqParams) if err != nil { return empty, err } + type Empty struct{} + selectedBrokerIds := make(map[uint32]Empty) + existingBrokers := make(map[uint32]Empty) + + // Populate a map of existing brokers returned by the ListBrokers call: + for _, id := range resp.Ids { + existingBrokers[id] = Empty{} + } - targetBrokerIDs := make([]int, len(resp.Ids)) - for i := range resp.Ids { - targetBrokerIDs[i] = int(resp.Ids[i]) + for _, id := range req.TargetBrokerIds { + selectedBrokerIds[id] = Empty{} + } + // Validate that the passed broker ids actually exists or were part of the returned brokers. + // If that's not the case that's either because the specified broker id does not exist or does not + // contain passed broker tags. In this case return ErrInvalidBrokerId error. + if len(selectedBrokerIds) > 0 { + for id := range selectedBrokerIds { + if _, ok := existingBrokers[id]; !ok { + return nil, ErrInvalidBrokerId + } + } + } + var targetBrokerIDs []int + for brokerId := range existingBrokers { + // If we have specified a list of selected brokers, only consider that broker if it's in that list: + if len(selectedBrokerIds) > 0 { + if _, ok := selectedBrokerIds[brokerId]; ok { + targetBrokerIDs = append(targetBrokerIDs, int(brokerId)) + } + } else { + // We haven't specified explicit broker ids. Select returned broker in that case: + targetBrokerIDs = append(targetBrokerIDs, int(brokerId)) + } } if len(targetBrokerIDs) < int(req.Topic.Replication) {