diff --git a/writer/writer_medium_test.go b/writer/writer_medium_test.go index 605dc14..68cc69b 100644 --- a/writer/writer_medium_test.go +++ b/writer/writer_medium_test.go @@ -20,9 +20,9 @@ import ( "fmt" "io/ioutil" "net/http" - "os" "strconv" "strings" + "sync" "testing" "time" @@ -34,11 +34,8 @@ import ( "github.com/jumptrading/influx-spout/spouttest" ) -var httpWrites = make(chan string, 10) -var nc *nats.Conn - -const natsPort = 44443 const influxPort = 44445 +const natsPort = 44443 var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort) @@ -61,40 +58,13 @@ func testConfig() *config.Config { } } -func TestMain(m *testing.M) { - os.Exit(runMain(m)) -} - -func runMain(m *testing.M) int { - var err error - - // Start gnatsd. - s := spouttest.RunGnatsd(natsPort) - defer s.Shutdown() - - // Set up a dummy HTTP server to write to. - http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - fmt.Printf("Body read: %v\n", err) - } - httpWrites <- string(body) - w.WriteHeader(http.StatusNoContent) - }) - go http.ListenAndServe(fmt.Sprintf(":%d", influxPort), nil) - - // connect to the NATS instance - nc, err = nats.Connect(natsAddress) - if err != nil { - fmt.Printf("Error while setup: %v\n", err) - return 1 - } - defer nc.Close() +func TestBasicWriter(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() - return m.Run() -} + influxd := runTestInfluxd() + defer influxd.Stop() -func TestBasicWriter(t *testing.T) { // No filter rules. conf := testConfig() w := startWriter(t, conf) @@ -109,17 +79,17 @@ func TestBasicWriter(t *testing.T) { // Publish 5 messages to the bus. subject := conf.NATSSubject[0] - publish(t, subject, "To be, or not to be: that is the question:") - publish(t, subject, "Whether ’tis nobler in the mind to suffer") - publish(t, subject, "The slings and arrows of outrageous fortune,") - publish(t, subject, "Or to take arms against a sea of troubles,") - publish(t, subject, "And by opposing end them. To die: to sleep;") + publish(t, nc, subject, "To be, or not to be: that is the question:") + publish(t, nc, subject, "Whether ’tis nobler in the mind to suffer") + publish(t, nc, subject, "The slings and arrows of outrageous fortune,") + publish(t, nc, subject, "Or to take arms against a sea of troubles,") + publish(t, nc, subject, "And by opposing end them. To die: to sleep;") // Wait for confirmation that they were written. timeout := time.After(spouttest.LongWait) for i := 0; i < 5; i++ { select { - case <-httpWrites: + case <-influxd.Writes: case <-timeout: t.Fatal("timed out waiting for messages") } @@ -137,6 +107,12 @@ func TestBasicWriter(t *testing.T) { } func TestBatchMBLimit(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + // No filter rules. conf := testConfig() conf.Workers = 1 @@ -153,21 +129,27 @@ func TestBatchMBLimit(t *testing.T) { large[i] = byte('x') } for i := 0; i < chunks; i++ { - publish(t, conf.NATSSubject[0], string(large)) + publish(t, nc, conf.NATSSubject[0], string(large)) } // the messages should come through (in one batch) because // BatchMaxMB is exceed select { - case msg := <-httpWrites: + case msg := <-influxd.Writes: assert.Len(t, msg, totalSize) case <-time.After(spouttest.LongWait): t.Fatal("timed out waiting for messages") } - assertNoWrite(t) + influxd.AssertNoWrite(t) } func TestBatchTimeLimit(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + // No filter rules. conf := testConfig() conf.Workers = 1 @@ -178,13 +160,19 @@ func TestBatchTimeLimit(t *testing.T) { // Send one small message. It should still come through because of // BatchMaxSecs. - publish(t, conf.NATSSubject[0], "foo") + publish(t, nc, conf.NATSSubject[0], "foo") - assertWrite(t, "foo") - assertNoWrite(t) + influxd.AssertWrite(t, "foo") + influxd.AssertNoWrite(t) } func TestBasicFilterRule(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + conf := testConfig() conf.Rule = []config.Rule{{ Rtype: "basic", @@ -194,14 +182,20 @@ func TestBasicFilterRule(t *testing.T) { defer w.Stop() // Send 2 messages, the first of which should be dropped. - publish(t, conf.NATSSubject[0], "should be dropped") - publish(t, conf.NATSSubject[0], "foo bar") + publish(t, nc, conf.NATSSubject[0], "should be dropped") + publish(t, nc, conf.NATSSubject[0], "foo bar") - assertWrite(t, "foo bar") - assertNoWrite(t) + influxd.AssertWrite(t, "foo bar") + influxd.AssertNoWrite(t) } func TestBatchedInput(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + conf := testConfig() conf.Rule = []config.Rule{{ Rtype: "basic", @@ -211,13 +205,19 @@ func TestBatchedInput(t *testing.T) { defer w.Stop() // Send 2 messages together, the first of which should be dropped. - publish(t, conf.NATSSubject[0], "should be dropped\nfoo bar") + publish(t, nc, conf.NATSSubject[0], "should be dropped\nfoo bar") - assertWrite(t, "foo bar") - assertNoWrite(t) + influxd.AssertWrite(t, "foo bar") + influxd.AssertNoWrite(t) } func TestRegexFilterRule(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + conf := testConfig() conf.Rule = []config.Rule{{ Rtype: "regex", @@ -227,14 +227,20 @@ func TestRegexFilterRule(t *testing.T) { defer w.Stop() // Send 2 messages, the first of which should be dropped. - publish(t, conf.NATSSubject[0], "should be dropped") - publish(t, conf.NATSSubject[0], "foo bar") + publish(t, nc, conf.NATSSubject[0], "should be dropped") + publish(t, nc, conf.NATSSubject[0], "foo bar") - assertWrite(t, "foo bar") - assertNoWrite(t) + influxd.AssertWrite(t, "foo bar") + influxd.AssertNoWrite(t) } func TestNegativeRegexFilterRule(t *testing.T) { + nc, closeNATS := runGnatsd(t) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + conf := testConfig() conf.Rule = []config.Rule{{ Rtype: "negregex", @@ -244,14 +250,20 @@ func TestNegativeRegexFilterRule(t *testing.T) { defer w.Stop() // Send 2 messages, the first of which should be dropped. - publish(t, conf.NATSSubject[0], "should be dropped") - publish(t, conf.NATSSubject[0], "foo bar") + publish(t, nc, conf.NATSSubject[0], "should be dropped") + publish(t, nc, conf.NATSSubject[0], "foo bar") - assertWrite(t, "foo bar") - assertNoWrite(t) + influxd.AssertWrite(t, "foo bar") + influxd.AssertNoWrite(t) } func BenchmarkWriterLatency(b *testing.B) { + nc, closeNATS := runGnatsd(b) + defer closeNATS() + + influxd := runTestInfluxd() + defer influxd.Stop() + conf := testConfig() w := startWriter(b, conf) defer w.Stop() @@ -261,7 +273,26 @@ func BenchmarkWriterLatency(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { nc.Publish(conf.NATSSubject[0], byteArr) - <-httpWrites + <-influxd.Writes + } +} + +type FatalTestingT interface { + Fatalf(string, ...interface{}) +} + +func runGnatsd(t FatalTestingT) (*nats.Conn, func()) { + gnatsd := spouttest.RunGnatsd(natsPort) + + nc, err := nats.Connect(natsAddress) + if err != nil { + gnatsd.Shutdown() + t.Fatalf("NATS connect failed: %v", err) + } + + return nc, func() { + nc.Close() + gnatsd.Shutdown() } } @@ -271,24 +302,66 @@ func startWriter(t require.TestingT, conf *config.Config) *Writer { return w } -func publish(t require.TestingT, subject, msg string) { +func publish(t require.TestingT, nc *nats.Conn, subject, msg string) { err := nc.Publish(subject, []byte(msg)) require.NoError(t, err) } -func assertWrite(t *testing.T, expected string) { +type testInfluxd struct { + server *http.Server + wg sync.WaitGroup + Writes chan string +} + +func runTestInfluxd() *testInfluxd { + s := &testInfluxd{ + server: &http.Server{ + Addr: fmt.Sprintf(":%d", influxPort), + }, + Writes: make(chan string, 99), + } + s.wg.Add(1) + go s.run() + return s +} + +func (s *testInfluxd) Stop() { + s.server.Close() + s.wg.Wait() +} + +func (s *testInfluxd) AssertWrite(t *testing.T, expected string) { select { - case msg := <-httpWrites: + case msg := <-s.Writes: assert.Equal(t, msg, expected) case <-time.After(spouttest.LongWait): t.Fatal("timed out waiting for message") } } -func assertNoWrite(t *testing.T) { +func (s *testInfluxd) AssertNoWrite(t *testing.T) { select { - case msg := <-httpWrites: + case msg := <-s.Writes: t.Fatalf("saw unexpected write: %q", msg) case <-time.After(spouttest.ShortWait): } } + +func (s *testInfluxd) run() { + defer s.wg.Done() + + mux := http.NewServeMux() + mux.HandleFunc("/write", s.handleWrite) + s.server.Handler = mux + s.server.ListenAndServe() +} + +func (s *testInfluxd) handleWrite(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + panic(fmt.Sprintf("Body read: %s", err)) + } + w.WriteHeader(http.StatusNoContent) + + s.Writes <- string(body) +}