Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.

Use monitor component #64

Merged
merged 16 commits into from
Apr 17, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
spouttest: Include the monitor in the end-to-end test
  • Loading branch information
mjs committed Apr 10, 2018
commit 8f7c80ec4297086324599ab69736e3f632f4d77c
16 changes: 16 additions & 0 deletions spouttest/asserts.go
Original file line number Diff line number Diff line change
@@ -68,7 +68,23 @@ func AssertMonitor(t *testing.T, ch chan string, expected []string) {
}
}

// StripTimestamps takes a string containing one or more metrics
// lines, validates that each line appears to end with a timestamp and
// then strips the timestamp off. The returned string is the same as
// the input but without the timestamps (for easier test comparisons).
func StripTimestamps(t *testing.T, s string) string {
var out []string
for _, line := range strings.Split(s, "\n") {
out = append(out, stripTimestamp(t, line))
}
return strings.Join(out, "\n")
}

func stripTimestamp(t *testing.T, s string) string {
if len(s) < 1 {
return ""
}

i := strings.LastIndexByte(s, ' ')
require.True(t, i >= 0)

62 changes: 56 additions & 6 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ package spouttest_test
import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
@@ -40,6 +41,7 @@ const (
influxdPort = 44501
listenerPort = 44502
httpListenerPort = 44503
monitorPort = 44504
influxDBName = "test"
sendCount = 10
)
@@ -70,9 +72,13 @@ func TestEndToEnd(t *testing.T) {
writer := startWriter(t, fs)
defer writer.Stop()

// Make sure the listeners are actually listening.
assertListenerReady(t, listener)
assertListenerReady(t, httpListener)
monitor := startMonitor(t, fs)
defer monitor.Stop()

// Make sure the listeners & monitor are actually listening.
assertReady(t, listener)
assertReady(t, httpListener)
assertReady(t, monitor)

// Connect to the listener.
addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort))
@@ -120,17 +126,48 @@ func TestEndToEnd(t *testing.T) {
}
time.Sleep(250 * time.Millisecond)
}

// Check metrics published by monitor component.
expectedMetrics := `
failed_writes{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 0
invalid_time{filter="filter"} 0
passed{filter="filter"} 10
processed{filter="filter"} 20
read_errors{listener="listener"} 0
received{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2
received{listener="listener"} 5
rejected{filter="filter"} 10
sent{listener="listener"} 1
triggered{filter="filter",rule="system"} 10
write_requests{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2
`[1:]
var lines string
for try := 0; try < 20; try++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", monitorPort))
require.NoError(t, err)

raw, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

lines = spouttest.StripTimestamps(t, string(raw))
if lines == expectedMetrics {
return
}
time.Sleep(500 * time.Millisecond)
}

t.Fatalf("Failed to see expected metrics. Last saw:\n%s", lines)
}

type HasReady interface {
Ready() <-chan struct{}
}

func assertListenerReady(t *testing.T, listener interface{}) {
func assertReady(t *testing.T, component interface{}) {
select {
case <-listener.(HasReady).Ready():
case <-component.(HasReady).Ready():
case <-time.After(spouttest.LongWait):
t.Fatal("timeout out waiting for listener to be ready")
t.Fatal("timeout out waiting for component to be ready")
}
}

@@ -156,6 +193,7 @@ port = %d
nats_address = "nats://localhost:%d"
batch = 5
debug = true
nats_subject_monitor = "monitor"
`, listenerPort, natsPort))
}

@@ -166,6 +204,7 @@ port = %d
nats_address = "nats://localhost:%d"
batch = 5
debug = true
nats_subject_monitor = "monitor"
`, httpListenerPort, natsPort))
}

@@ -174,6 +213,7 @@ func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable {
mode = "filter"
nats_address = "nats://localhost:%d"
debug = true
nats_subject_monitor = "monitor"

[[rule]]
type = "basic"
@@ -192,9 +232,19 @@ influxdb_dbname = "%s"
batch = 1
workers = 4
debug = true
nats_subject_monitor = "monitor"
`, natsPort, influxdPort, influxDBName))
}

func startMonitor(t *testing.T, fs afero.Fs) cmd.Stoppable {
return startComponent(t, fs, "monitor", fmt.Sprintf(`
mode = "monitor"
nats_address = "nats://localhost:%d"
nats_subject_monitor = "monitor"
port = %d
`, natsPort, monitorPort))
}

func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable {
configFilename := name + ".toml"
err := afero.WriteFile(fs, configFilename, []byte(config), 0600)