Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

writer: Clean up tests #45

Merged
merged 1 commit into from
Mar 13, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 143 additions & 70 deletions writer/writer_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -34,11 +34,8 @@ import (
"github.com/jumptrading/influx-spout/spouttest"
)

var httpWrites = make(chan string, 10)
var nc *nats.Conn

const natsPort = 44443
const influxPort = 44445
const natsPort = 44443

var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)

Expand All @@ -61,40 +58,13 @@ func testConfig() *config.Config {
}
}

func TestMain(m *testing.M) {
os.Exit(runMain(m))
}

func runMain(m *testing.M) int {
var err error

// Start gnatsd.
s := spouttest.RunGnatsd(natsPort)
defer s.Shutdown()

// Set up a dummy HTTP server to write to.
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("Body read: %v\n", err)
}
httpWrites <- string(body)
w.WriteHeader(http.StatusNoContent)
})
go http.ListenAndServe(fmt.Sprintf(":%d", influxPort), nil)

// connect to the NATS instance
nc, err = nats.Connect(natsAddress)
if err != nil {
fmt.Printf("Error while setup: %v\n", err)
return 1
}
defer nc.Close()
func TestBasicWriter(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

return m.Run()
}
influxd := runTestInfluxd()
defer influxd.Stop()

func TestBasicWriter(t *testing.T) {
// No filter rules.
conf := testConfig()
w := startWriter(t, conf)
Expand All @@ -109,17 +79,17 @@ func TestBasicWriter(t *testing.T) {

// Publish 5 messages to the bus.
subject := conf.NATSSubject[0]
publish(t, subject, "To be, or not to be: that is the question:")
publish(t, subject, "Whether ’tis nobler in the mind to suffer")
publish(t, subject, "The slings and arrows of outrageous fortune,")
publish(t, subject, "Or to take arms against a sea of troubles,")
publish(t, subject, "And by opposing end them. To die: to sleep;")
publish(t, nc, subject, "To be, or not to be: that is the question:")
publish(t, nc, subject, "Whether ’tis nobler in the mind to suffer")
publish(t, nc, subject, "The slings and arrows of outrageous fortune,")
publish(t, nc, subject, "Or to take arms against a sea of troubles,")
publish(t, nc, subject, "And by opposing end them. To die: to sleep;")

// Wait for confirmation that they were written.
timeout := time.After(spouttest.LongWait)
for i := 0; i < 5; i++ {
select {
case <-httpWrites:
case <-influxd.Writes:
case <-timeout:
t.Fatal("timed out waiting for messages")
}
Expand All @@ -137,6 +107,12 @@ func TestBasicWriter(t *testing.T) {
}

func TestBatchMBLimit(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

// No filter rules.
conf := testConfig()
conf.Workers = 1
Expand All @@ -153,21 +129,27 @@ func TestBatchMBLimit(t *testing.T) {
large[i] = byte('x')
}
for i := 0; i < chunks; i++ {
publish(t, conf.NATSSubject[0], string(large))
publish(t, nc, conf.NATSSubject[0], string(large))
}

// the messages should come through (in one batch) because
// BatchMaxMB is exceed
select {
case msg := <-httpWrites:
case msg := <-influxd.Writes:
assert.Len(t, msg, totalSize)
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for messages")
}
assertNoWrite(t)
influxd.AssertNoWrite(t)
}

func TestBatchTimeLimit(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

// No filter rules.
conf := testConfig()
conf.Workers = 1
Expand All @@ -178,13 +160,19 @@ func TestBatchTimeLimit(t *testing.T) {

// Send one small message. It should still come through because of
// BatchMaxSecs.
publish(t, conf.NATSSubject[0], "foo")
publish(t, nc, conf.NATSSubject[0], "foo")

assertWrite(t, "foo")
assertNoWrite(t)
influxd.AssertWrite(t, "foo")
influxd.AssertNoWrite(t)
}

func TestBasicFilterRule(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "basic",
Expand All @@ -194,14 +182,20 @@ func TestBasicFilterRule(t *testing.T) {
defer w.Stop()

// Send 2 messages, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped")
publish(t, conf.NATSSubject[0], "foo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped")
publish(t, nc, conf.NATSSubject[0], "foo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func TestBatchedInput(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "basic",
Expand All @@ -211,13 +205,19 @@ func TestBatchedInput(t *testing.T) {
defer w.Stop()

// Send 2 messages together, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped\nfoo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped\nfoo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func TestRegexFilterRule(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "regex",
Expand All @@ -227,14 +227,20 @@ func TestRegexFilterRule(t *testing.T) {
defer w.Stop()

// Send 2 messages, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped")
publish(t, conf.NATSSubject[0], "foo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped")
publish(t, nc, conf.NATSSubject[0], "foo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func TestNegativeRegexFilterRule(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "negregex",
Expand All @@ -244,14 +250,20 @@ func TestNegativeRegexFilterRule(t *testing.T) {
defer w.Stop()

// Send 2 messages, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped")
publish(t, conf.NATSSubject[0], "foo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped")
publish(t, nc, conf.NATSSubject[0], "foo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func BenchmarkWriterLatency(b *testing.B) {
nc, closeNATS := runGnatsd(b)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
w := startWriter(b, conf)
defer w.Stop()
Expand All @@ -261,7 +273,26 @@ func BenchmarkWriterLatency(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
nc.Publish(conf.NATSSubject[0], byteArr)
<-httpWrites
<-influxd.Writes
}
}

type FatalTestingT interface {
Fatalf(string, ...interface{})
}

func runGnatsd(t FatalTestingT) (*nats.Conn, func()) {
gnatsd := spouttest.RunGnatsd(natsPort)

nc, err := nats.Connect(natsAddress)
if err != nil {
gnatsd.Shutdown()
t.Fatalf("NATS connect failed: %v", err)
}

return nc, func() {
nc.Close()
gnatsd.Shutdown()
}
}

Expand All @@ -271,24 +302,66 @@ func startWriter(t require.TestingT, conf *config.Config) *Writer {
return w
}

func publish(t require.TestingT, subject, msg string) {
func publish(t require.TestingT, nc *nats.Conn, subject, msg string) {
err := nc.Publish(subject, []byte(msg))
require.NoError(t, err)
}

func assertWrite(t *testing.T, expected string) {
type testInfluxd struct {
server *http.Server
wg sync.WaitGroup
Writes chan string
}

func runTestInfluxd() *testInfluxd {
s := &testInfluxd{
server: &http.Server{
Addr: fmt.Sprintf(":%d", influxPort),
},
Writes: make(chan string, 99),
}
s.wg.Add(1)
go s.run()
return s
}

func (s *testInfluxd) Stop() {
s.server.Close()
s.wg.Wait()
}

func (s *testInfluxd) AssertWrite(t *testing.T, expected string) {
select {
case msg := <-httpWrites:
case msg := <-s.Writes:
assert.Equal(t, msg, expected)
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for message")
}
}

func assertNoWrite(t *testing.T) {
func (s *testInfluxd) AssertNoWrite(t *testing.T) {
select {
case msg := <-httpWrites:
case msg := <-s.Writes:
t.Fatalf("saw unexpected write: %q", msg)
case <-time.After(spouttest.ShortWait):
}
}

func (s *testInfluxd) run() {
defer s.wg.Done()

mux := http.NewServeMux()
mux.HandleFunc("/write", s.handleWrite)
s.server.Handler = mux
s.server.ListenAndServe()
}

func (s *testInfluxd) handleWrite(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(fmt.Sprintf("Body read: %s", err))
}
w.WriteHeader(http.StatusNoContent)

s.Writes <- string(body)
}