Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #25 from jumptrading/end-to-end-test
Browse files Browse the repository at this point in the history
Implement an end-to-end test
  • Loading branch information
mjs authored Feb 20, 2018
2 parents 23d470d + 2c40673 commit 99990d9
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 48 deletions.
61 changes: 61 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2018 Jump Trading
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"fmt"
"runtime"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/filter"
"github.com/jumptrading/influx-spout/listener"
"github.com/jumptrading/influx-spout/writer"
)

// Stoppable defines something this is a capable of being stopped.
type Stoppable interface {
Stop()
}

// Run parses the configuration file provided and starts influx-spout
// in the appropriate mode.
func Run(configFile string) (out Stoppable, err error) {
c, err := config.NewConfigFromFile(configFile)
if err != nil {
return nil, fmt.Errorf("Error while loading config file: %v", err)
}

switch c.Mode {
case "listener":
out, err = listener.StartListener(c)
case "listener_http":
out, err = listener.StartHTTPListener(c)
case "filter":
out, err = filter.StartFilter(c)
case "writer":
if c.Workers == 0 {
// this seems to be an okay default from our testing experience:
// aim to have on average two workers per OS-thread running.
c.Workers = runtime.GOMAXPROCS(-1) * 2
}
out, err = writer.StartWriter(c)
default:
return nil, fmt.Errorf("unknown mode of operation: [%s]", c.Mode)
}
if err != nil {
return nil, fmt.Errorf("failed to start %s: %v", c.Mode, err)
}
return out, nil
}
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewConfigFromFile(fileName string) (*Config, error) {
}

func readConfig(fileName string, conf *Config) error {
f, err := fs.Open(fileName)
f, err := Fs.Open(fileName)
if err != nil {
return err
}
Expand All @@ -133,4 +133,6 @@ func pathToConfigName(path string) string {
return path[:len(path)-len(ext)]
}

var fs = afero.NewOsFs()
// Fs abstracts away filesystem access for the config package. It
// should only be modified by tests.
var Fs = afero.NewOsFs()
20 changes: 10 additions & 10 deletions config/config_small_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ mode = "listener"
batch = 100
debug = true
`
fs = afero.NewMemMapFs()
afero.WriteFile(fs, commonFileName, []byte(commonConfig), 0600)
afero.WriteFile(fs, testConfigFileName, []byte(specificConfig), 0600)
Fs = afero.NewMemMapFs()
afero.WriteFile(Fs, commonFileName, []byte(commonConfig), 0600)
afero.WriteFile(Fs, testConfigFileName, []byte(specificConfig), 0600)

conf, err := NewConfigFromFile(testConfigFileName)
require.NoError(t, err)
Expand All @@ -202,9 +202,9 @@ batch = 100
debug = true
`

fs = afero.NewMemMapFs()
afero.WriteFile(fs, commonFileName, []byte(commonConfig), 0600)
afero.WriteFile(fs, testConfigFileName, []byte(specificConfig), 0600)
Fs = afero.NewMemMapFs()
afero.WriteFile(Fs, commonFileName, []byte(commonConfig), 0600)
afero.WriteFile(Fs, testConfigFileName, []byte(specificConfig), 0600)

_, err := NewConfigFromFile(testConfigFileName)
require.Error(t, err)
Expand All @@ -218,23 +218,23 @@ func (*failingOpenFs) Open(string) (afero.File, error) {
}

func TestErrorOpeningCommonFile(t *testing.T) {
fs = new(failingOpenFs)
Fs = new(failingOpenFs)

_, err := NewConfigFromFile(testConfigFileName)
assert.EqualError(t, err, "boom")
}

func TestOpenError(t *testing.T) {
fs = afero.NewMemMapFs()
Fs = afero.NewMemMapFs()

conf, err := NewConfigFromFile("/does/not/exist")
assert.Nil(t, conf)
assert.Error(t, err)
}

func parseConfig(content string) (*Config, error) {
fs = afero.NewMemMapFs()
afero.WriteFile(fs, testConfigFileName, []byte(content), 0600)
Fs = afero.NewMemMapFs()
afero.WriteFile(Fs, testConfigFileName, []byte(content), 0600)

return NewConfigFromFile(testConfigFileName)
}
6 changes: 5 additions & 1 deletion filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {
}

f.sub, err = f.nc.Subscribe(f.c.NATSSubject[0], func(msg *nats.Msg) {
if conf.Debug {
log.Printf("filter received %d bytes", len(msg.Data))
}
jobs <- msg.Data
})
if err != nil {
Expand All @@ -83,7 +86,8 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {
f.wg.Add(1)
go f.startStatistician(stats, rules)

log.Printf("Filter listening on [%s] with %d rules\n", f.c.NATSSubject, rules.Count())
log.Printf("filter subscribed to [%s] at %s with %d rules\n",
f.c.NATSSubject[0], f.c.NATSAddress, rules.Count())
return f, nil
}

Expand Down
22 changes: 19 additions & 3 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,17 @@ var statsInterval = 3 * time.Second
//
// The listener reads incoming UDP packets, batches them up and send
// batches onwards to a NATS subject.
func StartListener(c *config.Config) (*Listener, error) {
func StartListener(c *config.Config) (_ *Listener, err error) {
listener, err := newListener(c)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
listener.Stop()
}
}()

sc, err := listener.setupUDP(c.ReadBufferBytes)
if err != nil {
return nil, err
Expand All @@ -65,6 +71,8 @@ func StartListener(c *config.Config) (*Listener, error) {
listener.wg.Add(2)
go listener.startStatistician()
go listener.listenUDP(sc)

log.Printf("UDP listener publishing to [%s] at %s", c.NATSSubject[0], c.NATSAddress)
listener.notifyState("ready")

return listener, nil
Expand All @@ -83,6 +91,8 @@ func StartHTTPListener(c *config.Config) (*Listener, error) {
listener.wg.Add(2)
go listener.startStatistician()
go listener.listenHTTP(server)

log.Printf("HTTP listener publishing to [%s] at %s", c.NATSSubject[0], c.NATSAddress)
listener.notifyState("ready")

return listener, nil
Expand All @@ -102,6 +112,12 @@ type Listener struct {
stop chan struct{}
}

// Ready returns a channel which is closed once the listener is
// actually listening for incoming data.
func (l *Listener) Ready() <-chan struct{} {
return l.ready
}

func (l *Listener) Stop() {
close(l.stop)
l.wg.Wait()
Expand Down Expand Up @@ -153,7 +169,7 @@ func (l *Listener) setupUDP(configBufSize int) (*net.UDPConn, error) {
return nil, err
}

log.Printf("Listener bound to UDP socket: %v\n", sc.LocalAddr().String())
log.Printf("listener bound to UDP socket: %v\n", sc.LocalAddr().String())
return sc, nil
}

Expand Down Expand Up @@ -241,7 +257,7 @@ func (l *Listener) processRead(sz int) {
l.batchSize += sz

if l.c.Debug {
log.Printf("Info: Listener read %d bytes\n", sz)
log.Printf("listener read %d bytes\n", sz)
}

// Send when sufficient reads have been batched or the batch
Expand Down
2 changes: 1 addition & 1 deletion listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func startListener(t require.TestingT, conf *config.Config) *Listener {

func assertListenerStarted(t require.TestingT, listener *Listener) {
select {
case <-listener.ready:
case <-listener.Ready():
case <-time.After(spouttest.LongWait):
listener.Stop()
t.Errorf("listener failed to start up")
Expand Down
33 changes: 3 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import (
"os"
"runtime"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/filter"
"github.com/jumptrading/influx-spout/listener"
"github.com/jumptrading/influx-spout/writer"
"github.com/jumptrading/influx-spout/cmd"
)

// These are set at build time.
Expand Down Expand Up @@ -61,32 +58,8 @@ func main() {

log.Printf("Running %v version %s, built on %s, %s\n", os.Args[0], version, builtOn, runtime.Version())

c, err := config.NewConfigFromFile(configFile)
if err != nil {
fmt.Printf("FATAL: Error while loading config file: %v\n", err)
os.Exit(1)
if _, err := cmd.Run(configFile); err != nil {
log.Fatalf("%s", err)
}

switch c.Mode {
case "listener":
_, err = listener.StartListener(c)
case "listener_http":
_, err = listener.StartHTTPListener(c)
case "filter":
_, err = filter.StartFilter(c)
case "writer":
if c.Workers == 0 {
// this seems to be an okay default from our testing experience:
// aim to have on average two workers per OS-thread running.
c.Workers = runtime.GOMAXPROCS(-1) * 2
}
_, err = writer.StartWriter(c)
default:
log.Fatalf("unknown mode of operation: [%s]", c.Mode)
}
if err != nil {
log.Fatalf("failed to start %s: %v", c.Mode, err)
}

runtime.Goexit()
}
Loading

0 comments on commit 99990d9

Please sign in to comment.