Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wavefront parser #4402

Merged
merged 6 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](#collectd)
1. [Dropwizard](#dropwizard)
1. [Grok](#grok)
1. [Wavefront](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#wavefront)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -887,3 +888,29 @@ the file output will only print once per `flush_interval`.
- Continue one token at a time until the entire line is successfully parsed.


```

# Wavefront:

Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
For more information about the Wavefront Data Format see
[here](https://docs.wavefront.com/wavefront_data_format.html).

There are no additional configuration options for Wavefront Data Format line-protocol.

#### Wavefront Configuration:

```toml
[[inputs.exec]]
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "wavefront"
```
34 changes: 20 additions & 14 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,32 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string
}

var source string
sourceTagFound := false

for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)

if s, ok := mTags["source"]; ok {
source = s
delete(mTags, "source")
} else {
sourceTagFound := false
for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}
if sourceTagFound {
break

if !sourceTagFound {
source = mTags["host"]
}
}

if !sourceTagFound {
source = mTags["host"]
}
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
Expand Down
7 changes: 7 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
)

// ParserInput is an interface for input plugins that are able to parse
Expand Down Expand Up @@ -131,6 +132,8 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags,
config.Separator,
config.Templates)
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
case "grok":
parser, err = newGrokParser(
config.MetricName,
Expand Down Expand Up @@ -238,3 +241,7 @@ func NewDropwizardParser(
}
return parser, err
}

func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
return wavefront.NewWavefrontParser(defaultTags), nil
}
238 changes: 238 additions & 0 deletions plugins/parsers/wavefront/element.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package wavefront

import (
"errors"
"fmt"
"strconv"
"time"
)

var (
ErrEOF = errors.New("EOF")
ErrInvalidTimestamp = errors.New("Invalid timestamp")
)

// Interface for parsing line elements.
type ElementParser interface {
parse(p *PointParser, pt *Point) error
}

type NameParser struct{}
type ValueParser struct{}
type TimestampParser struct {
optional bool
}
type WhiteSpaceParser struct {
nextOptional bool
}
type TagParser struct{}
type LoopedParser struct {
wrappedParser ElementParser
wsPaser *WhiteSpaceParser
}
type LiteralParser struct {
literal string
}

func (ep *NameParser) parse(p *PointParser, pt *Point) error {
//Valid characters are: a-z, A-Z, 0-9, hyphen ("-"), underscore ("_"), dot (".").
// Forward slash ("/") and comma (",") are allowed if metricName is enclosed in double quotes.
name, err := parseLiteral(p)
if err != nil {
return err
}
pt.Name = name
return nil
}

func (ep *ValueParser) parse(p *PointParser, pt *Point) error {
tok, lit := p.scan()
if tok == EOF {
return fmt.Errorf("found %q, expected number", lit)
}

p.writeBuf.Reset()
if tok == MINUS_SIGN {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}

for tok != EOF && (tok == LETTER || tok == NUMBER || tok == DOT) {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
p.unscan()

pt.Value = p.writeBuf.String()
_, err := strconv.ParseFloat(pt.Value, 64)
if err != nil {
return fmt.Errorf("invalid metric value %s", pt.Value)
}
return nil
}

func (ep *TimestampParser) parse(p *PointParser, pt *Point) error {
tok, lit := p.scan()
if tok == EOF {
if ep.optional {
p.unscanTokens(2)
return setTimestamp(pt, 0, 1)
}
return fmt.Errorf("found %q, expected number", lit)
}

if tok != NUMBER {
if ep.optional {
p.unscanTokens(2)
return setTimestamp(pt, 0, 1)
}
return ErrInvalidTimestamp
}

p.writeBuf.Reset()
for tok != EOF && tok == NUMBER {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
p.unscan()

tsStr := p.writeBuf.String()
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return err
}
return setTimestamp(pt, ts, len(tsStr))
}

func setTimestamp(pt *Point, ts int64, numDigits int) error {

if numDigits == 19 {
// nanoseconds
ts = ts / 1e9
} else if numDigits == 16 {
// microseconds
ts = ts / 1e6
} else if numDigits == 13 {
// milliseconds
ts = ts / 1e3
} else if numDigits != 10 {
// must be in seconds, return error if not 0
if ts == 0 {
ts = getCurrentTime()
} else {
return ErrInvalidTimestamp
}
}
pt.Timestamp = ts
return nil
}

func (ep *LoopedParser) parse(p *PointParser, pt *Point) error {
for {
err := ep.wrappedParser.parse(p, pt)
if err != nil {
return err
}
err = ep.wsPaser.parse(p, pt)
if err == ErrEOF {
break
}
}
return nil
}

func (ep *TagParser) parse(p *PointParser, pt *Point) error {
k, err := parseLiteral(p)
if err != nil {
if k == "" {
return nil
}
return err
}

next, lit := p.scan()
if next != EQUALS {
return fmt.Errorf("found %q, expected equals", lit)
}

v, err := parseLiteral(p)
if err != nil {
return err
}
if len(pt.Tags) == 0 {
pt.Tags = make(map[string]string)
}
pt.Tags[k] = v
return nil
}

func (ep *WhiteSpaceParser) parse(p *PointParser, pt *Point) error {
tok := WS
for tok != EOF && tok == WS {
tok, _ = p.scan()
}

if tok == EOF {
if !ep.nextOptional {
return ErrEOF
}
return nil
}
p.unscan()
return nil
}

func (ep *LiteralParser) parse(p *PointParser, pt *Point) error {
l, err := parseLiteral(p)
if err != nil {
return err
}

if l != ep.literal {
return fmt.Errorf("found %s, expected %s", l, ep.literal)
}
return nil
}

func parseQuotedLiteral(p *PointParser) (string, error) {
p.writeBuf.Reset()

escaped := false
tok, lit := p.scan()
for tok != EOF && (tok != QUOTES || (tok == QUOTES && escaped)) {
// let everything through
escaped = tok == BACKSLASH
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
if tok == EOF {
return "", fmt.Errorf("found %q, expected quotes", lit)
}
return p.writeBuf.String(), nil
}

func parseLiteral(p *PointParser) (string, error) {
tok, lit := p.scan()
if tok == EOF {
return "", fmt.Errorf("found %q, expected literal", lit)
}

if tok == QUOTES {
return parseQuotedLiteral(p)
}

p.writeBuf.Reset()
for tok != EOF && tok > literal_beg && tok < literal_end {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
if tok == QUOTES {
return "", errors.New("found quote inside unquoted literal")
}
p.unscan()
return p.writeBuf.String(), nil
}

func getCurrentTime() int64 {
return time.Now().UnixNano() / 1e9
}
Loading