Skip to content

Commit

Permalink
Merge pull request #379 from bluesliverx/master
Browse files Browse the repository at this point in the history
Add topic and group exclusion parameters
  • Loading branch information
danielqsj authored May 23, 2023
2 parents 9d9cd65 + 8780e8f commit f21d49e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ This image is configurable using different flags
| server.tls.cert-file | | The certificate file for the web server |
| server.tls.key-file | | The key file for the web server |
| topic.filter | .* | Regex that determines which topics to collect |
| topic.exclude | ^$ | Regex that determines which topics to exclude |
| group.filter | .* | Regex that determines which consumer groups to collect |
| group.exclude | ^$ | Regex that determines which consumer groups to exclude |
| web.listen-address | :9308 | Address to listen on for web interface and telemetry |
| web.telemetry-path | /metrics | Path under which to expose metrics |
| log.enable-sarama | false | Turn on Sarama logging |
Expand Down
20 changes: 14 additions & 6 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ var (
type Exporter struct {
client sarama.Client
topicFilter *regexp.Regexp
topicExclude *regexp.Regexp
groupFilter *regexp.Regexp
groupExclude *regexp.Regexp
mu sync.Mutex
useZooKeeperLag bool
zookeeperClient *kazoo.Kazoo
Expand Down Expand Up @@ -149,7 +151,7 @@ func canReadFile(path string) bool {
}

// NewExporter returns an initialized Exporter.
func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Exporter, error) {
func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupFilter string, groupExclude string) (*Exporter, error) {
var zookeeperClient *kazoo.Kazoo
config := sarama.NewConfig()
config.ClientID = clientID
Expand Down Expand Up @@ -264,7 +266,9 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor
return &Exporter{
client: client,
topicFilter: regexp.MustCompile(topicFilter),
topicExclude: regexp.MustCompile(topicExclude),
groupFilter: regexp.MustCompile(groupFilter),
groupExclude: regexp.MustCompile(groupExclude),
useZooKeeperLag: opts.useZooKeeperLag,
zookeeperClient: zookeeperClient,
nextMetadataRefresh: time.Now(),
Expand Down Expand Up @@ -396,7 +400,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
getTopicMetrics := func(topic string) {
defer wg.Done()

if !e.topicFilter.MatchString(topic) {
if !e.topicFilter.MatchString(topic) || e.topicExclude.MatchString(topic) {
return
}

Expand Down Expand Up @@ -530,7 +534,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
}

for _, topic := range topics {
if e.topicFilter.MatchString(topic) {
if e.topicFilter.MatchString(topic) && !e.topicExclude.MatchString(topic) {
wg.Add(1)
topicChannel <- topic
}
Expand All @@ -554,7 +558,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
}
groupIds := make([]string, 0)
for groupId := range groups.Groups {
if e.groupFilter.MatchString(groupId) {
if e.groupFilter.MatchString(groupId) && !e.groupExclude.MatchString(groupId) {
groupIds = append(groupIds, groupId)
}
}
Expand Down Expand Up @@ -709,7 +713,9 @@ func main() {
listenAddress = toFlagString("web.listen-address", "Address to listen on for web interface and telemetry.", ":9308")
metricsPath = toFlagString("web.telemetry-path", "Path under which to expose metrics.", "/metrics")
topicFilter = toFlagString("topic.filter", "Regex that determines which topics to collect.", ".*")
topicExclude = toFlagString("topic.exclude", "Regex that determines which topics to exclude.", "^$")
groupFilter = toFlagString("group.filter", "Regex that determines which consumer groups to collect.", ".*")
groupExclude = toFlagString("group.exclude", "Regex that determines which consumer groups to exclude.", "^$")
logSarama = toFlagBool("log.enable-sarama", "Turn on Sarama logging, default is false.", false, "false")

opts = kafkaOpts{}
Expand Down Expand Up @@ -767,14 +773,16 @@ func main() {
}
}

setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *logSarama, opts, labels)
setup(*listenAddress, *metricsPath, *topicFilter, *topicExclude, *groupFilter, *groupExclude, *logSarama, opts, labels)
}

func setup(
listenAddress string,
metricsPath string,
topicFilter string,
topicExclude string,
groupFilter string,
groupExclude string,
logSarama bool,
opts kafkaOpts,
labels map[string]string,
Expand Down Expand Up @@ -888,7 +896,7 @@ func setup(
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}

exporter, err := NewExporter(opts, topicFilter, groupFilter)
exporter, err := NewExporter(opts, topicFilter, topicExclude, groupFilter, groupExclude)
if err != nil {
klog.Fatalln(err)
}
Expand Down

0 comments on commit f21d49e

Please sign in to comment.