diff --git a/README.md b/README.md index 3316c3aa..23f379f5 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,9 @@ This image is configurable using different flags | server.tls.cert-file | | The certificate file for the web server | | server.tls.key-file | | The key file for the web server | | topic.filter | .* | Regex that determines which topics to collect | +| topic.exclude | ^$ | Regex that determines which topics to exclude | | group.filter | .* | Regex that determines which consumer groups to collect | +| group.exclude | ^$ | Regex that determines which consumer groups to exclude | | web.listen-address | :9308 | Address to listen on for web interface and telemetry | | web.telemetry-path | /metrics | Path under which to expose metrics | | log.enable-sarama | false | Turn on Sarama logging | diff --git a/kafka_exporter.go b/kafka_exporter.go index 36a51956..f67ca90a 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -63,7 +63,9 @@ var ( type Exporter struct { client sarama.Client topicFilter *regexp.Regexp + topicExclude *regexp.Regexp groupFilter *regexp.Regexp + groupExclude *regexp.Regexp mu sync.Mutex useZooKeeperLag bool zookeeperClient *kazoo.Kazoo @@ -149,7 +151,7 @@ func canReadFile(path string) bool { } // NewExporter returns an initialized Exporter. -func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Exporter, error) { +func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupFilter string, groupExclude string) (*Exporter, error) { var zookeeperClient *kazoo.Kazoo config := sarama.NewConfig() config.ClientID = clientID @@ -264,7 +266,9 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor return &Exporter{ client: client, topicFilter: regexp.MustCompile(topicFilter), + topicExclude: regexp.MustCompile(topicExclude), groupFilter: regexp.MustCompile(groupFilter), + groupExclude: regexp.MustCompile(groupExclude), useZooKeeperLag: opts.useZooKeeperLag, zookeeperClient: zookeeperClient, nextMetadataRefresh: time.Now(), @@ -396,7 +400,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { getTopicMetrics := func(topic string) { defer wg.Done() - if !e.topicFilter.MatchString(topic) { + if !e.topicFilter.MatchString(topic) || e.topicExclude.MatchString(topic) { return } @@ -530,7 +534,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } for _, topic := range topics { - if e.topicFilter.MatchString(topic) { + if e.topicFilter.MatchString(topic) && !e.topicExclude.MatchString(topic) { wg.Add(1) topicChannel <- topic } @@ -554,7 +558,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } groupIds := make([]string, 0) for groupId := range groups.Groups { - if e.groupFilter.MatchString(groupId) { + if e.groupFilter.MatchString(groupId) && !e.groupExclude.MatchString(groupId) { groupIds = append(groupIds, groupId) } } @@ -709,7 +713,9 @@ func main() { listenAddress = toFlagString("web.listen-address", "Address to listen on for web interface and telemetry.", ":9308") metricsPath = toFlagString("web.telemetry-path", "Path under which to expose metrics.", "/metrics") topicFilter = toFlagString("topic.filter", "Regex that determines which topics to collect.", ".*") + topicExclude = toFlagString("topic.exclude", "Regex that determines which topics to exclude.", "^$") groupFilter = toFlagString("group.filter", "Regex that determines which consumer groups to collect.", ".*") + groupExclude = toFlagString("group.exclude", "Regex that determines which consumer groups to exclude.", "^$") logSarama = toFlagBool("log.enable-sarama", "Turn on Sarama logging, default is false.", false, "false") opts = kafkaOpts{} @@ -767,14 +773,16 @@ func main() { } } - setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *logSarama, opts, labels) + setup(*listenAddress, *metricsPath, *topicFilter, *topicExclude, *groupFilter, *groupExclude, *logSarama, opts, labels) } func setup( listenAddress string, metricsPath string, topicFilter string, + topicExclude string, groupFilter string, + groupExclude string, logSarama bool, opts kafkaOpts, labels map[string]string, @@ -888,7 +896,7 @@ func setup( sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) } - exporter, err := NewExporter(opts, topicFilter, groupFilter) + exporter, err := NewExporter(opts, topicFilter, topicExclude, groupFilter, groupExclude) if err != nil { klog.Fatalln(err) }