diff --git a/README.md b/README.md index 8a6b6d9..d0f1024 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,11 @@ nats_subject_monitor = "influx-spout-monitor" # The number of filter workers to spawn. workers = 8 +# Incoming metrics with timestamps ± this value from the current time will be +# rejected. Metrics with timestamps that are significantly different from previously +# written timestamps negatively impact InfluxDB performance. +max_time_delta_secs = 600 + # At least one rule should be defined. Rules are defined using TOML's table # syntax. The following examples show each rule type. diff --git a/config/config.go b/config/config.go index 6bcacff..fa6c49c 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,7 @@ type Config struct { NATSPendingMaxMB int `toml:"nats_pending_max_mb"` ListenerBatchBytes int `toml:"listener_batch_bytes"` Rule []Rule `toml:"rule"` + MaxTimeDeltaSecs int `toml:"max_time_delta_secs"` Debug bool `toml:"debug"` } @@ -77,6 +78,7 @@ func newDefaultConfig() *Config { ReadBufferBytes: 4 * 1024 * 1024, NATSPendingMaxMB: 200, ListenerBatchBytes: 1024 * 1024, + MaxTimeDeltaSecs: 600, } } diff --git a/config/config_small_test.go b/config/config_small_test.go index 15abd5d..94f7ab3 100644 --- a/config/config_small_test.go +++ b/config/config_small_test.go @@ -54,6 +54,7 @@ write_timeout_secs = 32 read_buffer_bytes = 43210 nats_pending_max_mb = 100 listener_batch_bytes = 4096 +max_time_delta_secs = 789 ` conf, err := parseConfig(validConfigSample) require.NoError(t, err, "Couldn't parse a valid config: %v\n", err) @@ -67,8 +68,9 @@ listener_batch_bytes = 4096 assert.Equal(t, 96, conf.Workers) assert.Equal(t, 32, conf.WriteTimeoutSecs, "WriteTimeoutSecs must match") assert.Equal(t, 43210, conf.ReadBufferBytes) - assert.Equal(t, 100, conf.NATSPendingMaxMB, "NATSPendingMaxMB must match") - assert.Equal(t, 4096, conf.ListenerBatchBytes, "NATSPendingMaxMB must match") + assert.Equal(t, 100, conf.NATSPendingMaxMB) + assert.Equal(t, 4096, conf.ListenerBatchBytes) + assert.Equal(t, 789, conf.MaxTimeDeltaSecs) assert.Equal(t, 8086, conf.InfluxDBPort, "InfluxDB Port must match") assert.Equal(t, "junk_nats", conf.DBName, "InfluxDB DBname must match") @@ -101,6 +103,7 @@ func TestAllDefaults(t *testing.T) { assert.Equal(t, 4194304, conf.ReadBufferBytes) assert.Equal(t, 200, conf.NATSPendingMaxMB) assert.Equal(t, 1048576, conf.ListenerBatchBytes) + assert.Equal(t, 600, conf.MaxTimeDeltaSecs) assert.Equal(t, false, conf.Debug) assert.Len(t, conf.Rule, 0) } diff --git a/filter/filter.go b/filter/filter.go index 51556c8..708177a 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -22,18 +22,18 @@ import ( "sync" "time" - "github.com/nats-io/go-nats" - "github.com/jumptrading/influx-spout/config" "github.com/jumptrading/influx-spout/lineformatter" "github.com/jumptrading/influx-spout/stats" + "github.com/nats-io/nats" ) // Name for supported stats const ( - linesPassed = "lines-passed" - linesProcessed = "lines-processed" - linesRejected = "lines-rejected" + linesPassed = "passed" + linesProcessed = "processed" + linesRejected = "rejected" + linesInvalidTime = "invalid-time" ) // StartFilter creates a Filter instance, sets up its rules based on @@ -65,7 +65,14 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) { jobs := make(chan []byte, 1024) for i := 0; i < f.c.Workers; i++ { - w, err := newWorker(rules, stats, f.natsConnect, f.c.NATSSubjectJunkyard) + w, err := newWorker( + f.c.MaxTimeDeltaSecs, + rules, + stats, + f.c.Debug, + f.natsConnect, + f.c.NATSSubjectJunkyard, + ) if err != nil { return nil, fmt.Errorf("failed to start worker: %v", err) } @@ -105,6 +112,7 @@ func initStats(rules *RuleSet) *stats.Stats { linesPassed, linesProcessed, linesRejected, + linesInvalidTime, } for i := 0; i < rules.Count(); i++ { statNames = append(statNames, ruleToStatsName(i)) @@ -153,7 +161,7 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { totalLine := lineformatter.New( "spout_stat_filter", []string{"filter"}, - "passed", "processed", "rejected", + linesPassed, linesProcessed, linesRejected, linesInvalidTime, ) ruleLine := lineformatter.New( "spout_stat_filter_rule", @@ -170,6 +178,7 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { st.Get(linesPassed), st.Get(linesProcessed), st.Get(linesRejected), + st.Get(linesInvalidTime), )) // publish the per rule stats diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index f0da8bb..a01c1f3 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -18,7 +18,9 @@ package filter import ( "fmt" + "strings" "testing" + "time" "github.com/nats-io/go-nats" "github.com/stretchr/testify/require" @@ -29,25 +31,30 @@ import ( const natsPort = 44446 -var conf = config.Config{ - Name: "particle", - NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort), - NATSSubject: []string{"filter-test"}, - NATSSubjectMonitor: "filter-test-monitor", - NATSSubjectJunkyard: "filter-junkyard", - Workers: 1, - Rule: []config.Rule{{ - Rtype: "basic", - Match: "hello", - Subject: "hello-subject", - }}, +func testConfig() *config.Config { + return &config.Config{ + Name: "particle", + NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort), + NATSSubject: []string{"filter-test"}, + NATSSubjectMonitor: "filter-test-monitor", + NATSSubjectJunkyard: "filter-junkyard", + Workers: 1, + MaxTimeDeltaSecs: 600, + Rule: []config.Rule{{ + Rtype: "basic", + Match: "hello", + Subject: "hello-subject", + }}, + } } func TestFilterWorker(t *testing.T) { gnatsd := spouttest.RunGnatsd(natsPort) defer gnatsd.Shutdown() - filter, err := StartFilter(&conf) + conf := testConfig() + + filter, err := StartFilter(conf) require.NoError(t, err) defer filter.Stop() @@ -98,7 +105,65 @@ goodbye,host=gopher01 // Receive total stats spouttest.AssertRecvMulti(t, statsCh, "stats", ` -spout_stat_filter,filter=particle passed=2,processed=3,rejected=1 +spout_stat_filter,filter=particle passed=2,processed=3,rejected=1,invalid-time=0 +`) + + // Receive rule specific stats + spouttest.AssertRecvMulti(t, statsCh, "rule stats", ` +spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 +`) +} + +func TestInvalidTimeStamps(t *testing.T) { + gnatsd := spouttest.RunGnatsd(natsPort) + defer gnatsd.Shutdown() + + conf := testConfig() + conf.MaxTimeDeltaSecs = 10 + + filter, err := StartFilter(conf) + require.NoError(t, err) + defer filter.Stop() + + nc, err := nats.Connect(conf.NATSAddress) + require.NoError(t, err) + defer nc.Close() + + // Subscribe to filter output + helloCh := make(chan string, 1) + _, err = nc.Subscribe(conf.Rule[0].Subject, func(msg *nats.Msg) { + helloCh <- string(msg.Data) + }) + require.NoError(t, err) + + // Subscribe to stats output + statsCh := make(chan string, 10) + _, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) { + statsCh <- string(msg.Data) + }) + require.NoError(t, err) + + // Publish 3 lines. + // The first should be rejected because it is too old. + // The second should be rejected because it is too new. + // The third should make it through because it is current. + // The fourth should make it through because it has no timestamp. + now := time.Now() + lines := []string{ + fmt.Sprintf("hello,instance=0 foo=0 %d", now.Add(-time.Second*11).UnixNano()), + fmt.Sprintf("hello,instance=1 foo=0 %d", now.Add(time.Second*11).UnixNano()), + fmt.Sprintf("hello,instance=2 foo=1 %d", now.UnixNano()), + "hello,instance=2 foo=3", + } + err = nc.Publish(conf.NATSSubject[0], []byte(strings.Join(lines, "\n"))) + require.NoError(t, err) + + // Expect to see the 3rd & 4th lines. + spouttest.AssertRecv(t, helloCh, "helloCh", strings.Join(lines[2:], "\n")) + + // Receive total stats. + spouttest.AssertRecvMulti(t, statsCh, "stats", ` +spout_stat_filter,filter=particle passed=2,processed=4,rejected=0,invalid-time=2 `) // Receive rule specific stats diff --git a/filter/rules_small_test.go b/filter/rules_small_test.go index d8ea719..c08e5f9 100644 --- a/filter/rules_small_test.go +++ b/filter/rules_small_test.go @@ -17,7 +17,10 @@ package filter import ( + "strconv" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -188,22 +191,31 @@ func BenchmarkProcessBatch(b *testing.B) { rs.Append(CreateBasicRule("hello", "hello-out")) rs.Append(CreateRegexRule("foo|bar", "foobar-out")) - w, err := newWorker(rs, initStats(rs), nullNATSConnect, "junk") + w, err := newWorker(600, rs, initStats(rs), false, nullNATSConnect, "junk") require.NoError(b, err) - batch := []byte(` -hello,host=gopher01 somefield=11,etc=false -bar,host=gopher02 somefield=14 -pepsi host=gopher01,cheese=stilton -hello,host=gopher01 somefield=11,etc=false -bar,host=gopher02 somefield=14 -pepsi host=gopher01,cheese=stilton -hello,host=gopher01 somefield=11,etc=false -bar,host=gopher02 somefield=14 -pepsi host=gopher01,cheese=stilton -`[1:]) + lines := []string{ + "hello,host=gopher01 somefield=11,etc=false", + "bar,host=gopher02 somefield=14", + "pepsi host=gopher01,cheese=stilton", + "hello,host=gopher01 somefield=11,etc=false", + "bar,host=gopher02 somefield=14", + "pepsi host=gopher01,cheese=stilton", + "hello,host=gopher01 somefield=11,etc=false", + "bar,host=gopher02 somefield=14", + "pepsi host=gopher01,cheese=stilton", + } + + // Add a timestamp to each line. + ts := strconv.FormatInt(time.Now().UnixNano(), 10) + for i, line := range lines { + lines[i] = line + " " + ts + } + + batch := []byte(strings.Join(lines, "\n")) b.ResetTimer() + for i := 0; i < b.N; i++ { w.processBatch(batch) } diff --git a/filter/timestamp_small_test.go b/filter/timestamp_small_test.go new file mode 100644 index 0000000..3310a78 --- /dev/null +++ b/filter/timestamp_small_test.go @@ -0,0 +1,83 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build small + +package filter + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractTimestamp(t *testing.T) { + ts := time.Date(1997, 6, 5, 4, 3, 2, 1, time.UTC).UnixNano() + tsStr := strconv.FormatInt(ts, 10) + defaultTs := time.Now().UnixNano() + + check := func(input string, expected int64) { + assert.Equal( + t, expected, + extractTimestamp([]byte(input), defaultTs), + "extractTimestamp(%q)", input) + } + + check("", defaultTs) + check(" ", defaultTs) + check("weather temp=99", defaultTs) + check("weather,city=paris temp=60", defaultTs) + check("weather,city=paris temp=99,humidity=100", defaultTs) + check("weather temp=99 "+tsStr, ts) + check("weather temp=99 "+tsStr+"\n", ts) + check("weather,city=paris temp=60 "+tsStr, ts) + check("weather,city=paris temp=60,humidity=100 "+tsStr, ts) + check("weather,city=paris temp=60,humidity=100 "+tsStr+"\n", ts) + + // Various invalid timestamps + check("weather temp=99 "+tsStr+" ", defaultTs) + check("weather temp=99 xxxxxxxxxxxxxxxxxxx", defaultTs) + check("weather temp=99 152076148x803180202", defaultTs) // non-digit embedded + check("weather temp=99 11520761485803180202", defaultTs) // too long + check("weather temp=99 -"+tsStr, defaultTs) + check(tsStr, defaultTs) +} + +func TestFastParseInt(t *testing.T) { + check := func(input string, expected int64) { + actual, err := fastParseInt([]byte(input)) + require.NoError(t, err) + assert.Equal(t, expected, actual, "fastParseInt(%q)", input) + } + + shouldFail := func(input string) { + _, err := fastParseInt([]byte(input)) + assert.Error(t, err) + } + + check("0", 0) + check("1", 1) + check("9", 9) + check("10", 10) + check("99", 99) + check("101", 101) + check("9223372036854775807", (1<<63)-1) // max int64 value + + shouldFail("9223372036854775808") // max int64 value + 1 + shouldFail("-1") // negatives not supported + shouldFail("x") +} diff --git a/filter/worker.go b/filter/worker.go index 1692a6b..7874ddb 100644 --- a/filter/worker.go +++ b/filter/worker.go @@ -16,24 +16,31 @@ package filter import ( "bytes" + "errors" "fmt" + "log" "sync" + "time" "github.com/jumptrading/influx-spout/stats" ) type worker struct { - rules *RuleSet - stats *stats.Stats - nc natsConn - junkSubject string - batches []*bytes.Buffer - junkBatch *bytes.Buffer + maxTsDeltaNs int64 + rules *RuleSet + stats *stats.Stats + debug bool + nc natsConn + junkSubject string + batches []*bytes.Buffer + junkBatch *bytes.Buffer } func newWorker( + maxTsDeltaSecs int, rules *RuleSet, stats *stats.Stats, + debug bool, natsConnect func() (natsConn, error), junkSubject string, ) (*worker, error) { @@ -49,12 +56,13 @@ func newWorker( } return &worker{ - rules: rules, - stats: stats, - nc: nc, - batches: batches, - junkBatch: new(bytes.Buffer), - junkSubject: junkSubject, + maxTsDeltaNs: int64(maxTsDeltaSecs) * 1e9, + rules: rules, + stats: stats, + nc: nc, + batches: batches, + junkBatch: new(bytes.Buffer), + junkSubject: junkSubject, }, nil } @@ -75,9 +83,24 @@ func (w *worker) run(jobs <-chan []byte, stop <-chan struct{}, wg *sync.WaitGrou } func (w *worker) processBatch(batch []byte) { + now := time.Now().UnixNano() + minTs := now - w.maxTsDeltaNs + maxTs := now + w.maxTsDeltaNs + for _, line := range bytes.SplitAfter(batch, []byte("\n")) { - if len(line) > 0 { + if len(line) == 0 { + continue + } + w.stats.Inc(linesProcessed) + + ts := extractTimestamp(line, now) + if minTs < ts && ts < maxTs { w.processLine(line) + } else { + w.stats.Inc(linesInvalidTime) + if w.debug { + log.Printf("invalid line timestamp: %q", string(line)) + } } } @@ -86,8 +109,6 @@ func (w *worker) processBatch(batch []byte) { } func (w *worker) processLine(line []byte) { - w.stats.Inc(linesProcessed) - idx := w.rules.Lookup(line) if idx == -1 { // no rule for this => junkyard @@ -118,3 +139,63 @@ func (w *worker) sendOff() { w.junkBatch.Reset() } } + +// Any realistic timestamp will be 18 or 19 characters long. +const minTsLen = 18 +const maxTsLen = 19 + +func extractTimestamp(line []byte, defaultTs int64) int64 { + length := len(line) + + // Reject lines that are too short to have a timestamp. + if length <= minTsLen { + return defaultTs + } + + // Remove trailing newline. + if line[length-1] == '\n' { + length-- + line = line[:length] + } + + // Expect a space just before the timestamp. + for i := length - maxTsLen - 1; i < length-minTsLen; i++ { + if line[i] == ' ' { + out, err := fastParseInt(line[i+1:]) + if err != nil { + return defaultTs + } + return out + } + } + return defaultTs +} + +const int64Max = (1 << 63) - 1 + +// fastParseInt is a simpler, faster version of strconv.ParseInt(). +// Differences to ParseInt: +// - input is []byte instead of a string (no type conversion required +// by caller) +// - only supports base 10 input +// - only handles positive values +func fastParseInt(s []byte) (int64, error) { + if len(s) == 0 { + return 0, errors.New("empty") + } + + var n uint64 + for _, c := range s { + if '0' <= c && c <= '9' { + c -= '0' + } else { + return 0, errors.New("invalid char") + } + n = n*10 + uint64(c) + } + + if n > int64Max { + return 0, errors.New("overflow") + } + return int64(n), nil +}