Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #39 from mjs/writer-tags
Browse files Browse the repository at this point in the history
writer: Include InfluxDB details in metrics tags
  • Loading branch information
mjs authored Feb 27, 2018
2 parents 938fbe4 + 8197215 commit 708e3d0
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 45 deletions.
38 changes: 4 additions & 34 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package filter
import (
"fmt"
"testing"
"time"

"github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jumptrading/influx-spout/config"
Expand Down Expand Up @@ -88,51 +86,23 @@ hello,host=gopher01
require.NoError(t, err)

// Receive filter output
assertReceived(t, helloCh, "data", `
spouttest.AssertRecv(t, helloCh, "data", `
hello,host=gopher01
hello,host=gopher01
`)

// Receive junkyard output
assertReceived(t, junkCh, "junkyard data", `
spouttest.AssertRecv(t, junkCh, "junkyard data", `
goodbye,host=gopher01
`)

// Receive total stats
assertReceivedMulti(t, statsCh, "stats", `
spouttest.AssertRecvMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1
`)

// Receive rule specific stats
assertReceivedMulti(t, statsCh, "rule stats", `
spouttest.AssertRecvMulti(t, statsCh, "rule stats", `
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
}

func assertReceived(t *testing.T, ch <-chan string, label, expected string) {
expected = expected[1:]

select {
case received := <-ch:
assert.Equal(t, expected, received)
case <-time.After(spouttest.LongWait):
t.Fatalf("timed out waiting for %s", label)
}
}

func assertReceivedMulti(t *testing.T, ch <-chan string, label, expected string) {
expected = expected[1:]

var received string
timeout := time.After(spouttest.LongWait)
for {
select {
case received = <-ch:
if expected == received {
return
}
case <-timeout:
t.Fatalf("timed out waiting for %s. last received: %q", label, received)
}
}
}
51 changes: 51 additions & 0 deletions spouttest/asserts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package spouttest

import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// AssertRecv checks that a specific string has been received from a
// channel. The check times out after LongWait.
func AssertRecv(t *testing.T, ch <-chan string, label, expected string) {
expected = stripLeadingNL(expected)

select {
case received := <-ch:
assert.Equal(t, expected, received)
case <-time.After(LongWait):
t.Fatalf("timed out waiting for %s", label)
}
}

// AssertRecvMulti checks that a specific string has been received
// from a channel. The channel will be read multiple times if
// required. The check times out after LongWait.
func AssertRecvMulti(t *testing.T, ch <-chan string, label, expected string) {
expected = stripLeadingNL(expected)

var received string
timeout := time.After(LongWait)
for {
select {
case received = <-ch:
if expected == received {
return
}
case <-timeout:
t.Fatalf("timed out waiting for %s. last received: %q", label, received)
}
}
}

func stripLeadingNL(s string) string {
// This allows long `expected` strings to be formatted nicely in
// the caller.
if strings.HasPrefix(s, "\n") {
return s[1:]
}
return s
}
22 changes: 14 additions & 8 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ import (
"io/ioutil"
"log"
"os"
"strconv"
"sync"
"time"

// for profiling a nasty memleak
"net/http"
_ "net/http/pprof"

// This would be nice, but it's too unstable for now
// revisit eventually
//"github.com/valyala/fasthttp"
_ "net/http/pprof" // for profiling a nasty memleak

"github.com/nats-io/go-nats"

Expand Down Expand Up @@ -299,12 +295,22 @@ func (w *Writer) startStatistician() {
// sending it to the monitoring backend.
statsLine := lineformatter.New(
"spout_stat_writer",
[]string{"writer"}, // tag keys
[]string{ // tag keys
"writer",
"influxdb_address",
"influxdb_port",
"influxdb_dbname",
},
"received",
"write_requests",
"failed_writes",
)
tagVals := []string{w.c.Name}
tagVals := []string{
w.c.Name,
w.c.InfluxDBAddress,
strconv.Itoa(w.c.InfluxDBPort),
w.c.DBName,
}
for {
stats := w.stats.Clone()
w.nc.Publish(w.c.NATSSubjectMonitor, statsLine.Format(
Expand Down
27 changes: 24 additions & 3 deletions writer/writer_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -42,16 +44,18 @@ var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)

func testConfig() *config.Config {
return &config.Config{
Mode: "writer",
Name: "foo",
NATSAddress: natsAddress,
NATSSubject: []string{"writer-test"},
NATSSubjectMonitor: "writer-test-monitor",
InfluxDBAddress: "localhost",
InfluxDBPort: influxPort,
DBName: "metrics",
BatchMessages: 1,
BatchMaxMB: 10,
BatchMaxSecs: 300,
Port: influxPort,
Mode: "writer",
Workers: 96,
NATSPendingMaxMB: 32,
}
Expand Down Expand Up @@ -96,15 +100,22 @@ func TestBasicWriter(t *testing.T) {
w := startWriter(t, conf)
defer w.Stop()

// publish 5 messages to the bus
// Subscribe to stats output.
statsCh := make(chan string, 10)
_, err := nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) {
statsCh <- string(msg.Data)
})
require.NoError(t, err)

// Publish 5 messages to the bus.
subject := conf.NATSSubject[0]
publish(t, subject, "To be, or not to be: that is the question:")
publish(t, subject, "Whether ’tis nobler in the mind to suffer")
publish(t, subject, "The slings and arrows of outrageous fortune,")
publish(t, subject, "Or to take arms against a sea of troubles,")
publish(t, subject, "And by opposing end them. To die: to sleep;")

// wait for confirmation that they were written
// Wait for confirmation that they were written.
timeout := time.After(spouttest.LongWait)
for i := 0; i < 5; i++ {
select {
Expand All @@ -113,6 +124,16 @@ func TestBasicWriter(t *testing.T) {
t.Fatal("timed out waiting for messages")
}
}

// Check the stats output.
spouttest.AssertRecvMulti(t, statsCh, "stats",
strings.Join([]string{
"spout_stat_writer",
"writer=foo",
"influxdb_address=localhost",
"influxdb_port=" + strconv.Itoa(influxPort),
"influxdb_dbname=metrics",
}, ",")+" received=5,write_requests=5,failed_writes=0\n")
}

func TestBatchMBLimit(t *testing.T) {
Expand Down

0 comments on commit 708e3d0

Please sign in to comment.