-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support StatisticValues in cloudwatch output plugin (#4318) #4364
Changes from 3 commits
93ac576
08a11aa
c3d212f
4fcd076
9757472
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,143 @@ type CloudWatch struct { | |
|
||
Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace | ||
svc *cloudwatch.CloudWatch | ||
|
||
WriteStatistics bool `toml:"write_statistics"` | ||
} | ||
|
||
type statisticType int | ||
|
||
const ( | ||
statisticTypeNone statisticType = iota | ||
statisticTypeMax | ||
statisticTypeMin | ||
statisticTypeSum | ||
statisticTypeCount | ||
) | ||
|
||
type cloudwatchField interface { | ||
addValue(sType statisticType, value float64) | ||
buildDatum() []*cloudwatch.MetricDatum | ||
} | ||
|
||
type statisticField struct { | ||
metricName string | ||
fieldName string | ||
tags map[string]string | ||
values map[statisticType]float64 | ||
timestamp time.Time | ||
} | ||
|
||
func (f *statisticField) addValue(sType statisticType, value float64) { | ||
if sType != statisticTypeNone { | ||
f.values[sType] = value | ||
} | ||
} | ||
|
||
func (f *statisticField) buildDatum() []*cloudwatch.MetricDatum { | ||
|
||
var datums []*cloudwatch.MetricDatum | ||
|
||
if f.hasAllFields() { | ||
// If we have all required fields, we build datum with StatisticValues | ||
min, _ := f.values[statisticTypeMin] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need to change this, but just using |
||
max, _ := f.values[statisticTypeMax] | ||
sum, _ := f.values[statisticTypeSum] | ||
count, _ := f.values[statisticTypeCount] | ||
|
||
datum := &cloudwatch.MetricDatum{ | ||
MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")), | ||
Dimensions: BuildDimensions(f.tags), | ||
Timestamp: aws.Time(f.timestamp), | ||
StatisticValues: &cloudwatch.StatisticSet{ | ||
Minimum: aws.Float64(min), | ||
Maximum: aws.Float64(max), | ||
Sum: aws.Float64(sum), | ||
SampleCount: aws.Float64(count), | ||
}, | ||
} | ||
|
||
datums = append(datums, datum) | ||
|
||
} else { | ||
// If we don't have all required fields, we build each field as independent datum | ||
for sType, value := range f.values { | ||
datum := &cloudwatch.MetricDatum{ | ||
Value: aws.Float64(value), | ||
Dimensions: BuildDimensions(f.tags), | ||
Timestamp: aws.Time(f.timestamp), | ||
} | ||
|
||
switch sType { | ||
case statisticTypeMin: | ||
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "min"}, "_")) | ||
case statisticTypeMax: | ||
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "max"}, "_")) | ||
case statisticTypeSum: | ||
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "sum"}, "_")) | ||
case statisticTypeCount: | ||
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "count"}, "_")) | ||
default: | ||
// should not be here | ||
continue | ||
} | ||
|
||
datums = append(datums, datum) | ||
} | ||
} | ||
|
||
return datums | ||
} | ||
|
||
func (f *statisticField) hasAllFields() bool { | ||
|
||
_, hasMin := f.values[statisticTypeMin] | ||
_, hasMax := f.values[statisticTypeMax] | ||
_, hasSum := f.values[statisticTypeSum] | ||
_, hasCount := f.values[statisticTypeCount] | ||
|
||
return hasMin && hasMax && hasSum && hasCount | ||
} | ||
|
||
type valueField struct { | ||
metricName string | ||
fieldName string | ||
tags map[string]string | ||
value float64 | ||
timestamp time.Time | ||
} | ||
|
||
func (f *valueField) addValue(sType statisticType, value float64) { | ||
if sType == statisticTypeNone { | ||
f.value = value | ||
} | ||
} | ||
|
||
func (f *valueField) buildDatum() []*cloudwatch.MetricDatum { | ||
|
||
// Do CloudWatch boundary checking | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we still need to do the boundary checking on the statistic datums? Maybe we can do this where we call |
||
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html | ||
if math.IsNaN(f.value) { | ||
return nil | ||
} | ||
if math.IsInf(f.value, 0) { | ||
return nil | ||
} | ||
if f.value > 0 && f.value < float64(8.515920e-109) { | ||
return nil | ||
} | ||
if f.value > float64(1.174271e+108) { | ||
return nil | ||
} | ||
|
||
return []*cloudwatch.MetricDatum{ | ||
{ | ||
MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")), | ||
Value: aws.Float64(f.value), | ||
Dimensions: BuildDimensions(f.tags), | ||
Timestamp: aws.Time(f.timestamp), | ||
}, | ||
} | ||
} | ||
|
||
var sampleConfig = ` | ||
|
@@ -50,6 +187,14 @@ var sampleConfig = ` | |
|
||
## Namespace for the CloudWatch MetricDatums | ||
namespace = "InfluxData/Telegraf" | ||
|
||
## If you have a large amount of metrics, you should consider to send statistic | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same grammar suggestions as above. |
||
## values instead of raw metrics which could not only improve performance but | ||
## also save AWS API cost. If enable this flag, this plugin would parse the required | ||
## CloudWatch statistic fields (count, min, max, and sum) and send them to CloudWatch. | ||
## You could use basicstats aggregator to calculate those fields. If not all statistic | ||
## fields are available, all fields would still be sent as raw metrics. | ||
# write_statistics = false | ||
` | ||
|
||
func (c *CloudWatch) SampleConfig() string { | ||
|
@@ -96,7 +241,7 @@ func (c *CloudWatch) Write(metrics []telegraf.Metric) error { | |
|
||
var datums []*cloudwatch.MetricDatum | ||
for _, m := range metrics { | ||
d := BuildMetricDatum(m) | ||
d := BuildMetricDatum(c.WriteStatistics, m) | ||
datums = append(datums, d...) | ||
} | ||
|
||
|
@@ -151,67 +296,58 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch | |
return partitions | ||
} | ||
|
||
// Make a MetricDatum for each field in a Point. Only fields with values that can be | ||
// converted to float64 are supported. Non-supported fields are skipped. | ||
func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum { | ||
datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) | ||
i := 0 | ||
// Make a MetricDatum from telegraf.Metric. It would check if all required fields of | ||
// cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values. | ||
// Otherwise, fields would still been built independently. | ||
func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, changing an exported function's signature is advised against. Since nothing outside of this package uses it, maybe add a note that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't something we are concerned about right now, maybe someday but not now. The only API we provide stability for is the TOML file. I keep meaning to document this, but haven't gotten to it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is not an issue, I could help to |
||
|
||
var value float64 | ||
fields := make(map[string]cloudwatchField) | ||
|
||
for k, v := range point.Fields() { | ||
switch t := v.(type) { | ||
case int: | ||
value = float64(t) | ||
case int32: | ||
value = float64(t) | ||
case int64: | ||
value = float64(t) | ||
case uint64: | ||
value = float64(t) | ||
case float64: | ||
value = t | ||
case bool: | ||
if t { | ||
value = 1 | ||
} else { | ||
value = 0 | ||
} | ||
case time.Time: | ||
value = float64(t.Unix()) | ||
default: | ||
// Skip unsupported type. | ||
datums = datums[:len(datums)-1] | ||
continue | ||
} | ||
|
||
// Do CloudWatch boundary checking | ||
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html | ||
if math.IsNaN(value) { | ||
datums = datums[:len(datums)-1] | ||
continue | ||
} | ||
if math.IsInf(value, 0) { | ||
datums = datums[:len(datums)-1] | ||
val, ok := convert(v) | ||
if !ok { | ||
// Only fields with values that can be converted to float64 are supported. | ||
// Non-supported fields are skipped. | ||
continue | ||
} | ||
if value > 0 && value < float64(8.515920e-109) { | ||
datums = datums[:len(datums)-1] | ||
continue | ||
} | ||
if value > float64(1.174271e+108) { | ||
datums = datums[:len(datums)-1] | ||
|
||
sType, fieldName := getStatisticType(k) | ||
|
||
// If statistic metric is not enabled or non-statistic type, just take current field as a value field. | ||
if !buildStatistic || sType == statisticTypeNone { | ||
fields[k] = &valueField{ | ||
metricName: point.Name(), | ||
fieldName: k, | ||
tags: point.Tags(), | ||
timestamp: point.Time(), | ||
value: val, | ||
} | ||
continue | ||
} | ||
|
||
datums[i] = &cloudwatch.MetricDatum{ | ||
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")), | ||
Value: aws.Float64(value), | ||
Dimensions: BuildDimensions(point.Tags()), | ||
Timestamp: aws.Time(point.Time()), | ||
// Otherwise, it shall be a statistic field. | ||
if _, ok := fields[fieldName]; !ok { | ||
// Hit an uncached field, create statisticField for first time | ||
fields[fieldName] = &statisticField{ | ||
metricName: point.Name(), | ||
fieldName: fieldName, | ||
tags: point.Tags(), | ||
timestamp: point.Time(), | ||
values: map[statisticType]float64{ | ||
sType: val, | ||
}, | ||
} | ||
} else { | ||
// Add new statistic value to this field | ||
fields[fieldName].addValue(sType, val) | ||
} | ||
} | ||
|
||
i += 1 | ||
var datums []*cloudwatch.MetricDatum | ||
for _, f := range fields { | ||
d := f.buildDatum() | ||
datums = append(datums, d...) | ||
} | ||
|
||
return datums | ||
|
@@ -260,6 +396,58 @@ func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension { | |
return dimensions | ||
} | ||
|
||
func getStatisticType(name string) (sType statisticType, fieldName string) { | ||
switch { | ||
case strings.HasSuffix(name, "_max"): | ||
sType = statisticTypeMax | ||
fieldName = name[:len(name)-len("_max")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use |
||
case strings.HasSuffix(name, "_min"): | ||
sType = statisticTypeMin | ||
fieldName = name[:len(name)-len("_min")] | ||
case strings.HasSuffix(name, "_sum"): | ||
sType = statisticTypeSum | ||
fieldName = name[:len(name)-len("_sum")] | ||
case strings.HasSuffix(name, "_count"): | ||
sType = statisticTypeCount | ||
fieldName = name[:len(name)-len("_count")] | ||
default: | ||
sType = statisticTypeNone | ||
fieldName = name | ||
} | ||
return | ||
} | ||
|
||
func convert(v interface{}) (value float64, ok bool) { | ||
|
||
ok = true | ||
|
||
switch t := v.(type) { | ||
case int: | ||
value = float64(t) | ||
case int32: | ||
value = float64(t) | ||
case int64: | ||
value = float64(t) | ||
case uint64: | ||
value = float64(t) | ||
case float64: | ||
value = t | ||
case bool: | ||
if t { | ||
value = 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see this is how it was before, but should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks okay to me, value is a float64 and we are assigning a const to it. |
||
} else { | ||
value = 0 | ||
} | ||
case time.Time: | ||
value = float64(t.Unix()) | ||
default: | ||
// Skip unsupported type. | ||
ok = false | ||
} | ||
|
||
return | ||
} | ||
|
||
func init() { | ||
outputs.Add("cloudwatch", func() telegraf.Output { | ||
return &CloudWatch{} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor grammar: