Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #28 from jumptrading/filter-config-name
Browse files Browse the repository at this point in the history
filter: Include name from config as tag in stats
  • Loading branch information
mjs authored Feb 26, 2018
2 parents 7890cf8 + fd9814a commit 938fbe4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
22 changes: 16 additions & 6 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,23 @@ func (f *Filter) Stop() {
func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
defer f.wg.Done()

totalLine := lineformatter.New("spout_stat_filter", nil,
"passed", "processed", "rejected")
ruleLine := lineformatter.New("spout_stat_filter_rule",
[]string{"rule"}, "triggered")
totalLine := lineformatter.New(
"spout_stat_filter",
[]string{"filter"},
"passed", "processed", "rejected",
)
ruleLine := lineformatter.New(
"spout_stat_filter_rule",
[]string{"filter", "rule"},
"triggered",
)

for {
st := stats.Clone()

// publish the grand stats
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(nil,
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(
[]string{f.c.Name},
st.Get(linesPassed),
st.Get(linesProcessed),
st.Get(linesRejected),
Expand All @@ -168,7 +175,10 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
// publish the per rule stats
for i, subject := range rules.Subjects() {
f.nc.Publish(f.c.NATSSubjectMonitor,
ruleLine.Format([]string{subject}, st.Get(ruleToStatsName(i))),
ruleLine.Format(
[]string{f.c.Name, subject},
st.Get(ruleToStatsName(i)),
),
)
}

Expand Down
29 changes: 24 additions & 5 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
const natsPort = 44446

var conf = config.Config{
Name: "particle",
NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort),
NATSSubject: []string{"filter-test"},
NATSSubjectMonitor: "filter-test-monitor",
Expand Down Expand Up @@ -98,22 +99,40 @@ goodbye,host=gopher01
`)

// Receive total stats
assertReceived(t, statsCh, "stats", `
spout_stat_filter passed=2,processed=3,rejected=1
assertReceivedMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1
`)

// Receive rule specific stats
assertReceived(t, statsCh, "rule stats", `
spout_stat_filter_rule,rule=hello-subject triggered=2
assertReceivedMulti(t, statsCh, "rule stats", `
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
}

func assertReceived(t *testing.T, ch <-chan string, label, expected string) {
expected = expected[1:]

select {
case received := <-ch:
assert.Equal(t, expected, received)
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for " + label)
t.Fatalf("timed out waiting for %s", label)
}
}

func assertReceivedMulti(t *testing.T, ch <-chan string, label, expected string) {
expected = expected[1:]

var received string
timeout := time.After(spouttest.LongWait)
for {
select {
case received = <-ch:
if expected == received {
return
}
case <-timeout:
t.Fatalf("timed out waiting for %s. last received: %q", label, received)
}
}
}

0 comments on commit 938fbe4

Please sign in to comment.