From 91ce149a61222cfd89157002161b3b1e0a6b5e1d Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 20 Feb 2018 22:27:04 +1300 Subject: [PATCH 1/6] Add utility to start a fake influxd listener This implements influxd work-a-like which simply records the lines it has received for later inspection. --- spouttest/influxd.go | 102 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 spouttest/influxd.go diff --git a/spouttest/influxd.go b/spouttest/influxd.go new file mode 100644 index 0000000..9882336 --- /dev/null +++ b/spouttest/influxd.go @@ -0,0 +1,102 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spouttest + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "sync" +) + +// RunFakeInfluxd starts a fake influxd instance with the HTTP port +// given. Stop() should be called on the returned instance once it is +// no longer needed. +func RunFakeInfluxd(port int) *FakeInfluxDB { + f := &FakeInfluxDB{ + server: &http.Server{ + Addr: fmt.Sprintf(":%d", port), + }, + lines: make(map[string][]string), + } + + f.wg.Add(1) + go f.run() + + return f +} + +// FakeInfluxDB implements a simple listener which mimics InfluxDB's +// HTTP write API, recording writes for later inspection. +type FakeInfluxDB struct { + server *http.Server + wg sync.WaitGroup + + mu sync.Mutex + lines map[string][]string +} + +// Stop shuts down the instance's server. It blocks until the server +// is stopped. +func (f *FakeInfluxDB) Stop() { + f.server.Close() + f.wg.Wait() +} + +// Lines returned the lines received for each "database". The returned +// map and slices are copies. Lines() is goroutine-safe. +func (f *FakeInfluxDB) Lines() map[string][]string { + f.mu.Lock() + defer f.mu.Unlock() + + out := make(map[string][]string) + for db, lines := range f.lines { + out[db] = append([]string(nil), lines...) + } + return out +} + +func (f *FakeInfluxDB) run() { + defer f.wg.Done() + + mux := http.NewServeMux() + mux.HandleFunc("/write", f.handleWrite) + f.server.Handler = mux + + log.Printf("fake influxd listening on %s", f.server.Addr) + f.server.ListenAndServe() +} + +func (f *FakeInfluxDB) handleWrite(w http.ResponseWriter, r *http.Request) { + db := r.URL.Query().Get("db") + body, err := ioutil.ReadAll(r.Body) + if err != nil { + panic(fmt.Sprintf("FakeInfluxDB read failed: %v", err)) + } + + f.mu.Lock() + lines := f.lines[db] + for _, line := range bytes.SplitAfter(body, []byte("\n")) { + if len(line) > 0 { + lines = append(lines, string(line)) + } + } + f.lines[db] = lines + f.mu.Unlock() + + w.WriteHeader(http.StatusNoContent) +} From 537eea905813798f3442fa34b85bdf41c3b898da Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 20 Feb 2018 22:29:12 +1300 Subject: [PATCH 2/6] Extract the bulk of main() The main func now just calls cmd.Run() where most of the work is done. This allows cmd.Run() to be used for the upcoming end-to-end test. --- cmd/run.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 33 +++-------------------------- 2 files changed, 64 insertions(+), 30 deletions(-) create mode 100644 cmd/run.go diff --git a/cmd/run.go b/cmd/run.go new file mode 100644 index 0000000..966bca8 --- /dev/null +++ b/cmd/run.go @@ -0,0 +1,61 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "fmt" + "runtime" + + "github.com/jumptrading/influx-spout/config" + "github.com/jumptrading/influx-spout/filter" + "github.com/jumptrading/influx-spout/listener" + "github.com/jumptrading/influx-spout/writer" +) + +// Stoppable defines something this is a capable of being stopped. +type Stoppable interface { + Stop() +} + +// Run parses the configuration file provided and starts influx-spout +// in the appropriate mode. +func Run(configFile string) (out Stoppable, err error) { + c, err := config.NewConfigFromFile(configFile) + if err != nil { + return nil, fmt.Errorf("Error while loading config file: %v", err) + } + + switch c.Mode { + case "listener": + out, err = listener.StartListener(c) + case "listener_http": + out, err = listener.StartHTTPListener(c) + case "filter": + out, err = filter.StartFilter(c) + case "writer": + if c.Workers == 0 { + // this seems to be an okay default from our testing experience: + // aim to have on average two workers per OS-thread running. + c.Workers = runtime.GOMAXPROCS(-1) * 2 + } + out, err = writer.StartWriter(c) + default: + return nil, fmt.Errorf("unknown mode of operation: [%s]", c.Mode) + } + if err != nil { + return nil, fmt.Errorf("failed to start %s: %v", c.Mode, err) + } + return out, nil +} diff --git a/main.go b/main.go index 4cc6aa6..a864bd0 100644 --- a/main.go +++ b/main.go @@ -20,10 +20,7 @@ import ( "os" "runtime" - "github.com/jumptrading/influx-spout/config" - "github.com/jumptrading/influx-spout/filter" - "github.com/jumptrading/influx-spout/listener" - "github.com/jumptrading/influx-spout/writer" + "github.com/jumptrading/influx-spout/cmd" ) // These are set at build time. @@ -61,32 +58,8 @@ func main() { log.Printf("Running %v version %s, built on %s, %s\n", os.Args[0], version, builtOn, runtime.Version()) - c, err := config.NewConfigFromFile(configFile) - if err != nil { - fmt.Printf("FATAL: Error while loading config file: %v\n", err) - os.Exit(1) + if _, err := cmd.Run(configFile); err != nil { + log.Fatalf("%s", err) } - - switch c.Mode { - case "listener": - _, err = listener.StartListener(c) - case "listener_http": - _, err = listener.StartHTTPListener(c) - case "filter": - _, err = filter.StartFilter(c) - case "writer": - if c.Workers == 0 { - // this seems to be an okay default from our testing experience: - // aim to have on average two workers per OS-thread running. - c.Workers = runtime.GOMAXPROCS(-1) * 2 - } - _, err = writer.StartWriter(c) - default: - log.Fatalf("unknown mode of operation: [%s]", c.Mode) - } - if err != nil { - log.Fatalf("failed to start %s: %v", c.Mode, err) - } - runtime.Goexit() } From 131ce492c44f3c52d0dd93d7cfb061e46053a26c Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 20 Feb 2018 22:30:11 +1300 Subject: [PATCH 3/6] config: Export `fs` to support external tests The end-to-end test needs to provide configuration files for various listener components so it's useful to able to use an isolated fake filesystem for this. --- config/config.go | 6 ++++-- config/config_small_test.go | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/config/config.go b/config/config.go index 709db75..6bcacff 100644 --- a/config/config.go +++ b/config/config.go @@ -108,7 +108,7 @@ func NewConfigFromFile(fileName string) (*Config, error) { } func readConfig(fileName string, conf *Config) error { - f, err := fs.Open(fileName) + f, err := Fs.Open(fileName) if err != nil { return err } @@ -133,4 +133,6 @@ func pathToConfigName(path string) string { return path[:len(path)-len(ext)] } -var fs = afero.NewOsFs() +// Fs abstracts away filesystem access for the config package. It +// should only be modified by tests. +var Fs = afero.NewOsFs() diff --git a/config/config_small_test.go b/config/config_small_test.go index 6c7912f..15abd5d 100644 --- a/config/config_small_test.go +++ b/config/config_small_test.go @@ -180,9 +180,9 @@ mode = "listener" batch = 100 debug = true ` - fs = afero.NewMemMapFs() - afero.WriteFile(fs, commonFileName, []byte(commonConfig), 0600) - afero.WriteFile(fs, testConfigFileName, []byte(specificConfig), 0600) + Fs = afero.NewMemMapFs() + afero.WriteFile(Fs, commonFileName, []byte(commonConfig), 0600) + afero.WriteFile(Fs, testConfigFileName, []byte(specificConfig), 0600) conf, err := NewConfigFromFile(testConfigFileName) require.NoError(t, err) @@ -202,9 +202,9 @@ batch = 100 debug = true ` - fs = afero.NewMemMapFs() - afero.WriteFile(fs, commonFileName, []byte(commonConfig), 0600) - afero.WriteFile(fs, testConfigFileName, []byte(specificConfig), 0600) + Fs = afero.NewMemMapFs() + afero.WriteFile(Fs, commonFileName, []byte(commonConfig), 0600) + afero.WriteFile(Fs, testConfigFileName, []byte(specificConfig), 0600) _, err := NewConfigFromFile(testConfigFileName) require.Error(t, err) @@ -218,14 +218,14 @@ func (*failingOpenFs) Open(string) (afero.File, error) { } func TestErrorOpeningCommonFile(t *testing.T) { - fs = new(failingOpenFs) + Fs = new(failingOpenFs) _, err := NewConfigFromFile(testConfigFileName) assert.EqualError(t, err, "boom") } func TestOpenError(t *testing.T) { - fs = afero.NewMemMapFs() + Fs = afero.NewMemMapFs() conf, err := NewConfigFromFile("/does/not/exist") assert.Nil(t, conf) @@ -233,8 +233,8 @@ func TestOpenError(t *testing.T) { } func parseConfig(content string) (*Config, error) { - fs = afero.NewMemMapFs() - afero.WriteFile(fs, testConfigFileName, []byte(content), 0600) + Fs = afero.NewMemMapFs() + afero.WriteFile(Fs, testConfigFileName, []byte(content), 0600) return NewConfigFromFile(testConfigFileName) } From e23d223ae7d5d3a34a2bb80d617e71a08d6da2f7 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 20 Feb 2018 22:31:50 +1300 Subject: [PATCH 4/6] More consistent & useful logging --- filter/filter.go | 6 +++++- listener/listener.go | 8 ++++++-- writer/writer.go | 3 ++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index 5493eaa..f77d545 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -74,6 +74,9 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) { } f.sub, err = f.nc.Subscribe(f.c.NATSSubject[0], func(msg *nats.Msg) { + if conf.Debug { + log.Printf("filter received %d bytes", len(msg.Data)) + } jobs <- msg.Data }) if err != nil { @@ -83,7 +86,8 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) { f.wg.Add(1) go f.startStatistician(stats, rules) - log.Printf("Filter listening on [%s] with %d rules\n", f.c.NATSSubject, rules.Count()) + log.Printf("filter subscribed to [%s] at %s with %d rules\n", + f.c.NATSSubject[0], f.c.NATSAddress, rules.Count()) return f, nil } diff --git a/listener/listener.go b/listener/listener.go index aadcc4f..6d14f12 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -65,6 +65,8 @@ func StartListener(c *config.Config) (*Listener, error) { listener.wg.Add(2) go listener.startStatistician() go listener.listenUDP(sc) + + log.Printf("UDP listener publishing to [%s] at %s", c.NATSSubject[0], c.NATSAddress) listener.notifyState("ready") return listener, nil @@ -83,6 +85,8 @@ func StartHTTPListener(c *config.Config) (*Listener, error) { listener.wg.Add(2) go listener.startStatistician() go listener.listenHTTP(server) + + log.Printf("HTTP listener publishing to [%s] at %s", c.NATSSubject[0], c.NATSAddress) listener.notifyState("ready") return listener, nil @@ -153,7 +157,7 @@ func (l *Listener) setupUDP(configBufSize int) (*net.UDPConn, error) { return nil, err } - log.Printf("Listener bound to UDP socket: %v\n", sc.LocalAddr().String()) + log.Printf("listener bound to UDP socket: %v\n", sc.LocalAddr().String()) return sc, nil } @@ -241,7 +245,7 @@ func (l *Listener) processRead(sz int) { l.batchSize += sz if l.c.Debug { - log.Printf("Info: Listener read %d bytes\n", sz) + log.Printf("listener read %d bytes\n", sz) } // Send when sufficient reads have been batched or the batch diff --git a/writer/writer.go b/writer/writer.go index efc8249..6b36906 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -131,7 +131,8 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { // notify the monitor that we are ready to receive messages and transmit to influxdb w.notifyState("ready") - log.Printf("listening on [%v] with %d workers\n", c.NATSSubject, c.Workers) + log.Printf("writer subscribed to [%v] at %s with %d workers", + c.NATSSubject, c.NATSAddress, c.Workers) log.Printf("POST timeout: %ds", c.WriteTimeoutSecs) log.Printf("maximum NATS subject size: %dMB", c.NATSPendingMaxMB) From c684a644da8a97678eae8d4c120abe16e25ca6cc Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 20 Feb 2018 22:33:12 +1300 Subject: [PATCH 5/6] Expose listener "ready" channel This will be useful for the end-to-end test for avoiding a race where the test could start sending lines to the listener before it's up. --- listener/listener.go | 14 +++++++++++++- listener/listener_medium_test.go | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/listener/listener.go b/listener/listener.go index 6d14f12..600fcce 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -52,11 +52,17 @@ var statsInterval = 3 * time.Second // // The listener reads incoming UDP packets, batches them up and send // batches onwards to a NATS subject. -func StartListener(c *config.Config) (*Listener, error) { +func StartListener(c *config.Config) (_ *Listener, err error) { listener, err := newListener(c) if err != nil { return nil, err } + defer func() { + if err != nil { + listener.Stop() + } + }() + sc, err := listener.setupUDP(c.ReadBufferBytes) if err != nil { return nil, err @@ -106,6 +112,12 @@ type Listener struct { stop chan struct{} } +// Ready returns a channel which is closed once the listener is +// actually listening for incoming data. +func (l *Listener) Ready() <-chan struct{} { + return l.ready +} + func (l *Listener) Stop() { close(l.stop) l.wg.Wait() diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index 1206857..8161f31 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -234,7 +234,7 @@ func startListener(t require.TestingT, conf *config.Config) *Listener { func assertListenerStarted(t require.TestingT, listener *Listener) { select { - case <-listener.ready: + case <-listener.Ready(): case <-time.After(spouttest.LongWait): listener.Stop() t.Errorf("listener failed to start up") From 2c4067344e6b0d3fd2c84532d07793540124f92c Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 20 Feb 2018 22:33:58 +1300 Subject: [PATCH 6/6] Implement end-to-end test This is the first "large" test. It starts a UDP listener, HTTP listener, filter & writer, ensuring that lines flow correctly from the listeners to the (fake) influxd. The sets up the various influx-spout component using configuration files to ensure that configuration results in expected behaviour. Due to the number of moving parts involved, extra care has been taken to avoid races where a component isn't ready before something attempts to use it. --- spouttest/e2e_test.go | 205 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 spouttest/e2e_test.go diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go new file mode 100644 index 0000000..deb5ad6 --- /dev/null +++ b/spouttest/e2e_test.go @@ -0,0 +1,205 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build large + +package spouttest_test + +import ( + "bytes" + "fmt" + "net" + "net/http" + "strconv" + "strings" + "testing" + "time" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jumptrading/influx-spout/cmd" + "github.com/jumptrading/influx-spout/config" + "github.com/jumptrading/influx-spout/spouttest" +) + +const ( + natsPort = 44500 + influxdPort = 44501 + listenerPort = 44502 + httpListenerPort = 44503 + influxDBName = "test" + sendCount = 10 +) + +func TestEndToEnd(t *testing.T) { + // Start gnatsd. + gnatsd := spouttest.RunGnatsd(natsPort) + defer gnatsd.Shutdown() + + // Start influxd & set up test database. + influxd := spouttest.RunFakeInfluxd(influxdPort) + defer influxd.Stop() + + // Use a fake filesystem (for config files). + fs := afero.NewMemMapFs() + config.Fs = fs + + // Start spout components. + listener := startListener(t, fs) + defer listener.Stop() + + httpListener := startHTTPListener(t, fs) + defer httpListener.Stop() + + filter := startFilter(t, fs) + defer filter.Stop() + + writer := startWriter(t, fs) + defer writer.Stop() + + // Make sure the listeners are actually listening. + assertListenerReady(t, listener) + assertListenerReady(t, httpListener) + + // Connect to the listener. + addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort)) + conn, err := net.Dial("udp", addr) + require.NoError(t, err) + defer conn.Close() + + // Do 5 UDP metric sends each containing 2 lines. + for i := 0; i < sendCount/2; i++ { + _, err := conn.Write(makeTestLines().Bytes()) + require.NoError(t, err) + + // Generous sleep between sends to avoid UDP drops. + time.Sleep(100 * time.Millisecond) + } + + // Do 5 HTTP metric sends, the same as the UDP sends above. + url := fmt.Sprintf("http://localhost:%d/write", httpListenerPort) + for i := 0; i < sendCount/2; i++ { + _, err := http.Post(url, "text/plain", makeTestLines()) + require.NoError(t, err) + } + + // Check "database". + maxWaitTime := time.Now().Add(spouttest.LongWait) + for { + lines := influxd.Lines() + recvCount := len(lines[influxDBName]) + if recvCount == sendCount { + // Expected number of lines received... + // Now check they are correct. + for _, line := range lines[influxDBName] { + if !strings.HasPrefix(line, cpuLine) { + t.Fatalf("unexpected line received: %s", line) + } + } + + // No writes to other databases are expected. + assert.Len(t, lines, 1) + + break // Success + } + if time.Now().After(maxWaitTime) { + t.Fatalf("failed to see expected database records. Saw %d records.", recvCount) + } + time.Sleep(250 * time.Millisecond) + } +} + +type HasReady interface { + Ready() <-chan struct{} +} + +func assertListenerReady(t *testing.T, listener interface{}) { + select { + case <-listener.(HasReady).Ready(): + case <-time.After(spouttest.LongWait): + t.Fatal("timeout out waiting for listener to be ready") + } +} + +const cpuLine = "cpu,env=prod,cls=server user=13.33,usage_system=0.16,usage_idle=86.53" + +func makeTestLines() *bytes.Buffer { + now := time.Now().UnixNano() + out := new(bytes.Buffer) + + // Only the 2nd line should make it through the filter. + fmt.Fprintf(out, ` +foo,env=dev bar=99 %d +%s %d +`[1:], now, cpuLine, now) + + return out +} + +func startListener(t *testing.T, fs afero.Fs) cmd.Stoppable { + return startComponent(t, fs, "listener", fmt.Sprintf(` +mode = "listener" +port = %d +nats_address = "nats://localhost:%d" +batch = 5 +debug = true +`, listenerPort, natsPort)) +} + +func startHTTPListener(t *testing.T, fs afero.Fs) cmd.Stoppable { + return startComponent(t, fs, "listener", fmt.Sprintf(` +mode = "listener_http" +port = %d +nats_address = "nats://localhost:%d" +batch = 5 +debug = true +`, httpListenerPort, natsPort)) +} + +func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable { + return startComponent(t, fs, "filter", fmt.Sprintf(` +mode = "filter" +nats_address = "nats://localhost:%d" +debug = true + +[[rule]] +type = "basic" +match = "cpu" +subject = "system" +`, natsPort)) +} + +func startWriter(t *testing.T, fs afero.Fs) cmd.Stoppable { + return startComponent(t, fs, "writer", fmt.Sprintf(` +mode = "writer" +nats_address = "nats://localhost:%d" +nats_subject = ["system"] +influxdb_port = %d +influxdb_dbname = "%s" +batch = 1 +workers = 4 +debug = true +`, natsPort, influxdPort, influxDBName)) +} + +func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable { + configFilename := name + ".toml" + err := afero.WriteFile(fs, configFilename, []byte(config), 0600) + require.NoError(t, err) + s, err := cmd.Run(configFilename) + require.NoError(t, err) + return s +}