From 3582e39822b4be3956261296c2a6e12f8b656798 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 24 Apr 2018 15:41:35 +1200 Subject: [PATCH] filter: Expose NATS dropped messages metric --- filter/filter.go | 13 +++++++++++++ filter/filter_medium_test.go | 2 ++ spouttest/e2e_test.go | 1 + 3 files changed, 16 insertions(+) diff --git a/filter/filter.go b/filter/filter.go index bcab1e0..1fb1696 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -34,6 +34,7 @@ const ( statProcessed = "processed" statRejected = "rejected" statInvalidTime = "invalid_time" + statNATSDropped = "nats_dropped" ) // StartFilter creates a Filter instance, sets up its rules based on @@ -113,6 +114,7 @@ func initStats(rules *RuleSet) *stats.Stats { statProcessed, statRejected, statInvalidTime, + statNATSDropped, } for i := 0; i < rules.Count(); i++ { statNames = append(statNames, ruleToStatsName(i)) @@ -164,6 +166,8 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { } for { + f.updateNATSDropped(st) + now := time.Now() snap, ruleCounts := splitSnapshot(st.Snapshot()) @@ -193,6 +197,15 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { } } +func (f *Filter) updateNATSDropped(st *stats.Stats) { + dropped, err := f.sub.Dropped() + if err != nil { + log.Printf("NATS: failed to read subscription drops: %v", err) + return + } + st.Max(statNATSDropped, dropped) +} + const rulePrefix = "rule-" // ruleToStatsName converts a rule index to a name to a key for use diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index debe8da..1bd45a2 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -109,6 +109,7 @@ goodbye,host=gopher01 `processed{component="filter",name="particle"} 3`, `rejected{component="filter",name="particle"} 1`, `invalid_time{component="filter",name="particle"} 0`, + `nats_dropped{component="filter",name="particle"} 0`, `triggered{component="filter",name="particle",rule="hello-subject"} 2`, }) } @@ -166,6 +167,7 @@ func TestInvalidTimeStamps(t *testing.T) { `processed{component="filter",name="particle"} 4`, `rejected{component="filter",name="particle"} 0`, `invalid_time{component="filter",name="particle"} 2`, + `nats_dropped{component="filter",name="particle"} 0`, `triggered{component="filter",name="particle",rule="hello-subject"} 2`, }) } diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index 0203663..70701cc 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -133,6 +133,7 @@ func TestEndToEnd(t *testing.T) { failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0 invalid_time{component="filter",name="filter"} 0 max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+ +nats_dropped{component="filter",name="filter"} 0 passed{component="filter",name="filter"} 10 processed{component="filter",name="filter"} 20 read_errors{component="listener",name="listener"} 0