Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wavefront parser #4402

Merged
merged 6 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,33 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string
}

var source string
sourceTagFound := false

for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)

if s, ok := mTags["source"]; ok {
source = s
delete(mTags, "source")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: no empty line here

} else {
sourceTagFound := false
for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}
if sourceTagFound {
break

if !sourceTagFound {
source = mTags["host"]
}
}

if !sourceTagFound {
source = mTags["host"]
}
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
Expand Down
7 changes: 7 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
)

// ParserInput is an interface for input plugins that are able to parse
Expand Down Expand Up @@ -120,6 +121,8 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags,
config.Separator,
config.Templates)
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand Down Expand Up @@ -200,3 +203,7 @@ func NewDropwizardParser(
}
return parser, err
}

func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
return wavefront.NewWavefrontParser(defaultTags), nil
}
238 changes: 238 additions & 0 deletions plugins/parsers/wavefront/element.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package wavefront

import (
"errors"
"fmt"
"strconv"
"time"
)

var (
ErrEOF = errors.New("EOF")
ErrInvalidTimestamp = errors.New("Invalid timestamp")
)

// Interface for parsing line elements.
type ElementParser interface {
parse(p *PointParser, pt *Point) error
}

type NameParser struct{}
type ValueParser struct{}
type TimestampParser struct {
optional bool
}
type WhiteSpaceParser struct {
nextOptional bool
}
type TagParser struct{}
type LoopedParser struct {
wrappedParser ElementParser
wsPaser *WhiteSpaceParser
}
type LiteralParser struct {
literal string
}

func (ep *NameParser) parse(p *PointParser, pt *Point) error {
//Valid characters are: a-z, A-Z, 0-9, hyphen ("-"), underscore ("_"), dot (".").
// Forward slash ("/") and comma (",") are allowed if metricName is enclosed in double quotes.
name, err := parseLiteral(p)
if err != nil {
return err
}
pt.Name = name
return nil
}

func (ep *ValueParser) parse(p *PointParser, pt *Point) error {
tok, lit := p.scan()
if tok == EOF {
return fmt.Errorf("found %q, expected number", lit)
}

p.writeBuf.Reset()
if tok == MINUS_SIGN {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}

for tok != EOF && (tok == LETTER || tok == NUMBER || tok == DOT) {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
p.unscan()

pt.Value = p.writeBuf.String()
_, err := strconv.ParseFloat(pt.Value, 64)
if err != nil {
return fmt.Errorf("invalid metric value %s", pt.Value)
}
return nil
}

func (ep *TimestampParser) parse(p *PointParser, pt *Point) error {
tok, lit := p.scan()
if tok == EOF {
if ep.optional {
p.unscanTokens(2)
return setTimestamp(pt, 0, 1)
}
return fmt.Errorf("found %q, expected number", lit)
}

if tok != NUMBER {
if ep.optional {
p.unscanTokens(2)
return setTimestamp(pt, 0, 1)
}
return ErrInvalidTimestamp
}

p.writeBuf.Reset()
for tok != EOF && tok == NUMBER {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
p.unscan()

tsStr := p.writeBuf.String()
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return err
}
return setTimestamp(pt, ts, len(tsStr))
}

func setTimestamp(pt *Point, ts int64, numDigits int) error {

if numDigits == 19 {
// nanoseconds
ts = ts / 1e9
} else if numDigits == 16 {
// microseconds
ts = ts / 1e6
} else if numDigits == 13 {
// milliseconds
ts = ts / 1e3
} else if numDigits != 10 {
// must be in seconds, return error if not 0
if ts == 0 {
ts = getCurrentTime()
} else {
return ErrInvalidTimestamp
}
}
pt.Timestamp = ts
return nil
}

func (ep *LoopedParser) parse(p *PointParser, pt *Point) error {
for {
err := ep.wrappedParser.parse(p, pt)
if err != nil {
return err
}
err = ep.wsPaser.parse(p, pt)
if err == ErrEOF {
break
}
}
return nil
}

func (ep *TagParser) parse(p *PointParser, pt *Point) error {
k, err := parseLiteral(p)
if err != nil {
if k == "" {
return nil
}
return err
}

next, lit := p.scan()
if next != EQUALS {
return fmt.Errorf("found %q, expected equals", lit)
}

v, err := parseLiteral(p)
if err != nil {
return err
}
if len(pt.Tags) == 0 {
pt.Tags = make(map[string]string)
}
pt.Tags[k] = v
return nil
}

func (ep *WhiteSpaceParser) parse(p *PointParser, pt *Point) error {
tok := WS
for tok != EOF && tok == WS {
tok, _ = p.scan()
}

if tok == EOF {
if !ep.nextOptional {
return ErrEOF
}
return nil
}
p.unscan()
return nil
}

func (ep *LiteralParser) parse(p *PointParser, pt *Point) error {
l, err := parseLiteral(p)
if err != nil {
return err
}

if l != ep.literal {
return fmt.Errorf("found %s, expected %s", l, ep.literal)
}
return nil
}

func parseQuotedLiteral(p *PointParser) (string, error) {
p.writeBuf.Reset()

escaped := false
tok, lit := p.scan()
for tok != EOF && (tok != QUOTES || (tok == QUOTES && escaped)) {
// let everything through
escaped = tok == BACKSLASH
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
if tok == EOF {
return "", fmt.Errorf("found %q, expected quotes", lit)
}
return p.writeBuf.String(), nil
}

func parseLiteral(p *PointParser) (string, error) {
tok, lit := p.scan()
if tok == EOF {
return "", fmt.Errorf("found %q, expected literal", lit)
}

if tok == QUOTES {
return parseQuotedLiteral(p)
}

p.writeBuf.Reset()
for tok != EOF && tok > literal_beg && tok < literal_end {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
if tok == QUOTES {
return "", errors.New("found quote inside unquoted literal")
}
p.unscan()
return p.writeBuf.String(), nil
}

func getCurrentTime() int64 {
return time.Now().UnixNano() / 1e9
}
Loading