diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 11070a7..e529083 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -49,6 +49,18 @@ var ( topicsRegex = []*regexp.Regexp{regexp.MustCompile(".*")} ) +type topicSet map[string]struct{} + +func (t1 topicSet) isSubSetOf(t2 topicSet) bool { + for k := range t1 { + if _, exist := t2[k]; !exist { + return false + } + } + + return true +} + func main() { v := flag.Bool("version", false, "version") flag.StringVar(&Config.APIKey, "api-key", "", "Datadog API key") @@ -149,8 +161,8 @@ func main() { knownThrottles := true var reassignments kafkazk.Reassignments - var replicatingPreviously map[string]struct{} - var replicatingNow map[string]struct{} + var replicatingPreviously = topicSet{} + var replicatingNow = topicSet{} var done []string // Params for the updateReplicationThrottle request. @@ -217,6 +229,16 @@ func main() { replicatingPreviously[t] = struct{}{} } + // If all of the currently replicating topics are a subset + // of the previously replicating topics, we can stop updating + // the Kafka topic throttled replicas list. This minimizes + // state that must be propagated through the cluster. + if replicatingNow.isSubSetOf(replicatingPreviously) { + throttleMeta.DisableTopicUpdates() + } else { + throttleMeta.DisableTopicUpdates() + } + // Fetch any throttle override config. overrideCfg, err := getThrottleOverride(zk, overridePath) if err != nil {