Skip to content

Commit

Permalink
EMF: Store the initial value for cumulative metrics and skip sending …
Browse files Browse the repository at this point in the history
…to backend (#3425)
  • Loading branch information
bjrara authored Jun 8, 2021
1 parent 439ff5f commit e6e5817
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 351 deletions.
62 changes: 45 additions & 17 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func calculateSummaryDelta(prev *aws.MetricValue, val interface{}, timestampMs t
countDelta := metricEntry.count
if prev != nil {
prevSummaryEntry := prev.RawValue.(summaryMetricEntry)
summaryDelta = summaryDelta - prevSummaryEntry.sum
countDelta = countDelta - prevSummaryEntry.count
summaryDelta = metricEntry.sum - prevSummaryEntry.sum
countDelta = metricEntry.count - prevSummaryEntry.count
} else {
return summaryMetricEntry{summaryDelta, countDelta}, false
}
return summaryMetricEntry{summaryDelta, countDelta}, true
}
Expand All @@ -53,8 +55,11 @@ type DataPoint struct {
// - pdata.SummaryDataPointSlice
type DataPoints interface {
Len() int
// NOTE: At() is an expensive call as it calculates the metric's value
At(i int) DataPoint
// At gets the adjusted datapoint from the DataPointSlice at i-th index.
// dataPoint: the adjusted data point
// retained: indicates whether the data point is valid for further process
// NOTE: It is an expensive call as it calculates the metric value.
At(i int) (dataPoint DataPoint, retained bool)
}

// deltaMetricMetadata contains the metadata required to perform rate/delta calculation
Expand Down Expand Up @@ -112,49 +117,67 @@ type summaryMetricEntry struct {
}

// At retrieves the IntDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps IntDataPointSlice) At(i int) DataPoint {
func (dps IntDataPointSlice) At(i int) (DataPoint, bool) {
metric := dps.IntDataPointSlice.At(i)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)

var metricVal float64
metricVal = float64(metric.Value())
retained := true
if dps.adjustToDelta {
deltaVal, _ := deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
var deltaVal interface{}
deltaVal, retained = deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
metricVal, metric.Timestamp().AsTime())
metricVal = deltaVal.(float64)
if !retained {
return DataPoint{}, retained
}
// It should not happen in practice that the previous metric value is smaller than the current one.
// If it happens, we assume that the metric is reset for some reason.
if deltaVal.(float64) >= 0 {
metricVal = deltaVal.(float64)
}
}

return DataPoint{
Value: metricVal,
Labels: labels,
TimestampMs: timestampMs,
}
}, retained
}

// At retrieves the DoubleDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps DoubleDataPointSlice) At(i int) DataPoint {
func (dps DoubleDataPointSlice) At(i int) (DataPoint, bool) {
metric := dps.DoubleDataPointSlice.At(i)
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())

var metricVal float64
metricVal = metric.Value()
retained := true
if dps.adjustToDelta {
deltaVal, _ := deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
var deltaVal interface{}
deltaVal, retained = deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
metricVal, metric.Timestamp().AsTime())
metricVal = deltaVal.(float64)
if !retained {
return DataPoint{}, retained
}
// It should not happen in practice that the previous metric value is smaller than the current one.
// If it happens, we assume that the metric is reset for some reason.
if deltaVal.(float64) >= 0 {
metricVal = deltaVal.(float64)
}
}

return DataPoint{
Value: metricVal,
Labels: labels,
TimestampMs: timestampMs,
}
}, retained
}

// At retrieves the HistogramDataPoint at the given index.
func (dps HistogramDataPointSlice) At(i int) DataPoint {
func (dps HistogramDataPointSlice) At(i int) (DataPoint, bool) {
metric := dps.HistogramDataPointSlice.At(i)
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
timestamp := unixNanoToMilliseconds(metric.Timestamp())
Expand All @@ -166,20 +189,25 @@ func (dps HistogramDataPointSlice) At(i int) DataPoint {
},
Labels: labels,
TimestampMs: timestamp,
}
}, true
}

// At retrieves the SummaryDataPoint at the given index.
func (dps SummaryDataPointSlice) At(i int) DataPoint {
func (dps SummaryDataPointSlice) At(i int) (DataPoint, bool) {
metric := dps.SummaryDataPointSlice.At(i)
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())

sum := metric.Sum()
count := metric.Count()
retained := true
if dps.adjustToDelta {
delta, _ := summaryMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
var delta interface{}
delta, retained = summaryMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
summaryMetricEntry{metric.Sum(), metric.Count()}, metric.Timestamp().AsTime())
if !retained {
return DataPoint{}, retained
}
summaryMetricDelta := delta.(summaryMetricEntry)
sum = summaryMetricDelta.sum
count = summaryMetricDelta.count
Expand All @@ -198,7 +226,7 @@ func (dps SummaryDataPointSlice) At(i int) DataPoint {
Value: metricVal,
Labels: labels,
TimestampMs: timestampMs,
}
}, retained
}

// createLabels converts OTel StringMap labels to a map
Expand Down
Loading

0 comments on commit e6e5817

Please sign in to comment.