From 4df0864f45f98f49e57424cf212bbff612b84bb7 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 10:57:38 +1200 Subject: [PATCH 01/16] stats: Add Snapshot method This will make it more convenient to generate Prometheus lines from a Stats. --- stats/stats.go | 19 +++++++++++++++++++ stats/stats_small_test.go | 16 +++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/stats/stats.go b/stats/stats.go index fb60abd..db4112d 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -60,6 +60,25 @@ func (s *Stats) Inc(name string) int { return s.counts[name] } +// CounterPair holds the and value for one Stats counter at a given +// point in time. +type CounterPair struct { + Name string + Value int +} + +// Snapshot returns the current values of all the counters. +func (s *Stats) Snapshot() []CounterPair { + s.mu.Lock() + defer s.mu.Unlock() + + out := make([]CounterPair, 0, len(s.counts)) + for name, count := range s.counts { + out = append(out, CounterPair{name, count}) + } + return out +} + // Clone returns a new Stats instance, copying the source Stats // counts. func (s *Stats) Clone() *Stats { diff --git a/stats/stats_small_test.go b/stats/stats_small_test.go index b0b96d9..1a0f641 100644 --- a/stats/stats_small_test.go +++ b/stats/stats_small_test.go @@ -49,8 +49,21 @@ func TestInvalid(t *testing.T) { assert.Equal(t, 0, s.Get("foo")) } +func TestSnapshot(t *testing.T) { + s := stats.New("foo", "bar", "qaz") + s.Inc("foo") + s.Inc("bar") + s.Inc("bar") + + assert.ElementsMatch(t, []stats.CounterPair{ + {"foo", 1}, + {"bar", 2}, + {"qaz", 0}, + }, s.Snapshot()) +} + func TestClone(t *testing.T) { - s := stats.New("foo", "bar") + s := stats.New("foo", "bar", "qaz") s.Inc("foo") s.Inc("bar") s.Inc("bar") @@ -58,6 +71,7 @@ func TestClone(t *testing.T) { s2 := s.Clone() assert.Equal(t, 1, s2.Get("foo")) assert.Equal(t, 2, s2.Get("bar")) + assert.Equal(t, 0, s2.Get("qaz")) // Make sure they're independent s2.Inc("foo") From 849759e6272deb6531e027150afa91ff901ec0dd Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 11:48:21 +1200 Subject: [PATCH 02/16] stats: Add SnapshotToPrometheus New function to takes a Stats Snapshot and convert it to Prometheus metrics lines. --- stats/prometheus.go | 51 +++++++++++++++++++++++++++ stats/prometheus_small_test.go | 63 ++++++++++++++++++++++++++++++++++ stats/stats.go | 4 ++- 3 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 stats/prometheus.go create mode 100644 stats/prometheus_small_test.go diff --git a/stats/prometheus.go b/stats/prometheus.go new file mode 100644 index 0000000..4bb177a --- /dev/null +++ b/stats/prometheus.go @@ -0,0 +1,51 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stats + +import ( + "time" + + "github.com/jumptrading/influx-spout/prometheus" +) + +// SnapshotToPrometheus takes Snapshot produced by a Stats instance +// and formats it into Prometheus metrics lines using the timestamp +// and labels provided. +func SnapshotToPrometheus( + snap Snapshot, + now time.Time, + labels map[string]string, +) []byte { + millis := now.UnixNano() / int64(time.Millisecond) + + labelPairs := make(prometheus.LabelPairs, 0, len(labels)) + for name, value := range labels { + labelPairs = append(labelPairs, prometheus.LabelPair{ + Name: []byte(name), + Value: []byte(value), + }) + } + + set := prometheus.NewMetricSet() + for _, counter := range snap { + set.Update(&prometheus.Metric{ + Name: []byte(counter.Name), + Labels: labelPairs, + Value: int64(counter.Value), + Milliseconds: millis, + }) + } + return set.ToBytes() +} diff --git a/stats/prometheus_small_test.go b/stats/prometheus_small_test.go new file mode 100644 index 0000000..51be66d --- /dev/null +++ b/stats/prometheus_small_test.go @@ -0,0 +1,63 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build small + +package stats_test + +import ( + "testing" + "time" + + "github.com/jumptrading/influx-spout/stats" + "github.com/stretchr/testify/assert" +) + +func TestToPrometheus(t *testing.T) { + snap := stats.Snapshot{ + {"foo", 42}, + {"bar", 99}, + {"qaz", 0}, + } + ts := time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) + + actual := stats.SnapshotToPrometheus(snap, ts, nil) + expected := []byte(` +bar 99 1234567890000 +foo 42 1234567890000 +qaz 0 1234567890000 +`[1:]) + assert.Equal(t, expected, actual) +} + +func TestToPrometheusWithLabels(t *testing.T) { + snap := stats.Snapshot{ + {"foo", 42}, + {"bar", 99}, + {"qaz", 0}, + } + ts := time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) + labels := map[string]string{ + "host": "nyc01", + "level": "high", + } + + actual := stats.SnapshotToPrometheus(snap, ts, labels) + expected := []byte(` +bar{host="nyc01",level="high"} 99 1234567890000 +foo{host="nyc01",level="high"} 42 1234567890000 +qaz{host="nyc01",level="high"} 0 1234567890000 +`[1:]) + assert.Equal(t, expected, actual) +} diff --git a/stats/stats.go b/stats/stats.go index db4112d..853cde9 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -67,8 +67,10 @@ type CounterPair struct { Value int } +type Snapshot []CounterPair + // Snapshot returns the current values of all the counters. -func (s *Stats) Snapshot() []CounterPair { +func (s *Stats) Snapshot() Snapshot { s.mu.Lock() defer s.mu.Unlock() From 7a80da7318970ec562e1d02ab70f0e145244ce12 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 13:56:04 +1200 Subject: [PATCH 03/16] listener: Convert to sending Prometheus metrics to monitor subject --- listener/listener.go | 41 +++++++++++--------------------- listener/listener_medium_test.go | 38 +++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/listener/listener.go b/listener/listener.go index df19b84..903bb85 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -29,22 +29,19 @@ import ( "github.com/nats-io/go-nats" "github.com/jumptrading/influx-spout/config" - "github.com/jumptrading/influx-spout/lineformatter" "github.com/jumptrading/influx-spout/stats" ) const ( // Listener stats counters - linesReceived = "lines-received" - batchesSent = "batches-sent" - readErrors = "read-errors" + statReceived = "received" + statSent = "sent" + statReadErrors = "read_errors" // The maximum possible UDP read size. udpMaxDatagramSize = 65536 ) -var allStats = []string{linesReceived, batchesSent, readErrors} - var statsInterval = 3 * time.Second // StartListener initialises a listener, starts its statistician @@ -132,7 +129,7 @@ func newListener(c *config.Config) (*Listener, error) { c: c, ready: make(chan struct{}), stop: make(chan struct{}), - stats: stats.New(allStats...), + stats: stats.New(statReceived, statSent, statReadErrors), buf: make([]byte, c.ListenerBatchBytes), // If more than batchSizeThreshold bytes has been written to @@ -194,7 +191,7 @@ func (l *Listener) listenUDP(sc *net.UDPConn) { sc.SetReadDeadline(time.Now().Add(time.Second)) sz, _, err := sc.ReadFromUDP(l.buf[l.batchSize:]) if err != nil && !isTimeout(err) { - l.stats.Inc(readErrors) + l.stats.Inc(statReadErrors) } // Attempt to process the read even on error as Read may @@ -221,7 +218,7 @@ func (l *Listener) setupHTTP() *http.Server { if err != nil { if err != io.EOF { - l.stats.Inc(readErrors) + l.stats.Inc(statReadErrors) } break } @@ -255,7 +252,7 @@ func (l *Listener) processRead(sz int) { return // Empty read } - linesReceived := l.stats.Inc(linesReceived) + statReceived := l.stats.Inc(statReceived) l.batchSize += sz if l.c.Debug { @@ -264,8 +261,8 @@ func (l *Listener) processRead(sz int) { // Send when sufficient reads have been batched or the batch // buffer is almost full. - if linesReceived%l.c.BatchMessages == 0 || l.batchSize > l.batchSizeThreshold { - l.stats.Inc(batchesSent) + if statReceived%l.c.BatchMessages == 0 || l.batchSize > l.batchSizeThreshold { + l.stats.Inc(statSent) if err := l.nc.Publish(l.c.NATSSubject[0], l.buf[:l.batchSize]); err != nil { l.handleNatsError(err) } @@ -280,22 +277,12 @@ func (l *Listener) handleNatsError(err error) { func (l *Listener) startStatistician() { defer l.wg.Done() - statsLine := lineformatter.New( - "spout_stat_listener", - []string{"listener"}, - "received", - "sent", - "read_errors", - ) - tagVals := []string{l.c.Name} + labels := map[string]string{ + "listener": l.c.Name, + } for { - stats := l.stats.Clone() // Sample counts - l.nc.Publish(l.c.NATSSubjectMonitor, statsLine.Format( - tagVals, - stats.Get(linesReceived), - stats.Get(batchesSent), - stats.Get(readErrors), - )) + lines := stats.SnapshotToPrometheus(l.stats.Snapshot(), time.Now(), labels) + l.nc.Publish(l.c.NATSSubjectMonitor, lines) select { case <-time.After(statsInterval): case <-l.stop: diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index 8161f31..9fc29a7 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -22,6 +22,7 @@ import ( "net" "net/http" "os" + "strconv" "strings" "testing" "time" @@ -295,19 +296,42 @@ func assertNoMore(t *testing.T, ch chan string) { } func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) { - expected := fmt.Sprintf( - "spout_stat_listener,listener=testlistener received=%d,sent=%d,read_errors=0\n", - received, sent) - var line string + remaining := map[string]bool{ + fmt.Sprintf(`received{listener="testlistener"} %d`, received): true, + fmt.Sprintf(`sent{listener="testlistener"} %d`, sent): true, + `read_errors{listener="testlistener"} 0`: true, + } + + var allLines string timeout := time.After(spouttest.LongWait) for { select { - case line = <-monitorCh: - if line == expected { + case lines := <-monitorCh: + for _, line := range strings.Split(lines, "\n") { + if len(line) == 0 { + continue + } + line = stripTimestamp(t, line) + allLines += fmt.Sprintf("%q\n", line) + delete(remaining, line) + } + if len(remaining) < 1 { return } case <-timeout: - t.Fatalf("timed out waiting for expected stats. last received: %v", line) + t.Fatalf("timed out waiting for expected stats. received: %s", allLines) } } } + +func stripTimestamp(t *testing.T, s string) string { + i := strings.LastIndexByte(s, ' ') + require.True(t, i >= 0) + + // Check that end looks like a timestamp + _, err := strconv.Atoi(s[i+1:]) + require.NoError(t, err) + + // Strip off the timestamp + return s[:i] +} From 8f96283757a4592fe2e3c59b77abf94b54a697ce Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:05:59 +1200 Subject: [PATCH 04/16] filter: Emit monitor metrics in Prometheus format --- filter/filter.go | 101 ++++++++++++++++++++----------- filter/filter_medium_test.go | 46 +++++++------- filter/worker.go | 8 +-- listener/listener_medium_test.go | 43 ++----------- spouttest/asserts.go | 47 ++++++++++++++ 5 files changed, 142 insertions(+), 103 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index 1beeaa9..e5d4072 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -18,22 +18,23 @@ package filter import ( "fmt" "log" - "strconv" + "sort" + "strings" "sync" "time" "github.com/jumptrading/influx-spout/config" - "github.com/jumptrading/influx-spout/lineformatter" + "github.com/jumptrading/influx-spout/prometheus" "github.com/jumptrading/influx-spout/stats" "github.com/nats-io/go-nats" ) // Name for supported stats const ( - linesPassed = "passed" - linesProcessed = "processed" - linesRejected = "rejected" - linesInvalidTime = "invalid-time" + statPassed = "passed" + statProcessed = "processed" + statRejected = "rejected" + statInvalidTime = "invalid_time" ) // StartFilter creates a Filter instance, sets up its rules based on @@ -109,10 +110,10 @@ func (f *Filter) natsConnect() (natsConn, error) { func initStats(rules *RuleSet) *stats.Stats { // Initialise statNames := []string{ - linesPassed, - linesProcessed, - linesRejected, - linesInvalidTime, + statPassed, + statProcessed, + statRejected, + statInvalidTime, } for i := 0; i < rules.Count(); i++ { statNames = append(statNames, ruleToStatsName(i)) @@ -155,40 +156,36 @@ func (f *Filter) Stop() { // startStatistician defines a goroutine that is responsible for // regularly sending the filter's statistics to the monitoring // backend. -func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { +func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { defer f.wg.Done() - totalLine := lineformatter.New( - "spout_stat_filter", - []string{"filter"}, - linesPassed, linesProcessed, linesRejected, linesInvalidTime, - ) - ruleLine := lineformatter.New( - "spout_stat_filter_rule", - []string{"filter", "rule"}, - "triggered", - ) + generalLabels := map[string]string{ + "filter": f.c.Name, + } for { - st := stats.Clone() + now := time.Now() + snap, ruleCounts := splitSnapshot(st.Snapshot()) - // publish the grand stats - f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format( - []string{f.c.Name}, - st.Get(linesPassed), - st.Get(linesProcessed), - st.Get(linesRejected), - st.Get(linesInvalidTime), - )) + // publish the general stats + lines := stats.SnapshotToPrometheus(snap, now, generalLabels) + f.nc.Publish(f.c.NATSSubjectMonitor, lines) // publish the per rule stats + // XXX merge with SnapshotToPrometheus + millis := now.UnixNano() / int64(time.Millisecond) for i, subject := range rules.Subjects() { - f.nc.Publish(f.c.NATSSubjectMonitor, - ruleLine.Format( - []string{f.c.Name, subject}, - st.Get(ruleToStatsName(i)), - ), - ) + metric := &prometheus.Metric{ + Name: []byte("triggered"), + Labels: prometheus.LabelPairs{ + {[]byte("filter"), []byte(f.c.Name)}, + {[]byte("rule"), []byte(subject)}, + }, + Value: int64(ruleCounts[i]), + Milliseconds: millis, + } + + f.nc.Publish(f.c.NATSSubjectMonitor, metric.ToBytes()) } select { @@ -199,8 +196,38 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { } } +const rulePrefix = "rule-" + // ruleToStatsName converts a rule index to a name to a key for use // with a stats.Stats instance. func ruleToStatsName(i int) string { - return "rule" + strconv.Itoa(i) + return fmt.Sprintf("%s%06d", rulePrefix, i) +} + +// splitSnapshot takes a Snapshot and splits out the rule counters +// from the others. The rule counters are returned in an ordered slice +// while the other counters are returned as a new (smaller) Snapshot. +func splitSnapshot(snap stats.Snapshot) (stats.Snapshot, []int) { + var genSnap stats.Snapshot + var ruleSnap stats.Snapshot + + // Split up rule counters from the others. + for _, counter := range snap { + if strings.HasPrefix(counter.Name, rulePrefix) { + ruleSnap = append(ruleSnap, counter) + } else { + genSnap = append(genSnap, counter) + } + } + + // Sort the rule counters by name and extract just the counts. + sort.Slice(ruleSnap, func(i, j int) bool { + return ruleSnap[i].Name < ruleSnap[j].Name + }) + ruleCounts := make([]int, len(ruleSnap)) + for i, counter := range ruleSnap { + ruleCounts[i] = counter.Value + } + + return genSnap, ruleCounts } diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index a01c1f3..c4487db 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -76,10 +76,10 @@ func TestFilterWorker(t *testing.T) { }) require.NoError(t, err) - // Subscribe to stats output - statsCh := make(chan string, 10) + // Subscribe to monitor output + monitorCh := make(chan string, 10) _, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) { - statsCh <- string(msg.Data) + monitorCh <- string(msg.Data) }) require.NoError(t, err) @@ -103,15 +103,14 @@ hello,host=gopher01 goodbye,host=gopher01 `) - // Receive total stats - spouttest.AssertRecvMulti(t, statsCh, "stats", ` -spout_stat_filter,filter=particle passed=2,processed=3,rejected=1,invalid-time=0 -`) - - // Receive rule specific stats - spouttest.AssertRecvMulti(t, statsCh, "rule stats", ` -spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 -`) + // Receive monitor metrics + spouttest.AssertMonitor(t, monitorCh, []string{ + `passed{filter="particle"} 2`, + `processed{filter="particle"} 3`, + `rejected{filter="particle"} 1`, + `invalid_time{filter="particle"} 0`, + `triggered{filter="particle",rule="hello-subject"} 2`, + }) } func TestInvalidTimeStamps(t *testing.T) { @@ -136,10 +135,10 @@ func TestInvalidTimeStamps(t *testing.T) { }) require.NoError(t, err) - // Subscribe to stats output - statsCh := make(chan string, 10) + // Subscribe to monitor output + monitorCh := make(chan string, 10) _, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) { - statsCh <- string(msg.Data) + monitorCh <- string(msg.Data) }) require.NoError(t, err) @@ -161,13 +160,12 @@ func TestInvalidTimeStamps(t *testing.T) { // Expect to see the 3rd & 4th lines. spouttest.AssertRecv(t, helloCh, "helloCh", strings.Join(lines[2:], "\n")) - // Receive total stats. - spouttest.AssertRecvMulti(t, statsCh, "stats", ` -spout_stat_filter,filter=particle passed=2,processed=4,rejected=0,invalid-time=2 -`) - - // Receive rule specific stats - spouttest.AssertRecvMulti(t, statsCh, "rule stats", ` -spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 -`) + // Receive monitor metrics. + spouttest.AssertMonitor(t, monitorCh, []string{ + `passed{filter="particle"} 2`, + `processed{filter="particle"} 4`, + `rejected{filter="particle"} 0`, + `invalid_time{filter="particle"} 2`, + `triggered{filter="particle",rule="hello-subject"} 2`, + }) } diff --git a/filter/worker.go b/filter/worker.go index 9dd836a..15926ae 100644 --- a/filter/worker.go +++ b/filter/worker.go @@ -91,13 +91,13 @@ func (w *worker) processBatch(batch []byte) { if len(line) == 0 { continue } - w.stats.Inc(linesProcessed) + w.stats.Inc(statProcessed) ts := extractTimestamp(line, now) if minTs < ts && ts < maxTs { w.processLine(line) } else { - w.stats.Inc(linesInvalidTime) + w.stats.Inc(statInvalidTime) if w.debug { log.Printf("invalid line timestamp: %q", string(line)) } @@ -112,7 +112,7 @@ func (w *worker) processLine(line []byte) { idx := w.rules.Lookup(line) if idx == -1 { // no rule for this => junkyard - w.stats.Inc(linesRejected) + w.stats.Inc(statRejected) w.junkBatch.Write(line) return } @@ -120,7 +120,7 @@ func (w *worker) processLine(line []byte) { // write to the corresponding batch buffer w.batches[idx].Write(line) - w.stats.Inc(linesPassed) + w.stats.Inc(statPassed) w.stats.Inc(ruleToStatsName(idx)) } diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index 9fc29a7..db22412 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -22,7 +22,6 @@ import ( "net" "net/http" "os" - "strconv" "strings" "testing" "time" @@ -296,42 +295,10 @@ func assertNoMore(t *testing.T, ch chan string) { } func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) { - remaining := map[string]bool{ - fmt.Sprintf(`received{listener="testlistener"} %d`, received): true, - fmt.Sprintf(`sent{listener="testlistener"} %d`, sent): true, - `read_errors{listener="testlistener"} 0`: true, + expected := []string{ + fmt.Sprintf(`received{listener="testlistener"} %d`, received), + fmt.Sprintf(`sent{listener="testlistener"} %d`, sent), + `read_errors{listener="testlistener"} 0`, } - - var allLines string - timeout := time.After(spouttest.LongWait) - for { - select { - case lines := <-monitorCh: - for _, line := range strings.Split(lines, "\n") { - if len(line) == 0 { - continue - } - line = stripTimestamp(t, line) - allLines += fmt.Sprintf("%q\n", line) - delete(remaining, line) - } - if len(remaining) < 1 { - return - } - case <-timeout: - t.Fatalf("timed out waiting for expected stats. received: %s", allLines) - } - } -} - -func stripTimestamp(t *testing.T, s string) string { - i := strings.LastIndexByte(s, ' ') - require.True(t, i >= 0) - - // Check that end looks like a timestamp - _, err := strconv.Atoi(s[i+1:]) - require.NoError(t, err) - - // Strip off the timestamp - return s[:i] + spouttest.AssertMonitor(t, monitorCh, expected) } diff --git a/spouttest/asserts.go b/spouttest/asserts.go index 9aa6aea..f9a1471 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -1,11 +1,14 @@ package spouttest import ( + "fmt" + "strconv" "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // AssertRecv checks that a specific string has been received from a @@ -49,3 +52,47 @@ func stripLeadingNL(s string) string { } return s } + +// AssertMonitor ensures that a number of lines have been from a +// component's statistician goroutine. The target lines may arrive in +// any order and non-matching lines are ignored. Timestamps on the +// received lines are checked for and then stripped. +func AssertMonitor(t *testing.T, ch chan string, expected []string) { + remaining := make(map[string]bool) + for _, line := range expected { + remaining[line] = true + } + + var seenLines string + timeout := time.After(LongWait) + for { + select { + case lines := <-ch: + for _, line := range strings.Split(lines, "\n") { + if len(line) == 0 { + continue + } + line = stripTimestamp(t, line) + seenLines += fmt.Sprintf("%q\n", line) + delete(remaining, line) + } + if len(remaining) < 1 { + return + } + case <-timeout: + t.Fatalf("timed out waiting for expected lines. received:\n%s", seenLines) + } + } +} + +func stripTimestamp(t *testing.T, s string) string { + i := strings.LastIndexByte(s, ' ') + require.True(t, i >= 0) + + // Check that end looks like a timestamp + _, err := strconv.Atoi(s[i+1:]) + require.NoError(t, err) + + // Strip off the timestamp + return s[:i] +} From 746055f19fb45c9fa05c31ad83b7fcb66fec028e Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:21:03 +1200 Subject: [PATCH 05/16] writer: Emit metrics in Prometheus format --- writer/writer.go | 50 ++++++++++++------------------------ writer/writer_medium_test.go | 26 ++++++++++--------- 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/writer/writer.go b/writer/writer.go index d8ed3c9..23fe071 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -39,9 +39,9 @@ import ( // Writer stats counters const ( - batchesReceived = "batches-received" - writeRequests = "write-requests" - failedWrites = "failed-writes" + statReceived = "received" + statWriteRequests = "write_requests" + statFailedWrites = "failed_writes" ) type Writer struct { @@ -64,7 +64,7 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { url: fmt.Sprintf("http://%s:%d/write?db=%s", c.InfluxDBAddress, c.InfluxDBPort, c.DBName), batchMaxBytes: c.BatchMaxMB * 1024 * 1024, batchMaxAge: time.Duration(c.BatchMaxSecs) * time.Second, - stats: stats.New(batchesReceived, writeRequests, failedWrites), + stats: stats.New(statReceived, statWriteRequests, statFailedWrites), stop: make(chan struct{}), } defer func() { @@ -160,7 +160,7 @@ func (w *Writer) worker(jobs <-chan *nats.Msg) { for { select { case j := <-jobs: - w.stats.Inc(batchesReceived) + w.stats.Inc(statReceived) batchWrite(j.Data) case <-time.After(time.Second): // Wake up regularly to check batch age @@ -169,10 +169,10 @@ func (w *Writer) worker(jobs <-chan *nats.Msg) { } if w.shouldSendBatch(batch) { - w.stats.Inc(writeRequests) + w.stats.Inc(statWriteRequests) if err := w.sendBatch(batch, client); err != nil { - w.stats.Inc(failedWrites) + w.stats.Inc(statFailedWrites) log.Printf("Error: %v", err) } @@ -284,37 +284,21 @@ func (w *Writer) monitorSub(sub *nats.Subscription) { } } +// This goroutine is responsible for monitoring the statistics and +// sending it to the monitoring backend. func (w *Writer) startStatistician() { defer w.wg.Done() - // This goroutine is responsible for monitoring the statistics and - // sending it to the monitoring backend. - statsLine := lineformatter.New( - "spout_stat_writer", - []string{ // tag keys - "writer", - "influxdb_address", - "influxdb_port", - "influxdb_dbname", - }, - "received", - "write_requests", - "failed_writes", - ) - tagVals := []string{ - w.c.Name, - w.c.InfluxDBAddress, - strconv.Itoa(w.c.InfluxDBPort), - w.c.DBName, + labels := map[string]string{ + "writer": w.c.Name, + "influxdb_address": w.c.InfluxDBAddress, + "influxdb_port": strconv.Itoa(w.c.InfluxDBPort), + "influxdb_dbname": w.c.DBName, } + for { - stats := w.stats.Clone() - w.nc.Publish(w.c.NATSSubjectMonitor, statsLine.Format( - tagVals, - stats.Get(batchesReceived), - stats.Get(writeRequests), - stats.Get(failedWrites), - )) + lines := stats.SnapshotToPrometheus(w.stats.Snapshot(), time.Now(), labels) + w.nc.Publish(w.c.NATSSubjectMonitor, lines) select { case <-time.After(3 * time.Second): diff --git a/writer/writer_medium_test.go b/writer/writer_medium_test.go index f2cc5b9..4085a6c 100644 --- a/writer/writer_medium_test.go +++ b/writer/writer_medium_test.go @@ -20,7 +20,6 @@ import ( "fmt" "io/ioutil" "net/http" - "strconv" "strings" "sync" "testing" @@ -71,9 +70,9 @@ func TestBasicWriter(t *testing.T) { defer w.Stop() // Subscribe to stats output. - statsCh := make(chan string, 10) + monitorCh := make(chan string, 10) _, err := nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) { - statsCh <- string(msg.Data) + monitorCh <- string(msg.Data) }) require.NoError(t, err) @@ -95,15 +94,18 @@ func TestBasicWriter(t *testing.T) { } } - // Check the stats output. - spouttest.AssertRecvMulti(t, statsCh, "stats", - strings.Join([]string{ - "spout_stat_writer", - "writer=foo", - "influxdb_address=localhost", - "influxdb_port=" + strconv.Itoa(influxPort), - "influxdb_dbname=metrics", - }, ",")+" received=5,write_requests=5,failed_writes=0\n") + // Check the monitor output. + labels := "{" + strings.Join([]string{ + `influxdb_address="localhost"`, + `influxdb_dbname="metrics"`, + fmt.Sprintf(`influxdb_port="%d"`, influxPort), + `writer="foo"`, + }, ",") + "}" + spouttest.AssertMonitor(t, monitorCh, []string{ + `received` + labels + ` 5`, + `write_requests` + labels + ` 5`, + `failed_writes` + labels + ` 0`, + }) } func TestBatchMBLimit(t *testing.T) { From 7cbff78a54b24d625d4e45dc384c66247132e19d Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:21:33 +1200 Subject: [PATCH 06/16] spouttest: Clearer failure messages from AssertMonitor --- spouttest/asserts.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spouttest/asserts.go b/spouttest/asserts.go index f9a1471..ea82e93 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -73,14 +73,17 @@ func AssertMonitor(t *testing.T, ch chan string, expected []string) { continue } line = stripTimestamp(t, line) - seenLines += fmt.Sprintf("%q\n", line) + seenLines += fmt.Sprintf("%s\n", line) delete(remaining, line) } if len(remaining) < 1 { return } case <-timeout: - t.Fatalf("timed out waiting for expected lines. received:\n%s", seenLines) + t.Fatalf("timed out waiting for expected lines. expected:\n%s\nsaw:\n%s", + strings.Join(expected, "\n"), + seenLines, + ) } } } From b7e443f988ce9b2824a23cce74d8bcecbeb2bdc9 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:35:07 +1200 Subject: [PATCH 07/16] spouttest: Remove unused AssertRecvMulti helper All usage of this have been replaced by AssertMonitor. --- spouttest/asserts.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/spouttest/asserts.go b/spouttest/asserts.go index ea82e93..91667a9 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -24,26 +24,6 @@ func AssertRecv(t *testing.T, ch <-chan string, label, expected string) { } } -// AssertRecvMulti checks that a specific string has been received -// from a channel. The channel will be read multiple times if -// required. The check times out after LongWait. -func AssertRecvMulti(t *testing.T, ch <-chan string, label, expected string) { - expected = stripLeadingNL(expected) - - var received string - timeout := time.After(LongWait) - for { - select { - case received = <-ch: - if expected == received { - return - } - case <-timeout: - t.Fatalf("timed out waiting for %s. last received: %q", label, received) - } - } -} - func stripLeadingNL(s string) string { // This allows long `expected` strings to be formatted nicely in // the caller. From 7782183ac2b0bf0aad603b69a4ef9667317aec6b Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:35:41 +1200 Subject: [PATCH 08/16] stats: Add CounterToPrometheus This helper is useful for cases where a single Prometheus metric line is required (instead of generating from a Snapshot). --- stats/prometheus.go | 28 +++++++++++++++++++++++++++- stats/prometheus_small_test.go | 9 +++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/stats/prometheus.go b/stats/prometheus.go index 4bb177a..ce5299e 100644 --- a/stats/prometheus.go +++ b/stats/prometheus.go @@ -28,7 +28,7 @@ func SnapshotToPrometheus( now time.Time, labels map[string]string, ) []byte { - millis := now.UnixNano() / int64(time.Millisecond) + millis := timeToMillis(now) labelPairs := make(prometheus.LabelPairs, 0, len(labels)) for name, value := range labels { @@ -49,3 +49,29 @@ func SnapshotToPrometheus( } return set.ToBytes() } + +// CounterToPrometheus generates a single Prometheus line for a counter. +func CounterToPrometheus(name string, value int, now time.Time, labels map[string]string) []byte { + metric := &prometheus.Metric{ + Name: []byte(name), + Labels: toLabelPairs(labels), + Value: int64(value), + Milliseconds: timeToMillis(now), + } + return metric.ToBytes() +} + +func timeToMillis(t time.Time) int64 { + return t.UnixNano() / int64(time.Millisecond) +} + +func toLabelPairs(labels map[string]string) prometheus.LabelPairs { + labelPairs := make(prometheus.LabelPairs, 0, len(labels)) + for name, value := range labels { + labelPairs = append(labelPairs, prometheus.LabelPair{ + Name: []byte(name), + Value: []byte(value), + }) + } + return labelPairs +} diff --git a/stats/prometheus_small_test.go b/stats/prometheus_small_test.go index 51be66d..dc9b1db 100644 --- a/stats/prometheus_small_test.go +++ b/stats/prometheus_small_test.go @@ -61,3 +61,12 @@ qaz{host="nyc01",level="high"} 0 1234567890000 `[1:]) assert.Equal(t, expected, actual) } + +func TestCounterToPrometheus(t *testing.T) { + ts := time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) + labels := map[string]string{"host": "nyc01"} + + actual := stats.CounterToPrometheus("foo", 99, ts, labels) + expected := []byte(`foo{host="nyc01"} 99 1234567890000`) + assert.Equal(t, expected, actual) +} From e037211d0045c61af6e8b870cfaf43f58330ff8f Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:40:07 +1200 Subject: [PATCH 09/16] filter: Clean up generation of per rule metrics --- filter/filter.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index e5d4072..b55adb4 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -24,7 +24,6 @@ import ( "time" "github.com/jumptrading/influx-spout/config" - "github.com/jumptrading/influx-spout/prometheus" "github.com/jumptrading/influx-spout/stats" "github.com/nats-io/go-nats" ) @@ -172,20 +171,16 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { f.nc.Publish(f.c.NATSSubjectMonitor, lines) // publish the per rule stats - // XXX merge with SnapshotToPrometheus - millis := now.UnixNano() / int64(time.Millisecond) for i, subject := range rules.Subjects() { - metric := &prometheus.Metric{ - Name: []byte("triggered"), - Labels: prometheus.LabelPairs{ - {[]byte("filter"), []byte(f.c.Name)}, - {[]byte("rule"), []byte(subject)}, + f.nc.Publish(f.c.NATSSubjectMonitor, stats.CounterToPrometheus( + "triggered", + ruleCounts[i], + now, + map[string]string{ + "filter": f.c.Name, + "rule": subject, }, - Value: int64(ruleCounts[i]), - Milliseconds: millis, - } - - f.nc.Publish(f.c.NATSSubjectMonitor, metric.ToBytes()) + )) } select { From 56abc7f85bca745771f6f5fcb65d109a8ab42f86 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:49:25 +1200 Subject: [PATCH 10/16] writer: Use Prometheus metric for subscription drop stats --- writer/writer.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/writer/writer.go b/writer/writer.go index 23fe071..aedf883 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -33,7 +33,6 @@ import ( "github.com/jumptrading/influx-spout/config" "github.com/jumptrading/influx-spout/filter" - "github.com/jumptrading/influx-spout/lineformatter" "github.com/jumptrading/influx-spout/stats" ) @@ -241,17 +240,15 @@ func (w *Writer) sendBatch(batch *batchBuffer, client *http.Client) error { return nil } -var dropLine = lineformatter.New("writer_drop", nil, "total", "diff") - -func (w *Writer) signalDrop(drop, last int) { +func (w *Writer) signalDrop(subject string, drop, last int) { // uh, this writer is overloaded and had to drop a packet - log.Printf("Warning: dropped %d (now %d dropped in total)\n", drop-last, drop) + log.Printf("Warning: dropped %d for subject %q (total dropped: %d)", drop-last, subject, drop) - // publish to the monitor subject, so grafana can pick it up and report failures - w.nc.Publish(w.c.NATSSubjectMonitor, dropLine.FormatT(time.Now(), nil, drop, drop-last)) + labels := w.metricsLabels() + labels["subject"] = subject - // the fact the we dropped a packet MUST reach the server - // immediately so we can investigate + line := stats.CounterToPrometheus("dropped", drop, time.Now(), labels) + w.nc.Publish(w.c.NATSSubjectMonitor, line) w.nc.Flush() } @@ -271,7 +268,7 @@ func (w *Writer) monitorSub(sub *nats.Subscription) { } if drop != last { - w.signalDrop(drop, last) + w.signalDrop(sub.Subject, drop, last) } last = drop @@ -289,13 +286,7 @@ func (w *Writer) monitorSub(sub *nats.Subscription) { func (w *Writer) startStatistician() { defer w.wg.Done() - labels := map[string]string{ - "writer": w.c.Name, - "influxdb_address": w.c.InfluxDBAddress, - "influxdb_port": strconv.Itoa(w.c.InfluxDBPort), - "influxdb_dbname": w.c.DBName, - } - + labels := w.metricsLabels() for { lines := stats.SnapshotToPrometheus(w.stats.Snapshot(), time.Now(), labels) w.nc.Publish(w.c.NATSSubjectMonitor, lines) @@ -307,3 +298,12 @@ func (w *Writer) startStatistician() { } } } + +func (w *Writer) metricsLabels() map[string]string { + return map[string]string{ + "writer": w.c.Name, + "influxdb_address": w.c.InfluxDBAddress, + "influxdb_port": strconv.Itoa(w.c.InfluxDBPort), + "influxdb_dbname": w.c.DBName, + } +} From 161781235b6068d814a659335f96f4aa02faf322 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:50:47 +1200 Subject: [PATCH 11/16] lineformatter: Removed No longer used now that the components are publishing metrics using Prometheus format. --- lineformatter/lineformatter.go | 153 ---------------------- lineformatter/lineformatter_small_test.go | 149 --------------------- 2 files changed, 302 deletions(-) delete mode 100644 lineformatter/lineformatter.go delete mode 100644 lineformatter/lineformatter_small_test.go diff --git a/lineformatter/lineformatter.go b/lineformatter/lineformatter.go deleted file mode 100644 index a8764db..0000000 --- a/lineformatter/lineformatter.go +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2017 Jump Trading -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package lineformatter only contains the LineFormatter type and its -// tests. LineFormatter efficiently generates InfluxDB Line Protocol -// entries. -// -// The byte slices generated look something like this: -// measurement,tag1=foo,tag2=bar field1=123,field2="string" -package lineformatter - -import ( - "fmt" - "strconv" - "time" -) - -// New creates a new LineFormatter initialised for the given -// measurement, tag names and fields names. -func New(measurement string, tags []string, fields ...string) *LineFormatter { - f := &LineFormatter{ - prefix: []byte(measurement), - fields: make([][]byte, len(fields)), - tags: make([][]byte, len(tags)), - } - - // bufCap estimates a sensible initial capacity for the byte slice - // returned by Format(). - f.bufCap = len(measurement) - - // Precompute tag sections - for i, tag := range tags { - // TODO: escaping of tag names (not critical right now). - f.tags[i] = []byte("," + tag + "=") - f.bufCap += len(tags[i]) + 16 // tag name + approx tag size - } - - // Precompute label sections - for i, field := range fields { - // TODO: escaping of field names (not critical right now). - if i > 0 { - f.fields[i] = []byte("," + field + "=") - } else { - f.fields[i] = []byte(field + "=") - } - f.bufCap += len(fields[i]) + 16 // field name + approx value size - } - return f -} - -// LineFormatter generates InfluxDB Line Protocol lines. It is ~80% -// faster than using a `[]byte(fmt.Sprintf(...))` style approach. -type LineFormatter struct { - prefix []byte - bufCap int - tags [][]byte - fields [][]byte -} - -// Format efficiently generates a new InfluxDB line as per the -// measurement, tags & fields passed to New, using the tag and field -// values provided. -// -// The number of tag values given must be equal to or less than the -// number of tags passed to New. -// -// The number of field values given must be equal to or less than the -// number of fields passed to New. The following field types are -// supported: int, int64, string, bool. A panic will occur if other -// types are passed. Strings will be correctly quoted and escaped. -// -// Format is goroutine safe. -func (f *LineFormatter) Format(tagVals []string, vals ...interface{}) []byte { - buf := make([]byte, 0, f.bufCap) - buf = f.format(buf, tagVals, vals...) - return append(buf, '\n') -} - -var timestampLen = len(fmt.Sprint(time.Now().UnixNano())) + 1 // 1 extra for the space prefix - -// FormatT the same as Format but appends an appropriately formatted -// timestamp to the end of the line. -// -// FormatT is goroutine safe. -func (f *LineFormatter) FormatT(ts time.Time, tagVals []string, vals ...interface{}) []byte { - buf := make([]byte, 0, f.bufCap+timestampLen) - buf = f.format(buf, tagVals, vals...) - buf = append(buf, ' ') - buf = strconv.AppendInt(buf, ts.UnixNano(), 10) - return append(buf, '\n') -} - -func (f *LineFormatter) format(buf []byte, tagVals []string, vals ...interface{}) []byte { - buf = append(buf, f.prefix...) - - for i, tagVal := range tagVals { - buf = append(buf, f.tags[i]...) - // Note: this loop is significantly faster than - // `append(buf, []byte(tagVal))` - for i := range tagVal { - // TODO: escaping of tag values (not critical right now). - buf = append(buf, tagVal[i]) - } - } - - buf = append(buf, ' ') - - for i, val := range vals { - buf = append(buf, f.fields[i]...) - switch v := val.(type) { - case int: - buf = strconv.AppendInt(buf, int64(v), 10) - case int64: - buf = strconv.AppendInt(buf, v, 10) - case string: - buf = append(buf, '"') - for i := range v { - if v[i] == '"' { - buf = append(buf, '\\') - } - buf = append(buf, v[i]) - } - buf = append(buf, '"') - case bool: - if v { - buf = append(buf, 't') - } else { - buf = append(buf, 'f') - } - case float32: - buf = strconv.AppendFloat(buf, float64(v), 'f', -1, 32) - case float64: - buf = strconv.AppendFloat(buf, v, 'f', -1, 64) - case []byte: - buf = append(buf, v...) - default: - // Best effort - buf = append(buf, []byte(fmt.Sprintf("%v", val))...) - } - } - return buf -} diff --git a/lineformatter/lineformatter_small_test.go b/lineformatter/lineformatter_small_test.go deleted file mode 100644 index 606060a..0000000 --- a/lineformatter/lineformatter_small_test.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2017 Jump Trading -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build small - -package lineformatter_test - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/jumptrading/influx-spout/lineformatter" -) - -func TestIntField(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, 123), "measurement f=123") - assertFormat(t, f.Format(nil, 66), "measurement f=66") -} - -func TestInt64Field(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, int64(123)), "measurement f=123") - assertFormat(t, f.Format(nil, int64(66)), "measurement f=66") -} - -func TestFloat32Field(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, float32(1.23)), "measurement f=1.23") - assertFormat(t, f.Format(nil, float32(-654.321)), "measurement f=-654.321") -} - -func TestFloat64Field(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, float64(1.23)), "measurement f=1.23") - assertFormat(t, f.Format(nil, float64(-99999999999.321)), "measurement f=-99999999999.321") -} - -func TestByteSlice(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - // byte slices are just added raw without quoting - assertFormat(t, f.Format(nil, []byte("abc")), "measurement f=abc") -} - -func TestStringField(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, "foo"), `measurement f="foo"`) - assertFormat(t, f.Format(nil, ""), `measurement f=""`) - assertFormat(t, f.Format(nil, `foo bar`), `measurement f="foo bar"`) - assertFormat(t, f.Format(nil, `st"uff`), `measurement f="st\"uff"`) - assertFormat(t, f.Format(nil, `"stuff"`), `measurement f="\"stuff\""`) -} - -func TestBoolField(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, true), "measurement f=t") - assertFormat(t, f.Format(nil, false), "measurement f=f") -} - -func TestUnknownType(t *testing.T) { - f := lineformatter.New("measurement", nil, "f") - - assertFormat(t, f.Format(nil, uint16(42)), `measurement f=42`) - assertFormat(t, f.Format(nil, int32(-999)), `measurement f=-999`) - assertFormat(t, f.Format(nil, 'x'), `measurement f=120`) -} - -func TestMultipleFields(t *testing.T) { - f := lineformatter.New("status", nil, "foo", "bar", "thing") - - assertFormat(t, f.Format(nil, true, "medium to high", 99999), - `status foo=t,bar="medium to high",thing=99999`) - assertFormat(t, f.Format(nil, false, "low", -5), - `status foo=f,bar="low",thing=-5`) -} - -func TestTags(t *testing.T) { - f := lineformatter.New("measurement", []string{"abc", "x"}, "f") - - assertFormat(t, f.Format([]string{"foo", "bar"}, true), "measurement,abc=foo,x=bar f=t") -} - -func TestTagsAndMultipleFields(t *testing.T) { - f := lineformatter.New("m", []string{"x", "y"}, "a", "b") - - assertFormat(t, f.Format([]string{"o", "p"}, 1, 2), "m,x=o,y=p a=1,b=2") -} - -func TestFormatT(t *testing.T) { - ts := time.Date(2017, 6, 5, 4, 3, 2, 1, time.UTC) - f := lineformatter.New("status", nil, "foo", "bar", "thing") - - assertFormat(t, f.FormatT(ts, nil, true, "average", 100), - `status foo=t,bar="average",thing=100 1496635382000000001`) - assertFormat(t, f.FormatT(ts, nil, false, "subpar", 4), - `status foo=f,bar="subpar",thing=4 1496635382000000001`) -} - -func BenchmarkFormat(b *testing.B) { - f := lineformatter.New("foo", nil, "a", "b", "c") - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = f.Format(nil, 123, "foo", true) - } -} - -func BenchmarkFormatT(b *testing.B) { - f := lineformatter.New("foo", nil, "a", "b", "c") - ts := time.Date(2017, 6, 5, 4, 3, 2, 1, time.UTC) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = f.FormatT(ts, nil, 123, "foo", true) - } -} - -func BenchmarkFormatWithTags(b *testing.B) { - f := lineformatter.New("foo", []string{"t1", "t2"}, "a", "b", "c") - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = f.Format([]string{"x", "y"}, 123, "foo", true) - } -} - -func assertFormat(t *testing.T, actual []byte, expected string) { - assert.Equal(t, expected+"\n", string(actual)) -} From f5e652b7c9038a31f061b91bdc2eab112c25b0e1 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 15:53:07 +1200 Subject: [PATCH 12/16] stats: Remove the now unused Stats.Clone method This has been superseded by the Snapshot method. --- stats/stats.go | 16 +--------------- stats/stats_small_test.go | 17 ----------------- 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/stats/stats.go b/stats/stats.go index 853cde9..47d22a9 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -67,6 +67,7 @@ type CounterPair struct { Value int } +// Snapshot holds the names and values of some counters. type Snapshot []CounterPair // Snapshot returns the current values of all the counters. @@ -80,18 +81,3 @@ func (s *Stats) Snapshot() Snapshot { } return out } - -// Clone returns a new Stats instance, copying the source Stats -// counts. -func (s *Stats) Clone() *Stats { - s.mu.Lock() - defer s.mu.Unlock() - - out := &Stats{ - counts: make(map[string]int), - } - for name, count := range s.counts { - out.counts[name] = count - } - return out -} diff --git a/stats/stats_small_test.go b/stats/stats_small_test.go index 1a0f641..32c71d3 100644 --- a/stats/stats_small_test.go +++ b/stats/stats_small_test.go @@ -62,23 +62,6 @@ func TestSnapshot(t *testing.T) { }, s.Snapshot()) } -func TestClone(t *testing.T) { - s := stats.New("foo", "bar", "qaz") - s.Inc("foo") - s.Inc("bar") - s.Inc("bar") - - s2 := s.Clone() - assert.Equal(t, 1, s2.Get("foo")) - assert.Equal(t, 2, s2.Get("bar")) - assert.Equal(t, 0, s2.Get("qaz")) - - // Make sure they're independent - s2.Inc("foo") - assert.Equal(t, 2, s2.Get("foo")) - assert.Equal(t, 1, s.Get("foo")) -} - func BenchmarkStats(b *testing.B) { s := stats.New("foo", "bar") From 380fbfd483e6111f39658e3027eacebdfa3377cf Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 16:45:23 +1200 Subject: [PATCH 13/16] config: Set a default port for the monitor. Port 9331 has been allocated for use by influx-spout. See https://github.com/prometheus/prometheus/wiki/Default-port-allocations --- config/config.go | 15 +++++++++++---- config/config_small_test.go | 6 ++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index fa6c49c..78aae41 100644 --- a/config/config.go +++ b/config/config.go @@ -101,11 +101,18 @@ func NewConfigFromFile(fileName string) (*Config, error) { if conf.Name == "" { conf.Name = pathToConfigName(fileName) } - if conf.Mode == "listener" && conf.Port == 0 { - conf.Port = 10001 - } else if conf.Mode == "listener_http" && conf.Port == 0 { - conf.Port = 13337 + + if conf.Port == 0 { + switch conf.Mode { + case "listener": + conf.Port = 10001 + case "listener_http": + conf.Port = 13337 + case "monitor": + conf.Port = 9331 + } } + return conf, nil } diff --git a/config/config_small_test.go b/config/config_small_test.go index 94f7ab3..5007d7e 100644 --- a/config/config_small_test.go +++ b/config/config_small_test.go @@ -120,6 +120,12 @@ func TestDefaultPortHTTPListener(t *testing.T) { assert.Equal(t, 13337, conf.Port) } +func TestDefaultPortMonitor(t *testing.T) { + conf, err := parseConfig(`mode = "monitor"`) + require.NoError(t, err) + assert.Equal(t, 9331, conf.Port) +} + func TestNoMode(t *testing.T) { _, err := parseConfig("") assert.EqualError(t, err, "mode not specified in config") From bfac4dd1e5fd6a96b247c84fae6e2d4db2adf321 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 16:49:11 +1200 Subject: [PATCH 14/16] Document the monitor component --- README.md | 93 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 0cfc3af..49d2ad6 100644 --- a/README.md +++ b/README.md @@ -2,22 +2,23 @@ ## Overview -influx-spout is an open source messaging system that routes and processes -[InfluxDB line protocol] metrics from agents (for example [Telegraf]) to -processing and storage backends ([InfluxDB], [Kapacitor] etc.). +influx-spout is an open source messaging system that routes and processes +[InfluxDB line protocol] metrics from agents (for example [Telegraf]) to +processing and storage backends ([InfluxDB], [Kapacitor] etc.). Key features: -- Proven ability to handle high volumes of data (>500k points per second) + +- Proven ability to handle high volumes of data (>500k points per second) in a production environment - Horizontal scalability - - Multithreaded data processing within components - - Components can be distributed on multiple servers or a fleet of containers + * Multithreaded data processing within components + * Components can be distributed on multiple servers or a fleet of containers - Ability to add and remove endpoints without disrupting existing data flows -- Fine-grained, regexp-based control over routing of metrics data to specific +- Fine-grained, regexp-based control over routing of metrics data to specific backends -- Sanity checking of the data stream that prevents corrupt metrics data from +- Sanity checking of the data stream that prevents corrupt metrics data from reaching backends -- Batching of outgoing data to larger chunks, making it easier for backends +- Batching of outgoing data to larger chunks, making it easier for backends to handle high-volume dataflows - Leverages the high-performance [NATS] messaging system @@ -43,23 +44,23 @@ flow of data between the various components: +-----------------+ +-----------------+ | | v v - +----------+ +-------------------------------------------------+ - | |<--+ | - | Filter | | NATS | - | +-->| | - +----------+ +--------+----------------+----------------+------+ - | | | - v v v - +----------+ +----------+ +----------+ - | | | | | | - | Writer | | Writer | | Writer | - | | | | | | - +-----+----+ +-----+----+ +-----+----+ - | | | - v v v - +----------+ +----------+ +----------+ - | InfluxDB | | InfluxDB | | InfluxDB | - +----------+ +----------+ +----------+ + +----------+ +-------------------------------------------------+ +---------+ + | |<--+ | | | + | Filter | | NATS +-->| Monitor | + | +-->| | | | + +----------+ +--------+----------------+----------------+------+ +----+----+ + | | | | + v v v | + +----------+ +----------+ +----------+ | + | | | | | | | + | Writer | | Writer | | Writer | | + | | | | | | | + +-----+----+ +-----+----+ +-----+----+ | + | | | | + v v v V + +----------+ +----------+ +----------+ +----------------+ + | InfluxDB | | InfluxDB | | InfluxDB | | Metrics (HTTP) | + +----------+ +----------+ +----------+ +----------------+ ``` All the influx-spout components may be run on a single host or may be @@ -131,8 +132,8 @@ batch = 10 # support higher receive rates. read_buffer_bytes = 4194304 -# Out-of-bound metrics and diagnostic messages are published to this NATS subject -# (in InfluxDB line protocol format). +# The listener will publish its own metrics to this NATS subject (for consumption +# by the monitor). nats_subject_monitor = "influx-spout-monitor" ``` @@ -175,8 +176,8 @@ read_buffer_bytes = 4194304 # defaults to 1 MB). listener_batch_bytes = 1048576 -# Out-of-bound metrics and diagnostic messages are published to this NATS subject -# (in InfluxDB line protocol format). +# The HTTP listener will publish its own metrics to this NATS subject (for +# consumption by the monitor). nats_subject_monitor = "influx-spout-monitor" ``` @@ -201,8 +202,8 @@ nats_subject = ["influx-spout"] # Measurements which do not match any rule (below) are sent to this NATS subject. nats_subject_junkyard = "influx-spout-junk" -# Out-of-bound metrics and diagnostic messages are published to this NATS subject -# (in InfluxDB line protocol format). +# The filter will publish its own metrics to this NATS subject (for consumption +# by the monitor). nats_subject_monitor = "influx-spout-monitor" # The number of filter workers to spawn. @@ -313,8 +314,8 @@ write_timeout_secs = 30 # this limit is reached. This helps to deal with slow InfluxDB instances. nats_pending_max_mb = 200 -# Out-of-bound metrics and diagnostic messages are published to this NATS subject -# (in InfluxDB line protocol format). +# The writer will publish its own metrics to this NATS subject (for consumption +# by the monitor). nats_subject_monitor = "influx-spout-monitor" ``` @@ -326,6 +327,30 @@ measurements which don't match a rule will be dropped by the writer instead of being written to InfluxDB. Rule configuration is the same as for the filter component, but the rule subject should be omitted. +### Monitor + +The monitor is responsible for collecting metrics from the other +influx-spout components and serving then over HTTP in [Prometheus data +exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). Metrics +are available at `/metrics` on the configured port. + +The supported configuration options for the writer mode follow. Defaults are +shown. + +```toml +mode = "monitor" # Required + +# Address of NATS server. +nats_address = "nats://localhost:4222" + +# The NATS subject used by influx-spout components to report metrics to the monitor. +# The monitor will consume and aggregate metrics sent to this subject. +nats_subject_monitor = "influx-spout-monitor" + +# The TCP port where the monitor will serve Prometheus formatted metrics over HTTP. +port = 9331 +``` + ## Running tests influx-spout's tests are classified as either "small", "medium" or "large", From 8f7c80ec4297086324599ab69736e3f632f4d77c Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 10 Apr 2018 17:58:39 +1200 Subject: [PATCH 15/16] spouttest: Include the monitor in the end-to-end test --- spouttest/asserts.go | 16 +++++++++++ spouttest/e2e_test.go | 62 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/spouttest/asserts.go b/spouttest/asserts.go index 91667a9..d2ef4dc 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -68,7 +68,23 @@ func AssertMonitor(t *testing.T, ch chan string, expected []string) { } } +// StripTimestamps takes a string containing one or more metrics +// lines, validates that each line appears to end with a timestamp and +// then strips the timestamp off. The returned string is the same as +// the input but without the timestamps (for easier test comparisons). +func StripTimestamps(t *testing.T, s string) string { + var out []string + for _, line := range strings.Split(s, "\n") { + out = append(out, stripTimestamp(t, line)) + } + return strings.Join(out, "\n") +} + func stripTimestamp(t *testing.T, s string) string { + if len(s) < 1 { + return "" + } + i := strings.LastIndexByte(s, ' ') require.True(t, i >= 0) diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index deb5ad6..7aab94f 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -19,6 +19,7 @@ package spouttest_test import ( "bytes" "fmt" + "io/ioutil" "net" "net/http" "strconv" @@ -40,6 +41,7 @@ const ( influxdPort = 44501 listenerPort = 44502 httpListenerPort = 44503 + monitorPort = 44504 influxDBName = "test" sendCount = 10 ) @@ -70,9 +72,13 @@ func TestEndToEnd(t *testing.T) { writer := startWriter(t, fs) defer writer.Stop() - // Make sure the listeners are actually listening. - assertListenerReady(t, listener) - assertListenerReady(t, httpListener) + monitor := startMonitor(t, fs) + defer monitor.Stop() + + // Make sure the listeners & monitor are actually listening. + assertReady(t, listener) + assertReady(t, httpListener) + assertReady(t, monitor) // Connect to the listener. addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort)) @@ -120,17 +126,48 @@ func TestEndToEnd(t *testing.T) { } time.Sleep(250 * time.Millisecond) } + + // Check metrics published by monitor component. + expectedMetrics := ` +failed_writes{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 0 +invalid_time{filter="filter"} 0 +passed{filter="filter"} 10 +processed{filter="filter"} 20 +read_errors{listener="listener"} 0 +received{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2 +received{listener="listener"} 5 +rejected{filter="filter"} 10 +sent{listener="listener"} 1 +triggered{filter="filter",rule="system"} 10 +write_requests{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2 +`[1:] + var lines string + for try := 0; try < 20; try++ { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", monitorPort)) + require.NoError(t, err) + + raw, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + lines = spouttest.StripTimestamps(t, string(raw)) + if lines == expectedMetrics { + return + } + time.Sleep(500 * time.Millisecond) + } + + t.Fatalf("Failed to see expected metrics. Last saw:\n%s", lines) } type HasReady interface { Ready() <-chan struct{} } -func assertListenerReady(t *testing.T, listener interface{}) { +func assertReady(t *testing.T, component interface{}) { select { - case <-listener.(HasReady).Ready(): + case <-component.(HasReady).Ready(): case <-time.After(spouttest.LongWait): - t.Fatal("timeout out waiting for listener to be ready") + t.Fatal("timeout out waiting for component to be ready") } } @@ -156,6 +193,7 @@ port = %d nats_address = "nats://localhost:%d" batch = 5 debug = true +nats_subject_monitor = "monitor" `, listenerPort, natsPort)) } @@ -166,6 +204,7 @@ port = %d nats_address = "nats://localhost:%d" batch = 5 debug = true +nats_subject_monitor = "monitor" `, httpListenerPort, natsPort)) } @@ -174,6 +213,7 @@ func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable { mode = "filter" nats_address = "nats://localhost:%d" debug = true +nats_subject_monitor = "monitor" [[rule]] type = "basic" @@ -192,9 +232,19 @@ influxdb_dbname = "%s" batch = 1 workers = 4 debug = true +nats_subject_monitor = "monitor" `, natsPort, influxdPort, influxDBName)) } +func startMonitor(t *testing.T, fs afero.Fs) cmd.Stoppable { + return startComponent(t, fs, "monitor", fmt.Sprintf(` +mode = "monitor" +nats_address = "nats://localhost:%d" +nats_subject_monitor = "monitor" +port = %d +`, natsPort, monitorPort)) +} + func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable { configFilename := name + ".toml" err := afero.WriteFile(fs, configFilename, []byte(config), 0600) From ee7defcf3039d709f9671bb3755de8ad6e6e14d3 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 17 Apr 2018 15:56:21 +1200 Subject: [PATCH 16/16] Separate component type and name in Prometheus metrics This simplifies querying of influx-spout metrics. --- filter/filter.go | 8 +++++--- filter/filter_medium_test.go | 20 ++++++++++---------- listener/listener.go | 3 ++- listener/listener_medium_test.go | 6 +++--- spouttest/e2e_test.go | 22 +++++++++++----------- writer/writer.go | 3 ++- writer/writer_medium_test.go | 3 ++- 7 files changed, 35 insertions(+), 30 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index b55adb4..bcab1e0 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -159,7 +159,8 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { defer f.wg.Done() generalLabels := map[string]string{ - "filter": f.c.Name, + "component": "filter", + "name": f.c.Name, } for { @@ -177,8 +178,9 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { ruleCounts[i], now, map[string]string{ - "filter": f.c.Name, - "rule": subject, + "component": "filter", + "name": f.c.Name, + "rule": subject, }, )) } diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index c4487db..debe8da 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -105,11 +105,11 @@ goodbye,host=gopher01 // Receive monitor metrics spouttest.AssertMonitor(t, monitorCh, []string{ - `passed{filter="particle"} 2`, - `processed{filter="particle"} 3`, - `rejected{filter="particle"} 1`, - `invalid_time{filter="particle"} 0`, - `triggered{filter="particle",rule="hello-subject"} 2`, + `passed{component="filter",name="particle"} 2`, + `processed{component="filter",name="particle"} 3`, + `rejected{component="filter",name="particle"} 1`, + `invalid_time{component="filter",name="particle"} 0`, + `triggered{component="filter",name="particle",rule="hello-subject"} 2`, }) } @@ -162,10 +162,10 @@ func TestInvalidTimeStamps(t *testing.T) { // Receive monitor metrics. spouttest.AssertMonitor(t, monitorCh, []string{ - `passed{filter="particle"} 2`, - `processed{filter="particle"} 4`, - `rejected{filter="particle"} 0`, - `invalid_time{filter="particle"} 2`, - `triggered{filter="particle",rule="hello-subject"} 2`, + `passed{component="filter",name="particle"} 2`, + `processed{component="filter",name="particle"} 4`, + `rejected{component="filter",name="particle"} 0`, + `invalid_time{component="filter",name="particle"} 2`, + `triggered{component="filter",name="particle",rule="hello-subject"} 2`, }) } diff --git a/listener/listener.go b/listener/listener.go index 903bb85..00a9009 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -278,7 +278,8 @@ func (l *Listener) startStatistician() { defer l.wg.Done() labels := map[string]string{ - "listener": l.c.Name, + "component": "listener", + "name": l.c.Name, } for { lines := stats.SnapshotToPrometheus(l.stats.Snapshot(), time.Now(), labels) diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index db22412..b7f0817 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -296,9 +296,9 @@ func assertNoMore(t *testing.T, ch chan string) { func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) { expected := []string{ - fmt.Sprintf(`received{listener="testlistener"} %d`, received), - fmt.Sprintf(`sent{listener="testlistener"} %d`, sent), - `read_errors{listener="testlistener"} 0`, + fmt.Sprintf(`received{component="listener",name="testlistener"} %d`, received), + fmt.Sprintf(`sent{component="listener",name="testlistener"} %d`, sent), + `read_errors{component="listener",name="testlistener"} 0`, } spouttest.AssertMonitor(t, monitorCh, expected) } diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index 7aab94f..cd88007 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -129,17 +129,17 @@ func TestEndToEnd(t *testing.T) { // Check metrics published by monitor component. expectedMetrics := ` -failed_writes{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 0 -invalid_time{filter="filter"} 0 -passed{filter="filter"} 10 -processed{filter="filter"} 20 -read_errors{listener="listener"} 0 -received{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2 -received{listener="listener"} 5 -rejected{filter="filter"} 10 -sent{listener="listener"} 1 -triggered{filter="filter",rule="system"} 10 -write_requests{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2 +failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0 +invalid_time{component="filter",name="filter"} 0 +passed{component="filter",name="filter"} 10 +processed{component="filter",name="filter"} 20 +read_errors{component="listener",name="listener"} 0 +received{component="listener",name="listener"} 5 +received{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 2 +rejected{component="filter",name="filter"} 10 +sent{component="listener",name="listener"} 1 +triggered{component="filter",name="filter",rule="system"} 10 +write_requests{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 2 `[1:] var lines string for try := 0; try < 20; try++ { diff --git a/writer/writer.go b/writer/writer.go index aedf883..4a8ac1f 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -301,7 +301,8 @@ func (w *Writer) startStatistician() { func (w *Writer) metricsLabels() map[string]string { return map[string]string{ - "writer": w.c.Name, + "component": "writer", + "name": w.c.Name, "influxdb_address": w.c.InfluxDBAddress, "influxdb_port": strconv.Itoa(w.c.InfluxDBPort), "influxdb_dbname": w.c.DBName, diff --git a/writer/writer_medium_test.go b/writer/writer_medium_test.go index 4085a6c..ee242a4 100644 --- a/writer/writer_medium_test.go +++ b/writer/writer_medium_test.go @@ -96,10 +96,11 @@ func TestBasicWriter(t *testing.T) { // Check the monitor output. labels := "{" + strings.Join([]string{ + `component="writer"`, `influxdb_address="localhost"`, `influxdb_dbname="metrics"`, fmt.Sprintf(`influxdb_port="%d"`, influxPort), - `writer="foo"`, + `name="foo"`, }, ",") + "}" spouttest.AssertMonitor(t, monitorCh, []string{ `received` + labels + ` 5`,