Skip to content

Commit

Permalink
Add topic exclusion
Browse files Browse the repository at this point in the history
  • Loading branch information
saville committed Mar 20, 2023
1 parent 18098d3 commit 2f64eec
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ This image is configurable using different flags
| server.tls.cert-file | | The certificate file for the web server |
| server.tls.key-file | | The key file for the web server |
| topic.filter | .* | Regex that determines which topics to collect |
| topic.exclude | .* | Regex that determines which topics to exclude |
| group.filter | .* | Regex that determines which consumer groups to collect |
| group.exclude | ^$ | Regex that determines which consumer groups to exclude |
| web.listen-address | :9308 | Address to listen on for web interface and telemetry |
Expand Down
14 changes: 9 additions & 5 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (
type Exporter struct {
client sarama.Client
topicFilter *regexp.Regexp
topicExclude *regexp.Regexp
groupFilter *regexp.Regexp
groupExclude *regexp.Regexp
mu sync.Mutex
Expand Down Expand Up @@ -150,7 +151,7 @@ func canReadFile(path string) bool {
}

// NewExporter returns an initialized Exporter.
func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string, groupExclude string) (*Exporter, error) {
func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupFilter string, groupExclude string) (*Exporter, error) {
var zookeeperClient *kazoo.Kazoo
config := sarama.NewConfig()
config.ClientID = clientID
Expand Down Expand Up @@ -265,6 +266,7 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string, groupEx
return &Exporter{
client: client,
topicFilter: regexp.MustCompile(topicFilter),
topicExclude: regexp.MustCompile(topicExclude),
groupFilter: regexp.MustCompile(groupFilter),
groupExclude: regexp.MustCompile(groupExclude),
useZooKeeperLag: opts.useZooKeeperLag,
Expand Down Expand Up @@ -398,7 +400,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
getTopicMetrics := func(topic string) {
defer wg.Done()

if !e.topicFilter.MatchString(topic) {
if !e.topicFilter.MatchString(topic) || e.topicExclude.MatchString(topic) {
return
}

Expand Down Expand Up @@ -532,7 +534,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
}

for _, topic := range topics {
if e.topicFilter.MatchString(topic) {
if e.topicFilter.MatchString(topic) && !e.topicExclude.MatchString(topic) {
wg.Add(1)
topicChannel <- topic
}
Expand Down Expand Up @@ -711,6 +713,7 @@ func main() {
listenAddress = toFlagString("web.listen-address", "Address to listen on for web interface and telemetry.", ":9308")
metricsPath = toFlagString("web.telemetry-path", "Path under which to expose metrics.", "/metrics")
topicFilter = toFlagString("topic.filter", "Regex that determines which topics to collect.", ".*")
topicExclude = toFlagString("topic.exclude", "Regex that determines which topics to exclude.", "^$")
groupFilter = toFlagString("group.filter", "Regex that determines which consumer groups to collect.", ".*")
groupExclude = toFlagString("group.exclude", "Regex that determines which consumer groups to exclude.", "^$")
logSarama = toFlagBool("log.enable-sarama", "Turn on Sarama logging, default is false.", false, "false")
Expand Down Expand Up @@ -770,13 +773,14 @@ func main() {
}
}

setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *groupExclude, *logSarama, opts, labels)
setup(*listenAddress, *metricsPath, *topicFilter, *topicExclude, *groupFilter, *groupExclude, *logSarama, opts, labels)
}

func setup(
listenAddress string,
metricsPath string,
topicFilter string,
topicExclude string,
groupFilter string,
groupExclude string,
logSarama bool,
Expand Down Expand Up @@ -892,7 +896,7 @@ func setup(
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}

exporter, err := NewExporter(opts, topicFilter, groupFilter, groupExclude)
exporter, err := NewExporter(opts, topicFilter, topicExclude, groupFilter, groupExclude)
if err != nil {
klog.Fatalln(err)
}
Expand Down

0 comments on commit 2f64eec

Please sign in to comment.