Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #45 from mjs/cleanup-writer-tests
Browse files Browse the repository at this point in the history
writer: Clean up tests
  • Loading branch information
oplehto authored Mar 13, 2018
2 parents cbc51ed + 2aafa8e commit 24f198f
Showing 1 changed file with 143 additions and 70 deletions.
213 changes: 143 additions & 70 deletions writer/writer_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -34,11 +34,8 @@ import (
"github.com/jumptrading/influx-spout/spouttest"
)

var httpWrites = make(chan string, 10)
var nc *nats.Conn

const natsPort = 44443
const influxPort = 44445
const natsPort = 44443

var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)

Expand All @@ -61,40 +58,13 @@ func testConfig() *config.Config {
}
}

func TestMain(m *testing.M) {
os.Exit(runMain(m))
}

func runMain(m *testing.M) int {
var err error

// Start gnatsd.
s := spouttest.RunGnatsd(natsPort)
defer s.Shutdown()

// Set up a dummy HTTP server to write to.
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("Body read: %v\n", err)
}
httpWrites <- string(body)
w.WriteHeader(http.StatusNoContent)
})
go http.ListenAndServe(fmt.Sprintf(":%d", influxPort), nil)

// connect to the NATS instance
nc, err = nats.Connect(natsAddress)
if err != nil {
fmt.Printf("Error while setup: %v\n", err)
return 1
}
defer nc.Close()
func TestBasicWriter(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

return m.Run()
}
influxd := runTestInfluxd()
defer influxd.Stop()

func TestBasicWriter(t *testing.T) {
// No filter rules.
conf := testConfig()
w := startWriter(t, conf)
Expand All @@ -109,17 +79,17 @@ func TestBasicWriter(t *testing.T) {

// Publish 5 messages to the bus.
subject := conf.NATSSubject[0]
publish(t, subject, "To be, or not to be: that is the question:")
publish(t, subject, "Whether ’tis nobler in the mind to suffer")
publish(t, subject, "The slings and arrows of outrageous fortune,")
publish(t, subject, "Or to take arms against a sea of troubles,")
publish(t, subject, "And by opposing end them. To die: to sleep;")
publish(t, nc, subject, "To be, or not to be: that is the question:")
publish(t, nc, subject, "Whether ’tis nobler in the mind to suffer")
publish(t, nc, subject, "The slings and arrows of outrageous fortune,")
publish(t, nc, subject, "Or to take arms against a sea of troubles,")
publish(t, nc, subject, "And by opposing end them. To die: to sleep;")

// Wait for confirmation that they were written.
timeout := time.After(spouttest.LongWait)
for i := 0; i < 5; i++ {
select {
case <-httpWrites:
case <-influxd.Writes:
case <-timeout:
t.Fatal("timed out waiting for messages")
}
Expand All @@ -137,6 +107,12 @@ func TestBasicWriter(t *testing.T) {
}

func TestBatchMBLimit(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

// No filter rules.
conf := testConfig()
conf.Workers = 1
Expand All @@ -153,21 +129,27 @@ func TestBatchMBLimit(t *testing.T) {
large[i] = byte('x')
}
for i := 0; i < chunks; i++ {
publish(t, conf.NATSSubject[0], string(large))
publish(t, nc, conf.NATSSubject[0], string(large))
}

// the messages should come through (in one batch) because
// BatchMaxMB is exceed
select {
case msg := <-httpWrites:
case msg := <-influxd.Writes:
assert.Len(t, msg, totalSize)
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for messages")
}
assertNoWrite(t)
influxd.AssertNoWrite(t)
}

func TestBatchTimeLimit(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

// No filter rules.
conf := testConfig()
conf.Workers = 1
Expand All @@ -178,13 +160,19 @@ func TestBatchTimeLimit(t *testing.T) {

// Send one small message. It should still come through because of
// BatchMaxSecs.
publish(t, conf.NATSSubject[0], "foo")
publish(t, nc, conf.NATSSubject[0], "foo")

assertWrite(t, "foo")
assertNoWrite(t)
influxd.AssertWrite(t, "foo")
influxd.AssertNoWrite(t)
}

func TestBasicFilterRule(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "basic",
Expand All @@ -194,14 +182,20 @@ func TestBasicFilterRule(t *testing.T) {
defer w.Stop()

// Send 2 messages, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped")
publish(t, conf.NATSSubject[0], "foo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped")
publish(t, nc, conf.NATSSubject[0], "foo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func TestBatchedInput(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "basic",
Expand All @@ -211,13 +205,19 @@ func TestBatchedInput(t *testing.T) {
defer w.Stop()

// Send 2 messages together, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped\nfoo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped\nfoo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func TestRegexFilterRule(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "regex",
Expand All @@ -227,14 +227,20 @@ func TestRegexFilterRule(t *testing.T) {
defer w.Stop()

// Send 2 messages, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped")
publish(t, conf.NATSSubject[0], "foo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped")
publish(t, nc, conf.NATSSubject[0], "foo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func TestNegativeRegexFilterRule(t *testing.T) {
nc, closeNATS := runGnatsd(t)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
conf.Rule = []config.Rule{{
Rtype: "negregex",
Expand All @@ -244,14 +250,20 @@ func TestNegativeRegexFilterRule(t *testing.T) {
defer w.Stop()

// Send 2 messages, the first of which should be dropped.
publish(t, conf.NATSSubject[0], "should be dropped")
publish(t, conf.NATSSubject[0], "foo bar")
publish(t, nc, conf.NATSSubject[0], "should be dropped")
publish(t, nc, conf.NATSSubject[0], "foo bar")

assertWrite(t, "foo bar")
assertNoWrite(t)
influxd.AssertWrite(t, "foo bar")
influxd.AssertNoWrite(t)
}

func BenchmarkWriterLatency(b *testing.B) {
nc, closeNATS := runGnatsd(b)
defer closeNATS()

influxd := runTestInfluxd()
defer influxd.Stop()

conf := testConfig()
w := startWriter(b, conf)
defer w.Stop()
Expand All @@ -261,7 +273,26 @@ func BenchmarkWriterLatency(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
nc.Publish(conf.NATSSubject[0], byteArr)
<-httpWrites
<-influxd.Writes
}
}

type FatalTestingT interface {
Fatalf(string, ...interface{})
}

func runGnatsd(t FatalTestingT) (*nats.Conn, func()) {
gnatsd := spouttest.RunGnatsd(natsPort)

nc, err := nats.Connect(natsAddress)
if err != nil {
gnatsd.Shutdown()
t.Fatalf("NATS connect failed: %v", err)
}

return nc, func() {
nc.Close()
gnatsd.Shutdown()
}
}

Expand All @@ -271,24 +302,66 @@ func startWriter(t require.TestingT, conf *config.Config) *Writer {
return w
}

func publish(t require.TestingT, subject, msg string) {
func publish(t require.TestingT, nc *nats.Conn, subject, msg string) {
err := nc.Publish(subject, []byte(msg))
require.NoError(t, err)
}

func assertWrite(t *testing.T, expected string) {
type testInfluxd struct {
server *http.Server
wg sync.WaitGroup
Writes chan string
}

func runTestInfluxd() *testInfluxd {
s := &testInfluxd{
server: &http.Server{
Addr: fmt.Sprintf(":%d", influxPort),
},
Writes: make(chan string, 99),
}
s.wg.Add(1)
go s.run()
return s
}

func (s *testInfluxd) Stop() {
s.server.Close()
s.wg.Wait()
}

func (s *testInfluxd) AssertWrite(t *testing.T, expected string) {
select {
case msg := <-httpWrites:
case msg := <-s.Writes:
assert.Equal(t, msg, expected)
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for message")
}
}

func assertNoWrite(t *testing.T) {
func (s *testInfluxd) AssertNoWrite(t *testing.T) {
select {
case msg := <-httpWrites:
case msg := <-s.Writes:
t.Fatalf("saw unexpected write: %q", msg)
case <-time.After(spouttest.ShortWait):
}
}

func (s *testInfluxd) run() {
defer s.wg.Done()

mux := http.NewServeMux()
mux.HandleFunc("/write", s.handleWrite)
s.server.Handler = mux
s.server.ListenAndServe()
}

func (s *testInfluxd) handleWrite(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(fmt.Sprintf("Body read: %s", err))
}
w.WriteHeader(http.StatusNoContent)

s.Writes <- string(body)
}

0 comments on commit 24f198f

Please sign in to comment.