From fd57289d16d1c8bede2f128ace44d2e0de345375 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Thu, 22 Feb 2018 00:07:12 +1300 Subject: [PATCH 1/2] filter: Include name from config as tag in stats ... as is already done for the listener and writer. Fixes #20. --- filter/filter.go | 22 ++++++++++++++++------ filter/filter_medium_test.go | 5 +++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index f77d545..51556c8 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -150,16 +150,23 @@ func (f *Filter) Stop() { func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { defer f.wg.Done() - totalLine := lineformatter.New("spout_stat_filter", nil, - "passed", "processed", "rejected") - ruleLine := lineformatter.New("spout_stat_filter_rule", - []string{"rule"}, "triggered") + totalLine := lineformatter.New( + "spout_stat_filter", + []string{"filter"}, + "passed", "processed", "rejected", + ) + ruleLine := lineformatter.New( + "spout_stat_filter_rule", + []string{"filter", "rule"}, + "triggered", + ) for { st := stats.Clone() // publish the grand stats - f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(nil, + f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format( + []string{f.c.Name}, st.Get(linesPassed), st.Get(linesProcessed), st.Get(linesRejected), @@ -168,7 +175,10 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { // publish the per rule stats for i, subject := range rules.Subjects() { f.nc.Publish(f.c.NATSSubjectMonitor, - ruleLine.Format([]string{subject}, st.Get(ruleToStatsName(i))), + ruleLine.Format( + []string{f.c.Name, subject}, + st.Get(ruleToStatsName(i)), + ), ) } diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index ccbccdf..be601b5 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -32,6 +32,7 @@ import ( const natsPort = 44446 var conf = config.Config{ + Name: "particle", NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort), NATSSubject: []string{"filter-test"}, NATSSubjectMonitor: "filter-test-monitor", @@ -99,12 +100,12 @@ goodbye,host=gopher01 // Receive total stats assertReceived(t, statsCh, "stats", ` -spout_stat_filter passed=2,processed=3,rejected=1 +spout_stat_filter,filter=particle passed=2,processed=3,rejected=1 `) // Receive rule specific stats assertReceived(t, statsCh, "rule stats", ` -spout_stat_filter_rule,rule=hello-subject triggered=2 +spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 `) } From fd9814a607b37a47cf5f2f54a7d290a68813ac74 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 27 Feb 2018 10:10:55 +1300 Subject: [PATCH 2/2] filter: Check stats output multiple times Depending on timing, the test can sometimes see output from the stats goroutine before the test lines have been pushed through. The stats channel is now checked multiple times for the expected output to arrive. --- filter/filter_medium_test.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index be601b5..9b88409 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -99,22 +99,40 @@ goodbye,host=gopher01 `) // Receive total stats - assertReceived(t, statsCh, "stats", ` + assertReceivedMulti(t, statsCh, "stats", ` spout_stat_filter,filter=particle passed=2,processed=3,rejected=1 `) // Receive rule specific stats - assertReceived(t, statsCh, "rule stats", ` + assertReceivedMulti(t, statsCh, "rule stats", ` spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 `) } func assertReceived(t *testing.T, ch <-chan string, label, expected string) { expected = expected[1:] + select { case received := <-ch: assert.Equal(t, expected, received) case <-time.After(spouttest.LongWait): - t.Fatal("timed out waiting for " + label) + t.Fatalf("timed out waiting for %s", label) + } +} + +func assertReceivedMulti(t *testing.T, ch <-chan string, label, expected string) { + expected = expected[1:] + + var received string + timeout := time.After(spouttest.LongWait) + for { + select { + case received = <-ch: + if expected == received { + return + } + case <-timeout: + t.Fatalf("timed out waiting for %s. last received: %q", label, received) + } } }