From 18098d3ddf58fa2910e5a077147519ea05ca394a Mon Sep 17 00:00:00 2001 From: saville Date: Mon, 20 Mar 2023 14:35:05 -0600 Subject: [PATCH 1/3] Add group exclusion --- README.md | 1 + kafka_exporter.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3316c3aa..e3adc9a8 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,7 @@ This image is configurable using different flags | server.tls.key-file | | The key file for the web server | | topic.filter | .* | Regex that determines which topics to collect | | group.filter | .* | Regex that determines which consumer groups to collect | +| group.exclude | ^$ | Regex that determines which consumer groups to exclude | | web.listen-address | :9308 | Address to listen on for web interface and telemetry | | web.telemetry-path | /metrics | Path under which to expose metrics | | log.enable-sarama | false | Turn on Sarama logging | diff --git a/kafka_exporter.go b/kafka_exporter.go index 36a51956..4fb3097f 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -64,6 +64,7 @@ type Exporter struct { client sarama.Client topicFilter *regexp.Regexp groupFilter *regexp.Regexp + groupExclude *regexp.Regexp mu sync.Mutex useZooKeeperLag bool zookeeperClient *kazoo.Kazoo @@ -149,7 +150,7 @@ func canReadFile(path string) bool { } // NewExporter returns an initialized Exporter. -func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Exporter, error) { +func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string, groupExclude string) (*Exporter, error) { var zookeeperClient *kazoo.Kazoo config := sarama.NewConfig() config.ClientID = clientID @@ -265,6 +266,7 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor client: client, topicFilter: regexp.MustCompile(topicFilter), groupFilter: regexp.MustCompile(groupFilter), + groupExclude: regexp.MustCompile(groupExclude), useZooKeeperLag: opts.useZooKeeperLag, zookeeperClient: zookeeperClient, nextMetadataRefresh: time.Now(), @@ -554,7 +556,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } groupIds := make([]string, 0) for groupId := range groups.Groups { - if e.groupFilter.MatchString(groupId) { + if e.groupFilter.MatchString(groupId) && !e.groupExclude.MatchString(groupId) { groupIds = append(groupIds, groupId) } } @@ -710,6 +712,7 @@ func main() { metricsPath = toFlagString("web.telemetry-path", "Path under which to expose metrics.", "/metrics") topicFilter = toFlagString("topic.filter", "Regex that determines which topics to collect.", ".*") groupFilter = toFlagString("group.filter", "Regex that determines which consumer groups to collect.", ".*") + groupExclude = toFlagString("group.exclude", "Regex that determines which consumer groups to exclude.", "^$") logSarama = toFlagBool("log.enable-sarama", "Turn on Sarama logging, default is false.", false, "false") opts = kafkaOpts{} @@ -767,7 +770,7 @@ func main() { } } - setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *logSarama, opts, labels) + setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *groupExclude, *logSarama, opts, labels) } func setup( @@ -775,6 +778,7 @@ func setup( metricsPath string, topicFilter string, groupFilter string, + groupExclude string, logSarama bool, opts kafkaOpts, labels map[string]string, @@ -888,7 +892,7 @@ func setup( sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) } - exporter, err := NewExporter(opts, topicFilter, groupFilter) + exporter, err := NewExporter(opts, topicFilter, groupFilter, groupExclude) if err != nil { klog.Fatalln(err) } From 2f64eeceeb9a26f95111594304b9ecaf25b8d5c8 Mon Sep 17 00:00:00 2001 From: saville Date: Mon, 20 Mar 2023 14:38:19 -0600 Subject: [PATCH 2/3] Add topic exclusion --- README.md | 1 + kafka_exporter.go | 14 +++++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e3adc9a8..58ac2e8e 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,7 @@ This image is configurable using different flags | server.tls.cert-file | | The certificate file for the web server | | server.tls.key-file | | The key file for the web server | | topic.filter | .* | Regex that determines which topics to collect | +| topic.exclude | .* | Regex that determines which topics to exclude | | group.filter | .* | Regex that determines which consumer groups to collect | | group.exclude | ^$ | Regex that determines which consumer groups to exclude | | web.listen-address | :9308 | Address to listen on for web interface and telemetry | diff --git a/kafka_exporter.go b/kafka_exporter.go index 4fb3097f..f67ca90a 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -63,6 +63,7 @@ var ( type Exporter struct { client sarama.Client topicFilter *regexp.Regexp + topicExclude *regexp.Regexp groupFilter *regexp.Regexp groupExclude *regexp.Regexp mu sync.Mutex @@ -150,7 +151,7 @@ func canReadFile(path string) bool { } // NewExporter returns an initialized Exporter. -func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string, groupExclude string) (*Exporter, error) { +func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupFilter string, groupExclude string) (*Exporter, error) { var zookeeperClient *kazoo.Kazoo config := sarama.NewConfig() config.ClientID = clientID @@ -265,6 +266,7 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string, groupEx return &Exporter{ client: client, topicFilter: regexp.MustCompile(topicFilter), + topicExclude: regexp.MustCompile(topicExclude), groupFilter: regexp.MustCompile(groupFilter), groupExclude: regexp.MustCompile(groupExclude), useZooKeeperLag: opts.useZooKeeperLag, @@ -398,7 +400,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { getTopicMetrics := func(topic string) { defer wg.Done() - if !e.topicFilter.MatchString(topic) { + if !e.topicFilter.MatchString(topic) || e.topicExclude.MatchString(topic) { return } @@ -532,7 +534,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } for _, topic := range topics { - if e.topicFilter.MatchString(topic) { + if e.topicFilter.MatchString(topic) && !e.topicExclude.MatchString(topic) { wg.Add(1) topicChannel <- topic } @@ -711,6 +713,7 @@ func main() { listenAddress = toFlagString("web.listen-address", "Address to listen on for web interface and telemetry.", ":9308") metricsPath = toFlagString("web.telemetry-path", "Path under which to expose metrics.", "/metrics") topicFilter = toFlagString("topic.filter", "Regex that determines which topics to collect.", ".*") + topicExclude = toFlagString("topic.exclude", "Regex that determines which topics to exclude.", "^$") groupFilter = toFlagString("group.filter", "Regex that determines which consumer groups to collect.", ".*") groupExclude = toFlagString("group.exclude", "Regex that determines which consumer groups to exclude.", "^$") logSarama = toFlagBool("log.enable-sarama", "Turn on Sarama logging, default is false.", false, "false") @@ -770,13 +773,14 @@ func main() { } } - setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *groupExclude, *logSarama, opts, labels) + setup(*listenAddress, *metricsPath, *topicFilter, *topicExclude, *groupFilter, *groupExclude, *logSarama, opts, labels) } func setup( listenAddress string, metricsPath string, topicFilter string, + topicExclude string, groupFilter string, groupExclude string, logSarama bool, @@ -892,7 +896,7 @@ func setup( sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) } - exporter, err := NewExporter(opts, topicFilter, groupFilter, groupExclude) + exporter, err := NewExporter(opts, topicFilter, topicExclude, groupFilter, groupExclude) if err != nil { klog.Fatalln(err) } From 8780e8fa178a81989075d6b7187f980c3c78c9a5 Mon Sep 17 00:00:00 2001 From: Brian Saville Date: Mon, 22 May 2023 07:21:21 -0600 Subject: [PATCH 3/3] Fix typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 58ac2e8e..23f379f5 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,7 @@ This image is configurable using different flags | server.tls.cert-file | | The certificate file for the web server | | server.tls.key-file | | The key file for the web server | | topic.filter | .* | Regex that determines which topics to collect | -| topic.exclude | .* | Regex that determines which topics to exclude | +| topic.exclude | ^$ | Regex that determines which topics to exclude | | group.filter | .* | Regex that determines which consumer groups to collect | | group.exclude | ^$ | Regex that determines which consumer groups to exclude | | web.listen-address | :9308 | Address to listen on for web interface and telemetry |