Skip to content

Commit

Permalink
autothrottle skips repeat topic throttled replica updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiealquiza committed Mar 2, 2020
1 parent 3a6b6c2 commit ef0014b
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions cmd/autothrottle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ var (
topicsRegex = []*regexp.Regexp{regexp.MustCompile(".*")}
)

type topicSet map[string]struct{}

func (t1 topicSet) isSubSetOf(t2 topicSet) bool {
for k := range t1 {
if _, exist := t2[k]; !exist {
return false
}
}

return true
}

func main() {
v := flag.Bool("version", false, "version")
flag.StringVar(&Config.APIKey, "api-key", "", "Datadog API key")
Expand Down Expand Up @@ -149,8 +161,8 @@ func main() {
knownThrottles := true

var reassignments kafkazk.Reassignments
var replicatingPreviously map[string]struct{}
var replicatingNow map[string]struct{}
var replicatingPreviously = topicSet{}
var replicatingNow = topicSet{}
var done []string

// Params for the updateReplicationThrottle request.
Expand Down Expand Up @@ -217,6 +229,16 @@ func main() {
replicatingPreviously[t] = struct{}{}
}

// If all of the currently replicating topics are a subset
// of the previously replicating topics, we can stop updating
// the Kafka topic throttled replicas list. This minimizes
// state that must be propagated through the cluster.
if replicatingNow.isSubSetOf(replicatingPreviously) {
throttleMeta.DisableTopicUpdates()
} else {
throttleMeta.DisableTopicUpdates()
}

// Fetch any throttle override config.
overrideCfg, err := getThrottleOverride(zk, overridePath)
if err != nil {
Expand Down

0 comments on commit ef0014b

Please sign in to comment.