Skip to content

Commit

Permalink
feat(parsers/logfmt): Add tag support (#11060)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hipska authored May 17, 2022
1 parent 1b10e15 commit 442728b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 16 deletions.
5 changes: 4 additions & 1 deletion plugins/parsers/logfmt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ The `logfmt` data format parses data in [logfmt] format.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "logfmt"

## Array of key names which should be collected as tags. Globs accepted.
logfmt_tag_keys = ["method","host"]
```

## Metrics
Expand All @@ -26,5 +29,5 @@ of the field is automatically determined based on the contents of the value.

```text
- method=GET host=example.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653
+ logfmt method="GET",host="example.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
+ logfmt,host=example.org,method=GET ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
```
28 changes: 24 additions & 4 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logfmt/logfmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/metric"
)

Expand All @@ -17,17 +18,22 @@ var (

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
TagKeys []string `toml:"logfmt_tag_keys"`

MetricName string
DefaultTags map[string]string
Now func() time.Time

tagFilter filter.Filter
}

// NewParser creates a parser.
func NewParser(metricName string, defaultTags map[string]string) *Parser {
func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser {
return &Parser{
MetricName: metricName,
DefaultTags: defaultTags,
Now: time.Now,
TagKeys: tagKeys,
}
}

Expand All @@ -46,14 +52,17 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
break
}
fields := make(map[string]interface{})
tags := make(map[string]string)
for decoder.ScanKeyval() {
if string(decoder.Value()) == "" {
continue
}

//type conversions
value := string(decoder.Value())
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
if p.tagFilter != nil && p.tagFilter.Match(string(decoder.Key())) {
tags[string(decoder.Key())] = value
} else if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
fields[string(decoder.Key())] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
fields[string(decoder.Key())] = fValue
Expand All @@ -63,11 +72,11 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
fields[string(decoder.Key())] = value
}
}
if len(fields) == 0 {
if len(fields) == 0 && len(tags) == 0 {
continue
}

m := metric.New(p.MetricName, map[string]string{}, fields, p.Now())
m := metric.New(p.MetricName, tags, fields, p.Now())

metrics = append(metrics, m)
}
Expand Down Expand Up @@ -106,3 +115,14 @@ func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
}
}
}

func (p *Parser) Init() error {
var err error

// Compile tag key patterns
if p.tagFilter, err = filter.Compile(p.TagKeys); err != nil {
return fmt.Errorf("error compiling tag pattern: %w", err)
}

return nil
}
85 changes: 77 additions & 8 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
t.Helper()
v := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)

return v
}

func TestParse(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -222,3 +215,79 @@ func TestParseLine(t *testing.T) {
})
}
}

func TestTags(t *testing.T) {
tests := []struct {
name string
measurement string
tagKeys []string
s string
want telegraf.Metric
wantErr bool
}{
{
name: "logfmt parser returns tags and fields",
measurement: "testlog",
tagKeys: []string{"lvl"},
s: "ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST",
want: testutil.MustMetric(
"testlog",
map[string]string{
"lvl": "info",
},
map[string]interface{}{
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0),
),
},
{
name: "logfmt parser returns no empty metrics",
measurement: "testlog",
tagKeys: []string{"lvl"},
s: "lvl=info",
want: testutil.MustMetric(
"testlog",
map[string]string{
"lvl": "info",
},
map[string]interface{}{},
time.Unix(0, 0),
),
},
{
name: "logfmt parser returns all keys as tag",
measurement: "testlog",
tagKeys: []string{"*"},
s: "ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST",
want: testutil.MustMetric(
"testlog",
map[string]string{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
map[string]interface{}{},
time.Unix(0, 0),
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := NewParser(tt.measurement, map[string]string{}, tt.tagKeys)
assert.NoError(t, l.Init())

got, err := l.ParseLine(tt.s)

if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime())
})
}
}
11 changes: 8 additions & 3 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ type Config struct {

// Influx configuration
InfluxParserType string `toml:"influx_parser_type"`

// LogFmt configuration
LogFmtTagKeys []string `toml:"logfmt_tag_keys"`
}

type XPathConfig xpath.Config
Expand Down Expand Up @@ -262,7 +265,7 @@ func NewParser(config *Config) (Parser, error) {
config.GrokTimezone,
config.GrokUniqueTimestamp)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags, config.LogFmtTagKeys)
case "form_urlencoded":
parser, err = NewFormUrlencodedParser(
config.MetricName,
Expand Down Expand Up @@ -389,8 +392,10 @@ func NewDropwizardParser(
}

// NewLogFmtParser returns a logfmt parser with the default options.
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
return logfmt.NewParser(metricName, defaultTags), nil
func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys []string) (Parser, error) {
parser := logfmt.NewParser(metricName, defaultTags, tagKeys)
err := parser.Init()
return parser, err
}

func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
Expand Down

0 comments on commit 442728b

Please sign in to comment.