Skip to content

Commit

Permalink
Merge pull request #393 from DataDog/jamie/sequential-admin-conf
Browse files Browse the repository at this point in the history
jamie/sequential admin conf
  • Loading branch information
jamiealquiza authored Mar 7, 2022
2 parents c50d6f7 + d31ca33 commit a758ac8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
8 changes: 5 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ services:
- ssl_setup
- zookeeper
environment:
HOSTNAME_COMMAND: hostname
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://_{HOSTNAME_COMMAND}:9092,SASL_SSL://_{HOSTNAME_COMMAND}:9093
KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_SSL://:9093
HOSTNAME_COMMAND: hostname -i
PORT_COMMAND: "docker port $$(hostname) 9092/tcp | cut -d: -f2"
KAFKA_ADVERTISED_LISTENERS: OUTSIDE://localhost:_{PORT_COMMAND},INSIDE://:9094,SASL_SSL://_{HOSTNAME_COMMAND}:9093
KAFKA_LISTENERS: OUTSIDE://:9092,INSIDE://:9094,SASL_SSL://:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:PLAINTEXT,INSIDE:PLAINTEXT,SASL_SSL:SASL_SSL
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test1:1:3,test2:2:2"
Expand Down
35 changes: 18 additions & 17 deletions kafkaadmin/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ var (
// list of names and returns a ResourceConfigs for all dynamic configurations
// discovered for each resource by name.
func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error) {
var configResources []kafka.ConfigResource

var ckgType kafka.ResourceType
switch kind {
case "topic":
Expand All @@ -32,28 +30,31 @@ func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []stri
return nil, fmt.Errorf("no resource names provided")
}

// Populate the ConfigResource request.
// Populate the results into the ResourceConfigs.
var results = make(ResourceConfigs)

// Fetch the config for each resource sequentially.
// TODO(jamie) do this in batch when it becomes possible.
for _, n := range names {
// Populate the ConfigResource request.
cr := kafka.ConfigResource{
Type: ckgType,
Name: n,
}
configResources = append(configResources, cr)
}

// Request configs.
resourceConfigs, err := c.c.DescribeConfigs(ctx, configResources)
if err != nil {
return nil, err
}
// Request.
resourceConfigs, err := c.c.DescribeConfigs(ctx, []kafka.ConfigResource{cr})
if err != nil {
return nil, err
}

// Populate the results into the ResourceConfigs.
var results = make(ResourceConfigs)
for _, config := range resourceConfigs {
for _, v := range config.Config {
// Only return dynamic configs.
if v.Source == kafka.ConfigSourceDynamicTopic || v.Source == kafka.ConfigSourceDynamicBroker {
results.AddConfigEntry(config.Name, v)
// Populate results.
for _, config := range resourceConfigs {
for _, v := range config.Config {
// Only return dynamic configs.
if v.Source == kafka.ConfigSourceDynamicTopic || v.Source == kafka.ConfigSourceDynamicBroker {
results.AddConfigEntry(config.Name, v)
}
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions kafkaadmin/throttles.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error {
}
}

if len(throttleConfigs) > 0 {
// Apply the configs.
// TODO(jamie) review whether the kafak.SetAdminIncremental AlterConfigsAdminOption
// Apply the configs in sequence.
for _, config := range throttleConfigs {
// TODO(jamie) perform these in batch once the 'Only one ConfigResource of
// type BROKER is allowed per call' error is no longer encountered.
// TODO(jamie) review whether the kafka.SetAdminIncremental AlterConfigsAdminOption
// actually works here.
if _, err = c.c.AlterConfigs(ctx, throttleConfigs); err != nil {
if _, err = c.c.AlterConfigs(ctx, []kafka.ConfigResource{config}); err != nil {
return ErrSetThrottle{Message: err.Error()}
}
}
Expand Down

0 comments on commit a758ac8

Please sign in to comment.