Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
filter: Include name from config as tag in stats
Browse files Browse the repository at this point in the history
... as is already done for the listener and writer.

Fixes #20.
  • Loading branch information
mjs committed Feb 21, 2018
1 parent 7890cf8 commit fd57289
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
22 changes: 16 additions & 6 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,23 @@ func (f *Filter) Stop() {
func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
defer f.wg.Done()

totalLine := lineformatter.New("spout_stat_filter", nil,
"passed", "processed", "rejected")
ruleLine := lineformatter.New("spout_stat_filter_rule",
[]string{"rule"}, "triggered")
totalLine := lineformatter.New(
"spout_stat_filter",
[]string{"filter"},
"passed", "processed", "rejected",
)
ruleLine := lineformatter.New(
"spout_stat_filter_rule",
[]string{"filter", "rule"},
"triggered",
)

for {
st := stats.Clone()

// publish the grand stats
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(nil,
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(
[]string{f.c.Name},
st.Get(linesPassed),
st.Get(linesProcessed),
st.Get(linesRejected),
Expand All @@ -168,7 +175,10 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
// publish the per rule stats
for i, subject := range rules.Subjects() {
f.nc.Publish(f.c.NATSSubjectMonitor,
ruleLine.Format([]string{subject}, st.Get(ruleToStatsName(i))),
ruleLine.Format(
[]string{f.c.Name, subject},
st.Get(ruleToStatsName(i)),
),
)
}

Expand Down
5 changes: 3 additions & 2 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
const natsPort = 44446

var conf = config.Config{
Name: "particle",
NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort),
NATSSubject: []string{"filter-test"},
NATSSubjectMonitor: "filter-test-monitor",
Expand Down Expand Up @@ -99,12 +100,12 @@ goodbye,host=gopher01

// Receive total stats
assertReceived(t, statsCh, "stats", `
spout_stat_filter passed=2,processed=3,rejected=1
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1
`)

// Receive rule specific stats
assertReceived(t, statsCh, "rule stats", `
spout_stat_filter_rule,rule=hello-subject triggered=2
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
}

Expand Down

0 comments on commit fd57289

Please sign in to comment.