From 2e957c200930bc4f8af27ab0580023c1ced022b4 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 2 Jun 2016 18:47:15 +0100 Subject: [PATCH] logparser input plugin closes #102 closes #328 --- Godeps | 1 + filter/filter.go | 79 +++++ filter/filter_test.go | 96 ++++++ internal/internal.go | 23 -- internal/internal_test.go | 31 -- internal/models/filter.go | 36 ++- internal/models/filter_test.go | 45 --- plugins/inputs/all/all.go | 1 + plugins/inputs/logparser/README.md | 8 + plugins/inputs/logparser/grok/grok.go | 287 ++++++++++++++++++ plugins/inputs/logparser/grok/grok_test.go | 198 ++++++++++++ .../logparser/grok/patterns/influx-patterns | 65 ++++ .../logparser/grok/testdata/test-patterns | 11 + .../inputs/logparser/grok/testdata/test_a.log | 1 + .../inputs/logparser/grok/testdata/test_b.log | 1 + plugins/inputs/logparser/logparser.go | 197 ++++++++++++ plugins/inputs/logparser/logparser_test.go | 1 + plugins/inputs/varnish/varnish.go | 9 +- 18 files changed, 967 insertions(+), 123 deletions(-) create mode 100644 filter/filter.go create mode 100644 filter/filter_test.go create mode 100644 plugins/inputs/logparser/README.md create mode 100644 plugins/inputs/logparser/grok/grok.go create mode 100644 plugins/inputs/logparser/grok/grok_test.go create mode 100644 plugins/inputs/logparser/grok/patterns/influx-patterns create mode 100644 plugins/inputs/logparser/grok/testdata/test-patterns create mode 100644 plugins/inputs/logparser/grok/testdata/test_a.log create mode 100644 plugins/inputs/logparser/grok/testdata/test_b.log create mode 100644 plugins/inputs/logparser/logparser.go create mode 100644 plugins/inputs/logparser/logparser_test.go diff --git a/Godeps b/Godeps index 2ac95a90455eb..f47a578062707 100644 --- a/Godeps +++ b/Godeps @@ -47,6 +47,7 @@ github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c +github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 diff --git a/filter/filter.go b/filter/filter.go new file mode 100644 index 0000000000000..85eed17ac0e7c --- /dev/null +++ b/filter/filter.go @@ -0,0 +1,79 @@ +package filter + +import ( + "strings" + + "github.com/gobwas/glob" +) + +type Filter interface { + Match(string) bool +} + +// CompileFilter takes a list of string filters and returns a Filter interface +// for matching a given string against the filter list. The filter list +// supports glob matching too, ie: +// +// f, _ := CompileFilter([]string{"cpu", "mem", "net*"}) +// f.Match("cpu") // true +// f.Match("network") // true +// f.Match("memory") // false +// +func CompileFilter(filters []string) (Filter, error) { + // return if there is nothing to compile + if len(filters) == 0 { + return nil, nil + } + + // check if we can compile a non-glob filter + noGlob := true + for _, filter := range filters { + if hasMeta(filter) { + noGlob = false + break + } + } + + switch { + case noGlob: + // return non-globbing filter if not needed. 
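+		// ie, []string{"cpu", "mem"} compiles to a map-backed Filter and
+		// []string{"cpu"} to a single string comparison, so the common
+		// no-wildcard case skips glob matching entirely.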
+ return compileFilterNoGlob(filters), nil + case len(filters) == 1: + return glob.Compile(filters[0]) + default: + return glob.Compile("{" + strings.Join(filters, ",") + "}") + } +} + +// hasMeta reports whether path contains any magic glob characters. +func hasMeta(s string) bool { + return strings.IndexAny(s, "*?[") >= 0 +} + +type filter struct { + m map[string]struct{} +} + +func (f *filter) Match(s string) bool { + _, ok := f.m[s] + return ok +} + +type filtersingle struct { + s string +} + +func (f *filtersingle) Match(s string) bool { + return f.s == s +} + +func compileFilterNoGlob(filters []string) Filter { + if len(filters) == 1 { + return &filtersingle{s: filters[0]} + } + out := filter{m: make(map[string]struct{})} + for _, filter := range filters { + out.m[filter] = struct{}{} + } + return &out +} diff --git a/filter/filter_test.go b/filter/filter_test.go new file mode 100644 index 0000000000000..85072e2ac5354 --- /dev/null +++ b/filter/filter_test.go @@ -0,0 +1,96 @@ +package filter + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompileFilter(t *testing.T) { + f, err := CompileFilter([]string{}) + assert.NoError(t, err) + assert.Nil(t, f) + + f, err = CompileFilter([]string{"cpu"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.False(t, f.Match("cpu0")) + assert.False(t, f.Match("mem")) + + f, err = CompileFilter([]string{"cpu*"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.True(t, f.Match("cpu0")) + assert.False(t, f.Match("mem")) + + f, err = CompileFilter([]string{"cpu", "mem"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.False(t, f.Match("cpu0")) + assert.True(t, f.Match("mem")) + + f, err = CompileFilter([]string{"cpu", "mem", "net*"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.False(t, f.Match("cpu0")) + assert.True(t, f.Match("mem")) + assert.True(t, f.Match("network")) +} + +var benchbool bool + +func BenchmarkFilterSingleNoGlobFalse(b *testing.B) { + f, _ := CompileFilter([]string{"cpu"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("network") + } + benchbool = tmp +} + +func BenchmarkFilterSingleNoGlobTrue(b *testing.B) { + f, _ := CompileFilter([]string{"cpu"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("cpu") + } + benchbool = tmp +} + +func BenchmarkFilter(b *testing.B) { + f, _ := CompileFilter([]string{"cpu", "mem", "net*"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("network") + } + benchbool = tmp +} + +func BenchmarkFilterNoGlob(b *testing.B) { + f, _ := CompileFilter([]string{"cpu", "mem", "net"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("net") + } + benchbool = tmp +} + +func BenchmarkFilter2(b *testing.B) { + f, _ := CompileFilter([]string{"aa", "bb", "c", "ad", "ar", "at", "aq", + "aw", "az", "axxx", "ab", "cpu", "mem", "net*"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("network") + } + benchbool = tmp +} + +func BenchmarkFilter2NoGlob(b *testing.B) { + f, _ := CompileFilter([]string{"aa", "bb", "c", "ad", "ar", "at", "aq", + "aw", "az", "axxx", "ab", "cpu", "mem", "net"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("net") + } + benchbool = tmp +} diff --git a/internal/internal.go b/internal/internal.go index 27a24f02162e6..4c90d11b9a62a 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -17,8 +17,6 @@ import ( "strings" "time" "unicode" - - "github.com/gobwas/glob" ) const alphanum string = 
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -209,27 +207,6 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error { } } -// CompileFilter takes a list of glob "filters", ie: -// ["MAIN.*", "CPU.*", "NET"] -// and compiles them into a glob object. This glob object can -// then be used to match keys to the filter. -func CompileFilter(filters []string) (glob.Glob, error) { - var out glob.Glob - - // return if there is nothing to compile - if len(filters) == 0 { - return out, nil - } - - var err error - if len(filters) == 1 { - out, err = glob.Compile(filters[0]) - } else { - out, err = glob.Compile("{" + strings.Join(filters, ",") + "}") - } - return out, err -} - // RandomSleep will sleep for a random amount of time up to max. // If the shutdown channel is closed, it will return before it has finished // sleeping. diff --git a/internal/internal_test.go b/internal/internal_test.go index 31bb5ec612bb4..213e94d3d05d3 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -107,37 +107,6 @@ func TestRunError(t *testing.T) { assert.Error(t, err) } -func TestCompileFilter(t *testing.T) { - f, err := CompileFilter([]string{}) - assert.NoError(t, err) - assert.Nil(t, f) - - f, err = CompileFilter([]string{"cpu"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.False(t, f.Match("cpu0")) - assert.False(t, f.Match("mem")) - - f, err = CompileFilter([]string{"cpu*"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.True(t, f.Match("cpu0")) - assert.False(t, f.Match("mem")) - - f, err = CompileFilter([]string{"cpu", "mem"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.False(t, f.Match("cpu0")) - assert.True(t, f.Match("mem")) - - f, err = CompileFilter([]string{"cpu", "mem", "net*"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.False(t, f.Match("cpu0")) - assert.True(t, f.Match("mem")) - assert.True(t, f.Match("network")) -} - func TestRandomSleep(t *testing.T) { // test that zero max returns immediately s := time.Now() diff --git a/internal/models/filter.go b/internal/models/filter.go index 71d71c23edbcc..ac24ec667c4b5 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -3,80 +3,78 @@ package internal_models import ( "fmt" - "github.com/gobwas/glob" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/filter" ) // TagFilter is the name of a tag, and the values on which to filter type TagFilter struct { Name string Filter []string - filter glob.Glob + filter filter.Filter } // Filter containing drop/pass and tagdrop/tagpass rules type Filter struct { NameDrop []string - nameDrop glob.Glob + nameDrop filter.Filter NamePass []string - namePass glob.Glob + namePass filter.Filter FieldDrop []string - fieldDrop glob.Glob + fieldDrop filter.Filter FieldPass []string - fieldPass glob.Glob + fieldPass filter.Filter TagDrop []TagFilter TagPass []TagFilter TagExclude []string - tagExclude glob.Glob + tagExclude filter.Filter TagInclude []string - tagInclude glob.Glob + tagInclude filter.Filter IsActive bool } -// Compile all Filter lists into glob.Glob objects. +// Compile all Filter lists into filter.Filter objects. 
func (f *Filter) CompileFilter() error { var err error - f.nameDrop, err = internal.CompileFilter(f.NameDrop) + f.nameDrop, err = filter.CompileFilter(f.NameDrop) if err != nil { return fmt.Errorf("Error compiling 'namedrop', %s", err) } - f.namePass, err = internal.CompileFilter(f.NamePass) + f.namePass, err = filter.CompileFilter(f.NamePass) if err != nil { return fmt.Errorf("Error compiling 'namepass', %s", err) } - f.fieldDrop, err = internal.CompileFilter(f.FieldDrop) + f.fieldDrop, err = filter.CompileFilter(f.FieldDrop) if err != nil { return fmt.Errorf("Error compiling 'fielddrop', %s", err) } - f.fieldPass, err = internal.CompileFilter(f.FieldPass) + f.fieldPass, err = filter.CompileFilter(f.FieldPass) if err != nil { return fmt.Errorf("Error compiling 'fieldpass', %s", err) } - f.tagExclude, err = internal.CompileFilter(f.TagExclude) + f.tagExclude, err = filter.CompileFilter(f.TagExclude) if err != nil { return fmt.Errorf("Error compiling 'tagexclude', %s", err) } - f.tagInclude, err = internal.CompileFilter(f.TagInclude) + f.tagInclude, err = filter.CompileFilter(f.TagInclude) if err != nil { return fmt.Errorf("Error compiling 'taginclude', %s", err) } for i, _ := range f.TagDrop { - f.TagDrop[i].filter, err = internal.CompileFilter(f.TagDrop[i].Filter) + f.TagDrop[i].filter, err = filter.CompileFilter(f.TagDrop[i].Filter) if err != nil { return fmt.Errorf("Error compiling 'tagdrop', %s", err) } } for i, _ := range f.TagPass { - f.TagPass[i].filter, err = internal.CompileFilter(f.TagPass[i].Filter) + f.TagPass[i].filter, err = filter.CompileFilter(f.TagPass[i].Filter) if err != nil { return fmt.Errorf("Error compiling 'tagpass', %s", err) } diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go index a374160950a54..454f10c4596f7 100644 --- a/internal/models/filter_test.go +++ b/internal/models/filter_test.go @@ -253,51 +253,6 @@ func TestFilter_TagDrop(t *testing.T) { } } -func TestFilter_CompileFilterError(t *testing.T) { - f := Filter{ - NameDrop: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - NamePass: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - FieldDrop: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - FieldPass: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - TagExclude: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - TagInclude: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - filters := []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"{foobar}"}, - }} - f = Filter{ - TagDrop: filters, - } - require.Error(t, f.CompileFilter()) - filters = []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"{foobar}"}, - }} - f = Filter{ - TagPass: filters, - } - require.Error(t, f.CompileFilter()) -} - func TestFilter_ShouldMetricsPass(t *testing.T) { m := testutil.TestMetric(1, "testmetric") f := Filter{ diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1a386d97c7a0f..1d84724699629 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -29,6 +29,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" + _ "github.com/influxdata/telegraf/plugins/inputs/logparser" _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ 
"github.com/influxdata/telegraf/plugins/inputs/memcached" diff --git a/plugins/inputs/logparser/README.md b/plugins/inputs/logparser/README.md new file mode 100644 index 0000000000000..7dfa04cb6b3b4 --- /dev/null +++ b/plugins/inputs/logparser/README.md @@ -0,0 +1,8 @@ +# logparser Input Plugin + +### Configuration: + +```toml + +``` + diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go new file mode 100644 index 0000000000000..40a2b175f81bd --- /dev/null +++ b/plugins/inputs/logparser/grok/grok.go @@ -0,0 +1,287 @@ +package grok + +import ( + "bufio" + "fmt" + "log" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/vjeantet/grok" + + "github.com/influxdata/telegraf" +) + +var timeFormats = map[string]string{ + "ts-ansic": "Mon Jan _2 15:04:05 2006", + "ts-unix": "Mon Jan _2 15:04:05 MST 2006", + "ts-ruby": "Mon Jan 02 15:04:05 -0700 2006", + "ts-rfc822": "02 Jan 06 15:04 MST", + "ts-rfc822z": "02 Jan 06 15:04 -0700", // RFC822 with numeric zone + "ts-rfc850": "Monday, 02-Jan-06 15:04:05 MST", + "ts-rfc1123": "Mon, 02 Jan 2006 15:04:05 MST", + "ts-rfc1123z": "Mon, 02 Jan 2006 15:04:05 -0700", // RFC1123 with numeric zone + "ts-rfc3339": "2006-01-02T15:04:05Z07:00", + "ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00", + "ts-httpd": "02/Jan/2006:15:04:05 -0700", +} + +const ( + INT = "int" + TAG = "tag" + FLOAT = "float" + STRING = "string" + DURATION = "duration" + DROP = "drop" +) + +var ( + // matches named captures that contain a type. + // ie, + // %{NUMBER:bytes:int} + // %{IPORHOST:clientip:tag} + // %{HTTPDATE:ts1:ts-http} + // %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"} + typedRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`) + // matches a plain pattern name. ie, %{NUMBER} + patternOnlyRe = regexp.MustCompile(`%{(\w+)}`) +) + +type Parser struct { + Patterns []string + CustomPatterns string + CustomPatternFiles []string + + // typeMap is a map of patterns -> capture name -> modifier, + // ie, { + // "%{TESTLOG}": + // { + // "bytes": "int", + // "clientip": "tag" + // } + // } + typeMap map[string]map[string]string + // tsMap is a map of patterns -> capture name -> timestamp layout. + // ie, { + // "%{TESTLOG}": + // { + // "httptime": "02/Jan/2006:15:04:05 -0700" + // } + // } + tsMap map[string]map[string]string + // patterns is a map of all of the parsed patterns from CustomPatterns + // and CustomPatternFiles. 
+ // ie, { + // "DURATION": "%{NUMBER}[nuµm]?s" + // "RESPONSE_CODE": "%{NUMBER:rc:tag}" + // } + patterns map[string]string + + g *grok.Grok +} + +func (p *Parser) Compile() error { + p.typeMap = make(map[string]map[string]string) + p.tsMap = make(map[string]map[string]string) + p.patterns = make(map[string]string) + var err error + p.g, err = grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true}) + if err != nil { + return err + } + + if len(p.CustomPatternFiles) != 0 { + for _, filename := range p.CustomPatternFiles { + file, err := os.Open(filename) + if err != nil { + return err + } + + scanner := bufio.NewScanner(bufio.NewReader(file)) + p.addCustomPatterns(scanner) + } + } + + if len(p.CustomPatterns) != 0 { + scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) + p.addCustomPatterns(scanner) + } + + return p.compileCustomPatterns() +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + var err error + var values map[string]string + // the matching pattern string + var patternName string + for _, pattern := range p.Patterns { + if values, err = p.g.Parse(pattern, line); err != nil { + return nil, err + } + if len(values) != 0 { + patternName = pattern + break + } + } + + if len(values) == 0 { + return nil, nil + } + + fields := make(map[string]interface{}) + tags := make(map[string]string) + timestamp := time.Now() + for k, v := range values { + if k == "" || v == "" { + continue + } + + var t string + // if this key has a type, use it + // Otherwise leave the type blank, which will be treated as a string field. + if types, ok := p.typeMap[patternName]; ok { + t = types[k] + } + + switch t { + case INT: + iv, err := strconv.ParseInt(v, 10, 64) + if err != nil { + log.Printf("ERROR parsing %s to int: %s", v, err) + } else { + fields[k] = iv + } + case FLOAT: + fv, err := strconv.ParseFloat(v, 64) + if err != nil { + log.Printf("ERROR parsing %s to float: %s", v, err) + } else { + fields[k] = fv + } + case DURATION: + d, err := time.ParseDuration(v) + if err != nil { + log.Printf("ERROR parsing %s to duration: %s", v, err) + } else { + fields[k] = int64(d) + } + case TAG: + tags[k] = v + case STRING: + fields[k] = v + case DROP: + // goodbye! + default: + // check if this pattern has timestamp types + if ts, ok := p.tsMap[patternName]; ok { + // check if the type is a timestamp type + if layout, ok := ts[k]; ok { + ts, err := time.Parse(layout, v) + if err == nil { + timestamp = ts + } else { + log.Printf("ERROR parsing %s to time layout [%s]: %s", v, layout, err) + } + } else { + // no types found, so just assume it's a string field. + fields[k] = v + } + } else { + // no types found, so just assume it's a string field. + fields[k] = v + } + } + } + + return telegraf.NewMetric("grok", tags, fields, timestamp) +} + +func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) > 0 && line[0] != '#' { + names := strings.SplitN(line, " ", 2) + p.patterns[names[0]] = names[1] + } + } +} + +func (p *Parser) compileCustomPatterns() error { + var err error + // check if the pattern contains a subpattern that is already defined + // replace it with the subpattern for modifier inheritance. 
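+	// ie, with custom patterns
+	//   RESPONSE_TIME %{DURATION:response_time:duration}
+	//   TEST_LOG_A ... %{RESPONSE_TIME}
+	// the %{RESPONSE_TIME} reference is expanded in-place, so the
+	// "duration" modifier is still visible when TEST_LOG_A is compiled.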
+ for name, pattern := range p.patterns { + subNames := patternOnlyRe.FindAllStringSubmatch(pattern, -1) + for _, subName := range subNames { + if subPattern, ok := p.patterns[subName[1]]; ok { + pattern = strings.Replace(pattern, subName[0], subPattern, 1) + } + } + p.patterns[name] = pattern + } + + // check if pattern contains modifiers. Parse them out if it does. + for name, pattern := range p.patterns { + if typedRe.MatchString(pattern) { + // this pattern has modifiers, so parse out the modifiers + pattern, err = p.parseTypedCaptures(name, pattern) + if err != nil { + return err + } + p.patterns[name] = pattern + } + } + + return p.g.AddPatternsFromMap(p.patterns) +} + +// parseTypedCaptures parses the capture types, and then deletes the type from +// the line so that it is a valid "grok" pattern again. +// ie, +// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int) +// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag) +func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) { + matches := typedRe.FindAllStringSubmatch(pattern, -1) + + // grab the name of the capture pattern + patternName := "%{" + name + "}" + // create type map for this pattern + p.typeMap[patternName] = make(map[string]string) + p.tsMap[patternName] = make(map[string]string) + + // boolean to verify that each pattern only has a single ts- data type. + hasTimestamp := false + for _, match := range matches { + if len(match) < 3 { + continue + } + + // regex capture 1 is the name of the capture + // regex capture 2 is the type of the capture + if strings.HasPrefix(match[2], "ts-") { + if hasTimestamp { + return pattern, fmt.Errorf("logparser pattern compile error: "+ + "Each pattern is allowed only one named "+ + "timestamp data type. pattern: %s", pattern) + } + if f, ok := timeFormats[match[2]]; ok { + p.tsMap[patternName][match[1]] = f + } else { + p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`) + } + hasTimestamp = true + } else { + p.typeMap[patternName][match[1]] = match[2] + } + + // the modifier is not a valid part of a "grok" pattern, so remove it + // from the pattern. 
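+		// ie, %{NUMBER:bytes:int} becomes %{NUMBER:bytes} by collapsing
+		// ":int}" down to "}".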
+ pattern = strings.Replace(pattern, ":"+match[2]+"}", "}", 1) + } + + return pattern, nil +} diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go new file mode 100644 index 0000000000000..ed6158f797e3c --- /dev/null +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -0,0 +1,198 @@ +package grok + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCompileStringAndParse(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatterns: ` + DURATION %{NUMBER}[nuµm]?s + RESPONSE_CODE %{NUMBER:response_code:tag} + RESPONSE_TIME %{DURATION:response_time:duration} + TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + `, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "clientip": "192.168.1.1", + "myfloat": float64(1.25), + "response_time": int64(5432), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags()) +} + +func TestCompileFileAndParse(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatternFiles: []string{"./testdata/test-patterns"}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "clientip": "192.168.1.1", + "myfloat": float64(1.25), + "response_time": int64(5432), + "myint": int64(101), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags()) + assert.Equal(t, + time.Date(2016, time.June, 4, 12, 41, 45, 0, time.FixedZone("foo", 60*60)).Nanosecond(), + metricA.Time().Nanosecond()) + + metricB, err := p.ParseLine(`[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier`) + require.NotNil(t, metricB) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "myfloat": 1.25, + "mystring": "mystring", + "nomodifier": "nomodifier", + }, + metricB.Fields()) + assert.Equal(t, map[string]string{}, metricB.Tags()) + assert.Equal(t, + time.Date(2016, time.June, 4, 12, 41, 45, 0, time.FixedZone("foo", 60*60)).Nanosecond(), + metricB.Time().Nanosecond()) +} + +func TestCompileNoModifiersAndParse(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_C}"}, + CustomPatterns: ` + DURATION %{NUMBER}[nuµm]?s + TEST_LOG_C %{NUMBER:myfloat} %{NUMBER} %{IPORHOST:clientip} %{DURATION:rt} + `, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "clientip": "192.168.1.1", + "myfloat": "1.25", + "rt": "5.432µs", + }, + metricA.Fields()) + assert.Equal(t, map[string]string{}, metricA.Tags()) +} + +func TestCompileNoNamesAndParse(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_C}"}, + CustomPatterns: ` + DURATION %{NUMBER}[nuµm]?s + TEST_LOG_C %{NUMBER} %{NUMBER} %{IPORHOST} %{DURATION} + `, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`) + require.Nil(t, metricA) + assert.NoError(t, err) +} + +func TestParseNoMatch(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + 
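// reuse the shared test-pattern file; "notnumber" in the line below
+		// sits where TEST_LOG_A expects %{NUMBER:myfloat:float}, so neither
+		// pattern matches and ParseLine returns (nil, nil).
+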
CustomPatternFiles: []string{"./testdata/test-patterns"}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`) + assert.NoError(t, err) + assert.Nil(t, metricA) +} + +func TestCompileErrors(t *testing.T) { + // Compile fails because there are multiple timestamps: + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatterns: ` + TEST_LOG_A %{HTTPDATE:ts1:ts-httpd} %{HTTPDATE:ts2:ts-httpd} %{NUMBER:mynum:int} + `, + } + assert.Error(t, p.Compile()) + + // Compile fails because file doesn't exist: + p = &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatternFiles: []string{"/tmp/foo/bar/baz"}, + } + assert.Error(t, p.Compile()) +} + +func TestParseErrors(t *testing.T) { + // Parse fails because the pattern doesn't exist + p := &Parser{ + Patterns: []string{"%{TEST_LOG_B}"}, + CustomPatterns: ` + TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{} + `, + } + assert.NoError(t, p.Compile()) + _, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`) + assert.Error(t, err) + + // Parse fails because myword is not an int + p = &Parser{ + Patterns: []string{"%{TEST_LOG_A}"}, + CustomPatterns: ` + TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} + `, + } + assert.NoError(t, p.Compile()) + _, err = p.ParseLine(`04/Jun/2016:12:41:45 +0100 notnumber`) + assert.Error(t, err) + + // Parse fails because myword is not a float + p = &Parser{ + Patterns: []string{"%{TEST_LOG_A}"}, + CustomPatterns: ` + TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:float} + `, + } + assert.NoError(t, p.Compile()) + _, err = p.ParseLine(`04/Jun/2016:12:41:45 +0100 notnumber`) + assert.Error(t, err) + + // Parse fails because myword is not a duration + p = &Parser{ + Patterns: []string{"%{TEST_LOG_A}"}, + CustomPatterns: ` + TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:duration} + `, + } + assert.NoError(t, p.Compile()) + _, err = p.ParseLine(`04/Jun/2016:12:41:45 +0100 notnumber`) + assert.Error(t, err) + + // Parse fails because the time layout is wrong. + p = &Parser{ + Patterns: []string{"%{TEST_LOG_A}"}, + CustomPatterns: ` + TEST_LOG_A %{HTTPDATE:ts:ts-unix} %{WORD:myword:duration} + `, + } + assert.NoError(t, p.Compile()) + _, err = p.ParseLine(`04/Jun/2016:12:41:45 +0100 notnumber`) + assert.Error(t, err) +} diff --git a/plugins/inputs/logparser/grok/patterns/influx-patterns b/plugins/inputs/logparser/grok/patterns/influx-patterns new file mode 100644 index 0000000000000..5889c3e170c25 --- /dev/null +++ b/plugins/inputs/logparser/grok/patterns/influx-patterns @@ -0,0 +1,65 @@ +# Captures are a slightly modified version of logstash "grok" patterns, with +# the format %{[:][:]} +# By default all named captures are converted into string fields. +# Modifiers can be used to convert captures to other types or tags. +# Timestamp modifiers can be used to convert captures to the timestamp of the +# parsed metric. 
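+# ie, %{HTTPDATE:timestamp:ts-httpd} parses the capture using the
+# "02/Jan/2006:15:04:05 -0700" layout and uses the result as the
+# timestamp of the parsed metric.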
+ +# View logstash grok pattern docs here: +# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html +# All default logstash patterns are supported, these can be viewed here: +# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns + +# Available modifiers: +# string (default if nothing is specified) +# int +# float +# duration (ie, 5.23ms gets converted to int nanoseconds) +# tag (converts the field into a tag) +# drop (drops the field completely) +# Timestamp modifiers: +# ts-ansic ("Mon Jan _2 15:04:05 2006") +# ts-unix ("Mon Jan _2 15:04:05 MST 2006") +# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006") +# ts-rfc822 ("02 Jan 06 15:04 MST") +# ts-rfc822z ("02 Jan 06 15:04 -0700") +# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST") +# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST") +# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700") +# ts-rfc3339 ("2006-01-02T15:04:05Z07:00") +# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00") +# ts-httpd ("02/Jan/2006:15:04:05 -0700") +# ts-"CUSTOM" + +# Example log file pattern, example log looks like this: +# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs +# Breakdown of the DURATION pattern below: +# NUMBER is a builtin logstash grok pattern matching float & int numbers. +# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets. +# s is also regex, this pattern must end in "s". +DURATION %{NUMBER}[nuµm]?s +RESPONSE_CODE %{NUMBER:response_code:tag} +RESPONSE_TIME %{DURATION:response_time:duration} +EXAMPLE_LOG \[%{HTTPDATE:timestamp:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + +## +## COMMON LOG PATTERNS +## + +# InfluxDB log patterns +TIMESTAMP %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME} +CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) +INFLUXDB_HTTPD_LOG \[httpd\] %{TIMESTAMP} %{CLIENT:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp:ts-httpd}\] %{WORD:httpmethod:tag} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response:tag} (?:%{NUMBER:bytes:int}|-) (?:%{URI:referrer}|-) %{WORD:agent} %{UUID:uuid:drop} %{DURATION:response_time:duration} + +# Apache logs, this is also known as the "common log format" +# see https://en.wikipedia.org/wiki/Common_Log_Format +APACHE_LOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion:float})?|%{DATA})" %{NUMBER:response:int} (?:%{NUMBER:bytes:int}|-) + +# Combined Log Format is a general log format common to apache and nginx. +# This is the APACHE_LOG with two extra quoted strings as the end for referrer and agent. +COMBINED_LOG_FORMAT %{APACHE_LOG} %{QS:referrer} %{QS:agent} + +# HTTPD log formats +HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg} +HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? 
%{DATA:errorcode}: %{GREEDYDATA:message} +HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} diff --git a/plugins/inputs/logparser/grok/testdata/test-patterns b/plugins/inputs/logparser/grok/testdata/test-patterns new file mode 100644 index 0000000000000..8de9a5aba7d4b --- /dev/null +++ b/plugins/inputs/logparser/grok/testdata/test-patterns @@ -0,0 +1,11 @@ +# Test A log line: +# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101 +DURATION %{NUMBER}[nuµm]?s +RESPONSE_CODE %{NUMBER:response_code:tag} +RESPONSE_TIME %{DURATION:response_time:duration} +TEST_LOG_A \[%{HTTPDATE:timestamp:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} %{NUMBER:myint:int} + +# Test B log line: +# [04/06/2016--12:41:45] 1.25 mystring dropme nomodifier +TEST_TIMESTAMP %{MONTHDAY}/%{MONTHNUM}/%{YEAR}--%{TIME} +TEST_LOG_B \[%{TEST_TIMESTAMP:timestamp:ts-"02/01/2006--15:04:05"}\] %{NUMBER:myfloat:float} %{WORD:mystring:string} %{WORD:dropme:drop} %{WORD:nomodifier} diff --git a/plugins/inputs/logparser/grok/testdata/test_a.log b/plugins/inputs/logparser/grok/testdata/test_a.log new file mode 100644 index 0000000000000..49537bd21f93d --- /dev/null +++ b/plugins/inputs/logparser/grok/testdata/test_a.log @@ -0,0 +1 @@ +[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101 \ No newline at end of file diff --git a/plugins/inputs/logparser/grok/testdata/test_b.log b/plugins/inputs/logparser/grok/testdata/test_b.log new file mode 100644 index 0000000000000..ad5b7ff76a396 --- /dev/null +++ b/plugins/inputs/logparser/grok/testdata/test_b.log @@ -0,0 +1 @@ +[04/06/2016--12:41:45 +0100] 1.25 mystring dropme nomodifier \ No newline at end of file diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go new file mode 100644 index 0000000000000..644f757a3cee8 --- /dev/null +++ b/plugins/inputs/logparser/logparser.go @@ -0,0 +1,197 @@ +package logparser + +import ( + "fmt" + "log" + "reflect" + "sync" + + "github.com/hpcloud/tail" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/inputs" + + // Parsers + "github.com/influxdata/telegraf/plugins/inputs/logparser/grok" +) + +type LogParser interface { + ParseLine(line string) (telegraf.Metric, error) + Compile() error +} + +type LogParserPlugin struct { + Files []string + FromBeginning bool + + tailers []*tail.Tail + wg sync.WaitGroup + acc telegraf.Accumulator + parsers []LogParser + + sync.Mutex + + GrokParser *grok.Parser `toml:"grok"` +} + +func NewLogParserPlugin() *LogParserPlugin { + return &LogParserPlugin{ + FromBeginning: false, + } +} + +const sampleConfig = ` + ## files to tail. + ## These accept standard unix glob matching rules, but with the addition of + ## ** as a "super asterisk". ie: + ## "/var/log/**.log" -> recursively find all .log files in /var/log + ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log + ## "/var/log/apache.log" -> just tail the apache log file + ## + ## See https://github.com/gobwas/glob for more examples + ## + files = ["/var/log/influxdb/influxdb.log"] + ## Read file from beginning. 
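+  ## If false (the default), tailing starts at the end of each file,
+  ## so only lines written after the plugin starts are parsed.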
+ from_beginning = false + + ## For parsing logstash-style "grok" patterns: + [inputs.logparser.grok] + patterns = ["%{INFLUXDB_HTTPD_LOG}"] + custom_pattern_files = ["$HOME/influx-patterns"] + custom_patterns = ''' + ''' +` + +func (l *LogParserPlugin) SampleConfig() string { + return sampleConfig +} + +func (l *LogParserPlugin) Description() string { + return "Stream and parse log file(s)." +} + +func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { + l.Lock() + defer l.Unlock() + + l.acc = acc + + // Looks for fields which implement LogParser interface + l.parsers = []LogParser{} + s := reflect.ValueOf(l).Elem() + for i := 0; i < s.NumField(); i++ { + f := s.Field(i) + + if !f.CanInterface() { + continue + } + + if lpPlugin, ok := f.Interface().(LogParser); ok { + if reflect.ValueOf(lpPlugin).IsNil() { + continue + } + l.parsers = append(l.parsers, lpPlugin) + } + } + + if len(l.parsers) == 0 { + return fmt.Errorf("ERROR: logparser input plugin: no parser defined.") + } + + // compile log parser patterns: + for _, parser := range l.parsers { + if err := parser.Compile(); err != nil { + return err + } + } + + var seek tail.SeekInfo + if !l.FromBeginning { + seek.Whence = 2 + seek.Offset = 0 + } + + var errS string + // Create a "tailer" for each file + for _, filepath := range l.Files { + g, err := globpath.Compile(filepath) + if err != nil { + log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + } + for file, _ := range g.Match() { + tailer, err := tail.TailFile(file, + tail.Config{ + ReOpen: true, + Follow: true, + Location: &seek, + }) + if err != nil { + errS += err.Error() + " " + continue + } + // create a goroutine for each "tailer" + l.wg.Add(1) + go l.receiver(tailer) + l.tailers = append(l.tailers, tailer) + } + } + + if errS != "" { + return fmt.Errorf(errS) + } + return nil +} + +// this is launched as a goroutine to continuously watch a tailed logfile +// for changes, parse any incoming msgs, and add to the accumulator. 
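+// Lines that fail to parse are logged and skipped, so a single
+// malformed entry does not stop the tail.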
+func (l *LogParserPlugin) receiver(tailer *tail.Tail) { + defer l.wg.Done() + + var m telegraf.Metric + var err error + var line *tail.Line + for line = range tailer.Lines { + if line.Err != nil { + log.Printf("ERROR tailing file %s, Error: %s\n", + tailer.Filename, line.Err) + continue + } + + for _, parser := range l.parsers { + m, err = parser.ParseLine(line.Text) + if err == nil { + if m != nil { + l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + } else { + log.Printf("Malformed log line in %s: [%s], Error: %s\n", + tailer.Filename, line.Text, err) + } + } + } +} + +func (l *LogParserPlugin) Stop() { + l.Lock() + defer l.Unlock() + + for _, t := range l.tailers { + err := t.Stop() + if err != nil { + log.Printf("ERROR stopping tail on file %s\n", t.Filename) + } + t.Cleanup() + } + l.wg.Wait() +} + +func init() { + inputs.Add("logparser", func() telegraf.Input { + return NewLogParserPlugin() + }) +} diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go new file mode 100644 index 0000000000000..cc77ae1acbd1c --- /dev/null +++ b/plugins/inputs/logparser/logparser_test.go @@ -0,0 +1 @@ +package logparser diff --git a/plugins/inputs/varnish/varnish.go b/plugins/inputs/varnish/varnish.go index 1a3e4c5580162..2b0e84514d1ff 100644 --- a/plugins/inputs/varnish/varnish.go +++ b/plugins/inputs/varnish/varnish.go @@ -12,9 +12,8 @@ import ( "strings" "time" - "github.com/gobwas/glob" - + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -26,7 +25,7 @@ type Varnish struct { Stats []string Binary string - filter glob.Glob + filter filter.Filter run runner } @@ -78,13 +77,13 @@ func (s *Varnish) Gather(acc telegraf.Accumulator) error { if s.filter == nil { var err error if len(s.Stats) == 0 { - s.filter, err = internal.CompileFilter(defaultStats) + s.filter, err = filter.CompileFilter(defaultStats) } else { // legacy support, change "all" -> "*": if s.Stats[0] == "all" { s.Stats[0] = "*" } - s.filter, err = internal.CompileFilter(s.Stats) + s.filter, err = filter.CompileFilter(s.Stats) } if err != nil { return err