Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

filter: Include name from config as tag in stats #28

Merged
merged 2 commits into from
Feb 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,23 @@ func (f *Filter) Stop() {
func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
defer f.wg.Done()

totalLine := lineformatter.New("spout_stat_filter", nil,
"passed", "processed", "rejected")
ruleLine := lineformatter.New("spout_stat_filter_rule",
[]string{"rule"}, "triggered")
totalLine := lineformatter.New(
"spout_stat_filter",
[]string{"filter"},
"passed", "processed", "rejected",
)
ruleLine := lineformatter.New(
"spout_stat_filter_rule",
[]string{"filter", "rule"},
"triggered",
)

for {
st := stats.Clone()

// publish the grand stats
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(nil,
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(
[]string{f.c.Name},
st.Get(linesPassed),
st.Get(linesProcessed),
st.Get(linesRejected),
Expand All @@ -168,7 +175,10 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
// publish the per rule stats
for i, subject := range rules.Subjects() {
f.nc.Publish(f.c.NATSSubjectMonitor,
ruleLine.Format([]string{subject}, st.Get(ruleToStatsName(i))),
ruleLine.Format(
[]string{f.c.Name, subject},
st.Get(ruleToStatsName(i)),
),
)
}

Expand Down
29 changes: 24 additions & 5 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
const natsPort = 44446

var conf = config.Config{
Name: "particle",
NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort),
NATSSubject: []string{"filter-test"},
NATSSubjectMonitor: "filter-test-monitor",
Expand Down Expand Up @@ -98,22 +99,40 @@ goodbye,host=gopher01
`)

// Receive total stats
assertReceived(t, statsCh, "stats", `
spout_stat_filter passed=2,processed=3,rejected=1
assertReceivedMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1
`)

// Receive rule specific stats
assertReceived(t, statsCh, "rule stats", `
spout_stat_filter_rule,rule=hello-subject triggered=2
assertReceivedMulti(t, statsCh, "rule stats", `
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
}

func assertReceived(t *testing.T, ch <-chan string, label, expected string) {
expected = expected[1:]

select {
case received := <-ch:
assert.Equal(t, expected, received)
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for " + label)
t.Fatalf("timed out waiting for %s", label)
}
}

func assertReceivedMulti(t *testing.T, ch <-chan string, label, expected string) {
expected = expected[1:]

var received string
timeout := time.After(spouttest.LongWait)
for {
select {
case received = <-ch:
if expected == received {
return
}
case <-timeout:
t.Fatalf("timed out waiting for %s. last received: %q", label, received)
}
}
}