Skip to content

Commit

Permalink
Support StatisticValues in cloudwatch output plugin (influxdata#4318)
Browse files Browse the repository at this point in the history
  • Loading branch information
david7482 committed Jun 30, 2018
1 parent 1bd41ef commit 24a9b2f
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 4 deletions.
9 changes: 9 additions & 0 deletions plugins/outputs/cloudwatch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,12 @@ Examples include but are not limited to:
### namespace

The namespace used for AWS CloudWatch metrics.

### enable_statistic_values

If you have a large amount of metrics, you should consider to send
statistic values instead of raw metrics. This would not only improve
performance but also save AWS API cost. Use `basicstats` aggregator to
calculate those required statistic fields (count, min, max, and sum).
[See Cloudwatch StatisticSet](https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatch/#StatisticSet).
This plugin would try to parse those statistic fields and send to Cloudwatch.
119 changes: 117 additions & 2 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ type CloudWatch struct {

Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace
svc *cloudwatch.CloudWatch

EnableStatisticValues bool `toml:"enable_statistic_values"`
}

type statisticSet struct {
field string
max float64
min float64
sum float64
count float64
}

var sampleConfig = `
Expand All @@ -50,6 +60,14 @@ var sampleConfig = `
## Namespace for the CloudWatch MetricDatums
namespace = "InfluxData/Telegraf"
## If you have a large amount of metrics, you should consider to send
## statistic values instead of raw metrics. This would not only improve
## performance but also save AWS API cost. Use basicstats aggregator to
## calculate required statistic fields (count, min, max, and sum) and
## enable this flag. This plugin would try to parse those fields and
## send statistic values to Cloudwatch.
# enable_statistic_values = false
`

func (c *CloudWatch) SampleConfig() string {
Expand Down Expand Up @@ -107,7 +125,7 @@ func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
// is equal to one MetricDatum. There is a limit on how many MetricDatums a
// request can have so we process one Point at a time.
func (c *CloudWatch) WriteSinglePoint(point telegraf.Metric) error {
datums := BuildMetricDatum(point)
datums := BuildMetricDatum(c.EnableStatisticValues, point)

const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data metrics per call

Expand Down Expand Up @@ -161,9 +179,28 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch
return partitions
}

// Make a MetricDatum from telegraf.Metric. It would check if all required fields of
// cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
// Otherwise, it would make MetricDatum from each field in a Point.
func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {

// If not enable, just take all metrics as value datums.
if !buildStatistic {
return BuildValueMetricDatum(point)
}

// Try to parse statisticSet first, then build statistic/value datum accordingly.
set, ok := getStatisticSet(point)
if ok {
return BuildStatisticMetricDatum(point, set)
} else {
return BuildValueMetricDatum(point)
}
}

// Make a MetricDatum for each field in a Point. Only fields with values that can be
// converted to float64 are supported. Non-supported fields are skipped.
func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
func BuildValueMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
i := 0

Expand Down Expand Up @@ -227,6 +264,24 @@ func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
return datums
}

// Make a MetricDatum with statistic values.
func BuildStatisticMetricDatum(point telegraf.Metric, set *statisticSet) []*cloudwatch.MetricDatum {

data := &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{point.Name(), set.field}, "_")),
StatisticValues: &cloudwatch.StatisticSet{
Minimum: aws.Float64(set.min),
Maximum: aws.Float64(set.max),
Sum: aws.Float64(set.sum),
SampleCount: aws.Float64(set.count),
},
Dimensions: BuildDimensions(point.Tags()),
Timestamp: aws.Time(point.Time()),
}

return []*cloudwatch.MetricDatum{data}
}

// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to
// 10 dimensions per metric so we only keep up to the first 10 alphabetically.
// This always includes the "host" tag if it exists.
Expand Down Expand Up @@ -270,6 +325,66 @@ func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension {
return dimensions
}

func getStatisticSet(point telegraf.Metric) (*statisticSet, bool) {

// cloudwatch.StatisticSet requires Max, Min, Count and Sum values.
// If this point has less than 4 fields, it's not possible to build
// StatisticSet from it.
if len(point.Fields()) < 4 {
return nil, false
}

// Try to find the max field. If we could find it, we will use its
// field name to find other required fields.
var set *statisticSet
for k, v := range point.Fields() {
if strings.HasSuffix(k, "_max") {
if fv, ok := convert(v); ok {
set = &statisticSet{
field: k[:len(k)-4],
max: fv,
}
break
}
}
}
if set == nil {
return nil, false
}

// Check if we could find all required fields with the same field name
var ok bool
if set.min, ok = findField(point, set.field+"_min"); !ok {
return nil, false
}
if set.count, ok = findField(point, set.field+"_count"); !ok {
return nil, false
}
if set.sum, ok = findField(point, set.field+"_sum"); !ok {
return nil, false
}

return set, true
}

func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
default:
return 0, false
}
}

func findField(point telegraf.Metric, field string) (float64, bool) {
if v, ok := point.GetField(field); ok {
if fv, ok := convert(v); ok {
return fv, true
}
}
return 0, false
}

func init() {
outputs.Add("cloudwatch", func() telegraf.Output {
return &CloudWatch{}
Expand Down
24 changes: 22 additions & 2 deletions plugins/outputs/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"math"
"sort"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -72,13 +74,31 @@ func TestBuildMetricDatums(t *testing.T) {
testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108
}
for _, point := range validMetrics {
datums := BuildMetricDatum(point)
datums := BuildMetricDatum(false, point)
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
}
for _, point := range invalidMetrics {
datums := BuildMetricDatum(point)
datums := BuildMetricDatum(false, point)
assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
}

statisticMetric, _ := metric.New(
"test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"value_max": float64(10), "value_min": float64(0), "value_sum": float64(100), "value_count": float64(20)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
datums := BuildMetricDatum(true, statisticMetric)
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric))

multipleFieldsMetric, _ := metric.New(
"test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"valueA": float64(10), "valueB": float64(0), "valueC": float64(100), "valueD": float64(20)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
datums = BuildMetricDatum(true, multipleFieldsMetric)
assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multipleFieldsMetric))
}

func TestPartitionDatums(t *testing.T) {
Expand Down

0 comments on commit 24a9b2f

Please sign in to comment.