Statsd datadog #5791

Merged: 13 commits merged on May 14, 2019
Changes from 6 commits
1 change: 1 addition & 0 deletions etc/telegraf.conf
@@ -5017,6 +5017,7 @@
# ## Parses tags in the datadog statsd format
# ## http://docs.datadoghq.com/guides/dogstatsd/
# parse_data_dog_tags = false
# parse_data_dog_events = false
#
# ## Statsd data translation templates, more info can be read here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
2 changes: 2 additions & 0 deletions plugins/inputs/statsd/README.md
@@ -43,6 +43,7 @@
## Parses tags in the datadog statsd format
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false
parse_data_dog_events = false

## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
@@ -185,6 +186,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time.
- **templates** []string: Templates for transforming statsd buckets into influx
measurements and tags.
- **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **parse_data_dog_events** boolean: Enable parsing of events in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)

### Statsd bucket -> InfluxDB line-protocol Templates

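The event datagrams that `parse_data_dog_events` enables use the `_e{title.length,text.length}:title|text` layout documented in the parser's header comment in `datadog.go`. As a rough illustration, the sketch below produces such a line. `formatEvent` is a hypothetical helper (not part of Telegraf or any Datadog client), and it assumes the lengths are byte counts of the title and of the *escaped* text, since that is the form the parser slices before un-escaping `\n`.

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent is a hypothetical helper that builds one dogstatsd event line:
//   _e{title.length,text.length}:title|text[|meta...][|#tag1,tag2]
// Newlines in text are escaped as the two characters `\n`; the length
// prefix is computed after escaping, because that is what the parser slices.
func formatEvent(title, text string, meta, tags []string) string {
	escaped := strings.ReplaceAll(text, "\n", "\\n")
	var b strings.Builder
	fmt.Fprintf(&b, "_e{%d,%d}:%s|%s", len(title), len(escaped), title, escaped)
	for _, m := range meta {
		b.WriteString("|")
		b.WriteString(m) // e.g. "p:low", "t:success", "h:web01"
	}
	if len(tags) > 0 {
		b.WriteString("|#")
		b.WriteString(strings.Join(tags, ","))
	}
	return b.String()
}

func main() {
	line := formatEvent("deploy", "v1.2 rolled out",
		[]string{"p:low", "t:success"}, []string{"env:prod", "canary"})
	fmt.Println(line)
}
```

Based on the parsing logic in this diff, such a line should become an event named `deploy` with `priority=low` and `alert-type=success`.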
173 changes: 173 additions & 0 deletions plugins/inputs/statsd/datadog.go
@@ -0,0 +1,173 @@
package statsd

import (
"fmt"
"log"
"strconv"
"strings"
"time"
)

const (
priorityNormal = "normal"
priorityLow = "low"
)

var uncommenter = strings.NewReplacer("\\n", "\n")

// this is adapted from datadog's apache licensed version at
// https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173
func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostname string) error {
// _e{title.length,text.length}:title|text
// [
// |d:date_happened
// |p:priority
// |h:hostname
// |t:alert_type
// |s:source_type_name
// |#tag1,tag2
// ]
//
//
// tag is key:value
messageRaw := strings.SplitN(message, ":", 2)
if len(messageRaw) < 2 || len(messageRaw[0]) < 7 || len(messageRaw[1]) < 3 {
return fmt.Errorf("Invalid message format")
}
header := messageRaw[0]
message = messageRaw[1]

rawLen := strings.SplitN(header[3:], ",", 2)
if len(rawLen) != 2 || !strings.HasSuffix(rawLen[1], "}") {
return fmt.Errorf("Invalid message format")
}

titleLen, err := strconv.ParseInt(rawLen[0], 10, 64)
if err != nil {
return fmt.Errorf("Invalid message format, could not parse title.length: '%s'", rawLen[0])
}

textLen, err := strconv.ParseInt(rawLen[1][:len(rawLen[1])-1], 10, 64)
if err != nil {
return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[1])
}
if titleLen < 0 || textLen < 0 || titleLen+textLen+1 > int64(len(message)) {
return fmt.Errorf("Invalid message format, title.length and text.length must be non-negative and fit within the message")
}

rawTitle := message[:titleLen]
rawText := message[titleLen+1 : titleLen+1+textLen]
message = message[titleLen+1+textLen:]

if len(rawTitle) == 0 || len(rawText) == 0 {
return fmt.Errorf("Invalid event message format: empty 'title' or 'text' field")
}

// Handle hostname: the host: tag takes priority, then the h: field,
// and finally the defaultHostname value
// Metadata
m := cachedEvent{
name: rawTitle,
}
m.tags = make(map[string]string, strings.Count(message, ",")+2) // allocate for the approximate number of tags
m.fields = make(map[string]interface{}, 9)
m.fields["alert-type"] = "info" // default event type
m.fields["text"] = uncommenter.Replace(rawText)
m.tags["source"] = defaultHostname
m.fields["priority"] = priorityNormal
m.ts = now
if len(message) == 0 {
s.Lock()
s.events = append(s.events, m)
s.Unlock()
return nil
}

if len(message) > 1 {
rawMetadataFields := strings.Split(message[1:], "|")
for i := range rawMetadataFields {
if len(rawMetadataFields[i]) < 2 {
log.Printf("W! [inputs.statsd] too short metadata field")
continue
}
switch rawMetadataFields[i][:2] {
case "d:":
ts, err := strconv.ParseInt(rawMetadataFields[i][2:], 10, 64)
if err != nil {
log.Printf("W! [inputs.statsd] skipping timestamp: %s", err)
continue
}
m.fields["ts"] = ts
case "p:":
switch rawMetadataFields[i][2:] {
case priorityLow:
m.fields["priority"] = priorityLow
case priorityNormal: // we already used this as a default
default:
log.Printf("W! [inputs.statsd] skipping priority")
continue
}
case "h:":
m.tags["source"] = rawMetadataFields[i][2:]
case "t:":
switch rawMetadataFields[i][2:] {
case "error":
m.fields["alert-type"] = "error"
case "warning":
m.fields["alert-type"] = "warning"
case "success":
m.fields["alert-type"] = "success"
case "info": // already set for info
default:
log.Printf("W! [inputs.statsd] skipping alert type")
continue
}
case "k:":
// TODO(docmerlin): does this make sense?
m.tags["aggregation-key"] = rawMetadataFields[i][2:]
case "s:":
m.fields["source-type-name"] = rawMetadataFields[i][2:]
default:
if rawMetadataFields[i][0] == '#' {
parseDataDogTags(m.tags, rawMetadataFields[i][1:])
} else {
log.Printf("W! [inputs.statsd] unknown metadata type: '%s'", rawMetadataFields[i])
Contributor:

@danielnelson so, the original source code would optimistically continue parsing and warn about bad values within the event message.

Does this fit Telegraf? Should these warnings be errors, or are logs OK?

Contributor:

I think if the message is malformed, an error should be returned up from here and then logged. Doing a best-effort parse doesn't make sense unless we find that the Datadog agent regularly emits bad messages.

Contributor Author:

Telegraf generally tries to ignore errors and keep working where it can.

}
}
}
}
// "host" is a magic tag in dogstatsd: if present, it replaces the value from h:.
// Telegraf adds its own "host" tag with a different meaning than dogstatsd's,
// so move the dogstatsd value into the "source" tag instead.
if host, ok := m.tags["host"]; ok {
delete(m.tags, "host")
m.tags["source"] = host
}
s.Lock()
s.events = append(s.events, m)
s.Unlock()
return nil
}
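The length prefix is what lets `parseEventMessage` locate the title and text even when they contain `|`. The standalone sketch below isolates that step. `splitEvent` is a hypothetical helper, not Telegraf code; unlike the diff it uses `strconv.Atoi`, rejects negative lengths, and checks for the `|` separator after the title.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// splitEvent is a hypothetical, simplified stand-in for the length-prefix
// handling in parseEventMessage. It reads the lengths from the
// "_e{title.length,text.length}" header, slices the title and text out of
// the body by position, and returns the remaining "|..." metadata.
func splitEvent(msg string) (title, text, rest string, err error) {
	parts := strings.SplitN(msg, ":", 2)
	if len(parts) != 2 || !strings.HasPrefix(parts[0], "_e{") || !strings.HasSuffix(parts[0], "}") {
		return "", "", "", errors.New("invalid event header")
	}
	lens := strings.SplitN(parts[0][3:len(parts[0])-1], ",", 2)
	if len(lens) != 2 {
		return "", "", "", errors.New("invalid event header")
	}
	titleLen, err := strconv.Atoi(lens[0])
	if err != nil {
		return "", "", "", fmt.Errorf("bad title length %q: %v", lens[0], err)
	}
	textLen, err := strconv.Atoi(lens[1])
	if err != nil {
		return "", "", "", fmt.Errorf("bad text length %q: %v", lens[1], err)
	}
	body := parts[1]
	// Negative lengths, or lengths running past the end of the body,
	// would otherwise make the slicing below panic.
	if titleLen < 0 || textLen < 0 || titleLen+1+textLen > len(body) {
		return "", "", "", errors.New("title/text lengths do not fit the message")
	}
	if body[titleLen] != '|' {
		return "", "", "", errors.New("missing '|' between title and text")
	}
	return body[:titleLen], body[titleLen+1 : titleLen+1+textLen], body[titleLen+1+textLen:], nil
}

func main() {
	// The text "x|y" contains a '|'; it is unambiguous only because the
	// header says the text is exactly 3 bytes long.
	title, text, rest, err := splitEvent("_e{6,3}:deploy|x|y|p:low|#env:prod")
	fmt.Printf("title=%q text=%q rest=%q err=%v\n", title, text, rest, err)
}
```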

func parseDataDogTags(tags map[string]string, message string) {
start, i := 0, 0
var k string
var inVal bool // check if we are parsing the value part of the tag
for i = range message {
if message[i] == ',' {
if k == "" {
// bare tag with no ':'; leave k empty so the next tag starts fresh
tags[message[start:i]] = ""
Contributor:

I've done empty-string tags before in the zipkin plugin; however, it makes them pretty difficult to query for, at least with InfluxDB.

@danielnelson how are label-type tags typically done in Telegraf?

Contributor:

What is the value of having an empty tag?

Contributor Author:

I don't know, but the Datadog agent's event tests specifically make allowances for them. We wouldn't have spec compatibility if we didn't allow them. Should we handle them some other way?

Contributor:

I don't think an empty tag value is allowed in InfluxDB; we just toss them at serialize time.

start = i + 1
continue
}
tags[k] = message[start:i]
start = i + 1
k, inVal = "", false // reset state vars
} else if message[i] == ':' && !inVal {
k = message[start:i]
start = i + 1
inVal = true
}
}
// grab the last tag; a trailing bare tag gets an empty value
if k != "" {
tags[k] = message[start:]
} else if start < len(message) {
tags[message[start:]] = ""
}
}
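For comparison, the same `key:value` convention can be expressed with `strings.Split` and `strings.SplitN`. The sketch below is a hypothetical alternative (`parseTagsSimple` is not part of Telegraf): it splits each tag on its first `:` only, so values such as URLs keep their colons, gives bare tags an empty value, and skips empty segments. The single-pass scanner in the diff avoids allocating the intermediate slices, which is a plausible reason for writing it by hand.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTagsSimple is a hypothetical alternative to parseDataDogTags.
// Each comma-separated tag is split on its first ':' into key and value;
// a bare tag (no ':') is stored with an empty value. Empty segments,
// such as the one produced by "a:b,,c", are skipped.
func parseTagsSimple(tags map[string]string, message string) {
	for _, tag := range strings.Split(message, ",") {
		if tag == "" {
			continue
		}
		kv := strings.SplitN(tag, ":", 2)
		if len(kv) == 2 {
			tags[kv[0]] = kv[1]
		} else {
			tags[kv[0]] = ""
		}
	}
}

func main() {
	tags := map[string]string{}
	parseTagsSimple(tags, "env:prod,canary,url:http://x")
	fmt.Println(tags)
}
```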