diff --git a/.chloggen/awsemf_mapCleanup.yaml b/.chloggen/awsemf_mapCleanup.yaml new file mode 100755 index 000000000000..6efc9ed16d90 --- /dev/null +++ b/.chloggen/awsemf_mapCleanup.yaml @@ -0,0 +1,32 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsemfexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enforce time to live on metric data that is stored for the purpose of cumulative to delta conversions within EMF Exporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [25058] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This change fixes a bug where the cache used to store metric information for cumulative to delta + conversions was not enforcing its time to live. This could cause excessive memory growth in certain scenarios which could + lead to OOM failures for the Collector. To properly fix this issue, package-global metric caches were removed and replaced + with caches that are unique per EMF exporter. A byproduct of this change is that no two EMF exporters within a + Collector will share caches, leading to more accurate cumulative to delta conversions. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. 
'[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index 1ed167b70b12..4aba2c5bba50 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -22,10 +22,10 @@ const ( summarySumSuffix = "_sum" ) -var ( - deltaMetricCalculator = aws.NewFloat64DeltaCalculator() - summaryMetricCalculator = aws.NewMetricCalculator(calculateSummaryDelta) -) +type emfCalculators struct { + delta aws.MetricCalculator + summary aws.MetricCalculator +} func calculateSummaryDelta(prev *aws.MetricValue, val interface{}, _ time.Time) (interface{}, bool) { metricEntry := val.(summaryMetricEntry) @@ -60,7 +60,7 @@ type dataPoints interface { // dataPoint: the adjusted data point // retained: indicates whether the data point is valid for further process // NOTE: It is an expensive call as it calculates the metric value. - CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) (dataPoint []dataPoint, retained bool) + CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool, calculators *emfCalculators) (dataPoint []dataPoint, retained bool) } // deltaMetricMetadata contains the metadata required to perform rate/delta calculation @@ -106,7 +106,7 @@ type summaryMetricEntry struct { } // CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary. 
-func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool) ([]dataPoint, bool) { +func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) { metric := dps.NumberDataPointSlice.At(i) labels := createLabels(metric.Attributes(), instrumentationScopeName) timestampMs := unixNanoToMilliseconds(metric.Timestamp()) @@ -124,7 +124,7 @@ func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationS if dps.adjustToDelta { var deltaVal interface{} mKey := aws.NewKey(dps.deltaMetricMetadata, labels) - deltaVal, retained = deltaMetricCalculator.Calculate(mKey, metricVal, metric.Timestamp().AsTime()) + deltaVal, retained = calculators.delta.Calculate(mKey, metricVal, metric.Timestamp().AsTime()) // If a delta to the previous data point could not be computed use the current metric value instead if !retained && dps.retainInitialValueForDelta { @@ -146,7 +146,7 @@ func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationS } // CalculateDeltaDatapoints retrieves the HistogramDataPoint at the given index. -func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool) ([]dataPoint, bool) { +func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) { metric := dps.HistogramDataPointSlice.At(i) labels := createLabels(metric.Attributes(), instrumentationScopeName) timestamp := unixNanoToMilliseconds(metric.Timestamp()) @@ -165,7 +165,7 @@ func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentati } // CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index. 
-func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool) ([]dataPoint, bool) { +func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) { metric := dps.ExponentialHistogramDataPointSlice.At(idx) scale := metric.Scale() @@ -247,7 +247,7 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, } // CalculateDeltaDatapoints retrieves the SummaryDataPoint at the given index and perform calculation with sum and count while retain the quantile value. -func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) ([]dataPoint, bool) { +func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool, calculators *emfCalculators) ([]dataPoint, bool) { metric := dps.SummaryDataPointSlice.At(i) labels := createLabels(metric.Attributes(), instrumentationScopeName) timestampMs := unixNanoToMilliseconds(metric.Timestamp()) @@ -261,7 +261,7 @@ func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentation if dps.adjustToDelta { var delta interface{} mKey := aws.NewKey(dps.deltaMetricMetadata, labels) - delta, retained = summaryMetricCalculator.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime()) + delta, retained = calculators.summary.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime()) // If a delta to the previous data point could not be computed use the current metric value instead if !retained && dps.retainInitialValueForDelta { diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go index 9fd3f963fbb1..3e4f0b46f7f3 100644 --- a/exporter/awsemfexporter/datapoint_test.go +++ b/exporter/awsemfexporter/datapoint_test.go @@ -10,8 +10,10 @@ import ( "time" 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -169,14 +171,24 @@ func generateDeltaMetricMetadata(adjustToDelta bool, metricName string, retainIn } } -func setupDataPointCache() { - deltaMetricCalculator = aws.NewFloat64DeltaCalculator() - summaryMetricCalculator = aws.NewMetricCalculator(calculateSummaryDelta) +func setupEmfCalculators() *emfCalculators { + return &emfCalculators{ + summary: aws.NewMetricCalculator(calculateSummaryDelta), + delta: aws.NewFloat64DeltaCalculator(), + } +} + +func shutdownEmfCalculators(c *emfCalculators) error { + var errs error + errs = multierr.Append(errs, c.delta.Shutdown()) + return multierr.Append(errs, c.summary.Shutdown()) + } func TestCalculateDeltaDatapoints_NumberDataPointSlice(t *testing.T) { + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) for _, retainInitialValueOfDeltaMetric := range []bool{true, false} { - setupDataPointCache() testCases := []struct { name string @@ -263,6 +275,7 @@ func TestCalculateDeltaDatapoints_NumberDataPointSlice(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + // Given the number datapoint (including Sum and Gauge OTEL metric type) with data type as int or double numberDPS := pmetric.NewNumberDataPointSlice() numberDP := numberDPS.AppendEmpty() @@ -280,7 +293,7 @@ func TestCalculateDeltaDatapoints_NumberDataPointSlice(t *testing.T) { numberDatapointSlice := numberDataPointSlice{deltaMetricMetadata, numberDPS} // When calculate the delta datapoints for number datapoint - dps, retained := numberDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false) + dps, retained := numberDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs) assert.Equal(t, 1, 
numberDatapointSlice.Len()) assert.Equal(t, tc.expectedRetained, retained) @@ -362,14 +375,16 @@ func TestCalculateDeltaDatapoints_HistogramDataPointSlice(t *testing.T) { t.Run(tc.name, func(_ *testing.T) { // Given the histogram datapoints histogramDatapointSlice := histogramDataPointSlice{deltaMetricMetadata, tc.histogramDPS} - + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) // When calculate the delta datapoints for histograms - dps, retained := histogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false) + dps, retained := histogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs) // Then receiving the following datapoint with an expected length assert.True(t, retained) assert.Equal(t, 1, histogramDatapointSlice.Len()) assert.Equal(t, tc.expectedDatapoint, dps[0]) + }) } @@ -462,9 +477,10 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing. t.Run(tc.name, func(_ *testing.T) { // Given the histogram datapoints exponentialHistogramDatapointSlice := exponentialHistogramDataPointSlice{deltaMetricMetadata, tc.histogramDPS} - + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) // When calculate the delta datapoints for histograms - dps, retained := exponentialHistogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false) + dps, retained := exponentialHistogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs) // Then receiving the following datapoint with an expected length assert.True(t, retained) @@ -476,6 +492,8 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing. 
} func TestCalculateDeltaDatapoints_SummaryDataPointSlice(t *testing.T) { + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) for _, retainInitialValueOfDeltaMetric := range []bool{true, false} { deltaMetricMetadata := generateDeltaMetricMetadata(true, "foo", retainInitialValueOfDeltaMetric) @@ -540,7 +558,7 @@ func TestCalculateDeltaDatapoints_SummaryDataPointSlice(t *testing.T) { summaryDatapointSlice := summaryDataPointSlice{deltaMetricMetadata, summaryDPS} // When calculate the delta datapoints for sum and count in summary - dps, retained := summaryDatapointSlice.CalculateDeltaDatapoints(0, "", true) + dps, retained := summaryDatapointSlice.CalculateDeltaDatapoints(0, "", true, emfCalcs) // Then receiving the following datapoint with an expected length assert.Equal(t, tc.expectedRetained, retained) @@ -644,7 +662,6 @@ func TestGetDataPoints(t *testing.T) { metadata := generateTestMetricMetadata("namespace", time.Now().UnixNano()/int64(time.Millisecond), "log-group", "log-stream", "cloudwatch-otel", metric.Type()) t.Run(tc.name, func(t *testing.T) { - setupDataPointCache() if tc.isPrometheusMetrics { metadata.receiver = prometheusReceiver @@ -740,7 +757,8 @@ func BenchmarkGetAndCalculateDeltaDataPoints(b *testing.B) { finalOtelMetrics := generateOtelTestMetrics(generateMetrics...) 
rms := finalOtelMetrics.ResourceMetrics() metrics := rms.At(0).ScopeMetrics().At(0).Metrics() - + emfCalcs := setupEmfCalculators() + defer require.NoError(b, shutdownEmfCalculators(emfCalcs)) b.ResetTimer() for n := 0; n < b.N; n++ { for i := 0; i < metrics.Len(); i++ { @@ -748,7 +766,7 @@ func BenchmarkGetAndCalculateDeltaDataPoints(b *testing.B) { dps := getDataPoints(metrics.At(i), metadata, zap.NewNop()) for i := 0; i < dps.Len(); i++ { - dps.CalculateDeltaDatapoints(i, "", false) + dps.CalculateDeltaDatapoints(i, "", false, emfCalcs) } } } diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 76d2fb34ce70..97de506a86f8 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -181,7 +181,7 @@ func (emf *emfExporter) shutdown(_ context.Context) error { } } - return nil + return emf.metricTranslator.Shutdown() } func wrapErrorIfBadRequest(err error) error { diff --git a/exporter/awsemfexporter/go.mod b/exporter/awsemfexporter/go.mod index 46964aebfb48..37fee97f1669 100644 --- a/exporter/awsemfexporter/go.mod +++ b/exporter/awsemfexporter/go.mod @@ -18,6 +18,7 @@ require ( go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 go.opentelemetry.io/collector/semconv v0.82.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.25.0 golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea ) @@ -47,7 +48,6 @@ require ( go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect diff --git a/exporter/awsemfexporter/grouped_metric.go b/exporter/awsemfexporter/grouped_metric.go index 6161113f6ec8..f26491218c0a 100644 --- a/exporter/awsemfexporter/grouped_metric.go +++ 
b/exporter/awsemfexporter/grouped_metric.go @@ -27,7 +27,7 @@ type metricInfo struct { } // addToGroupedMetric processes OT metrics and adds them into GroupedMetric buckets -func addToGroupedMetric(pmd pmetric.Metric, groupedMetrics map[interface{}]*groupedMetric, metadata cWMetricMetadata, patternReplaceSucceeded bool, logger *zap.Logger, descriptor map[string]MetricDescriptor, config *Config) error { +func addToGroupedMetric(pmd pmetric.Metric, groupedMetrics map[interface{}]*groupedMetric, metadata cWMetricMetadata, patternReplaceSucceeded bool, logger *zap.Logger, descriptor map[string]MetricDescriptor, config *Config, calculators *emfCalculators) error { dps := getDataPoints(pmd, metadata, logger) if dps == nil || dps.Len() == 0 { @@ -35,7 +35,7 @@ func addToGroupedMetric(pmd pmetric.Metric, groupedMetrics map[interface{}]*grou } for i := 0; i < dps.Len(); i++ { - dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics) + dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators) if !retained { continue } diff --git a/exporter/awsemfexporter/grouped_metric_test.go b/exporter/awsemfexporter/grouped_metric_test.go index 2214393bb795..adb6dc114a01 100644 --- a/exporter/awsemfexporter/grouped_metric_test.go +++ b/exporter/awsemfexporter/grouped_metric_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -95,7 +96,8 @@ func TestAddToGroupedMetric(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - setupDataPointCache() + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) groupedMetrics := make(map[interface{}]*groupedMetric) rms := tc.metric.ResourceMetrics() @@ -106,7 +108,12 @@ func TestAddToGroupedMetric(t *testing.T) { 
assert.Equal(t, 1, ilms.Len()) for i := 0; i < metrics.Len(); i++ { - err := addToGroupedMetric(metrics.At(i), groupedMetrics, generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(i).Type()), true, zap.NewNop(), nil, testCfg) + err := addToGroupedMetric(metrics.At(i), groupedMetrics, + generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(i).Type()), + true, zap.NewNop(), + nil, + testCfg, + emfCalcs) assert.Nil(t, err) } @@ -122,7 +129,8 @@ func TestAddToGroupedMetric(t *testing.T) { } t.Run("Add multiple different metrics", func(t *testing.T) { - setupDataPointCache() + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) groupedMetrics := make(map[interface{}]*groupedMetric) generateMetrics := []pmetric.Metrics{ @@ -141,7 +149,14 @@ func TestAddToGroupedMetric(t *testing.T) { assert.Equal(t, 9, metrics.Len()) for i := 0; i < metrics.Len(); i++ { - err := addToGroupedMetric(metrics.At(i), groupedMetrics, generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(i).Type()), true, logger, nil, testCfg) + err := addToGroupedMetric(metrics.At(i), + groupedMetrics, + generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(i).Type()), + true, + logger, + nil, + testCfg, + emfCalcs) assert.Nil(t, err) } @@ -178,17 +193,31 @@ func TestAddToGroupedMetric(t *testing.T) { }) t.Run("Add same metric but different log group", func(t *testing.T) { + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) groupedMetrics := make(map[interface{}]*groupedMetric) otelMetrics := generateTestGaugeMetric("int-gauge", "int") ilms := otelMetrics.ResourceMetrics().At(0).ScopeMetrics() metric := ilms.At(0).Metrics().At(0) metricMetadata1 := generateTestMetricMetadata(namespace, timestamp, 
"log-group-1", logStreamName, instrumentationLibName, metric.Type()) - err := addToGroupedMetric(metric, groupedMetrics, metricMetadata1, true, logger, nil, testCfg) + err := addToGroupedMetric(metric, + groupedMetrics, + metricMetadata1, + true, logger, + nil, + testCfg, + emfCalcs) assert.Nil(t, err) - metricMetadata2 := generateTestMetricMetadata(namespace, timestamp, "log-group-2", logStreamName, instrumentationLibName, metric.Type()) - err = addToGroupedMetric(metric, groupedMetrics, metricMetadata2, true, logger, nil, testCfg) + metricMetadata2 := generateTestMetricMetadata(namespace, + timestamp, + "log-group-2", + logStreamName, + instrumentationLibName, + metric.Type(), + ) + err = addToGroupedMetric(metric, groupedMetrics, metricMetadata2, true, logger, nil, testCfg, emfCalcs) assert.Nil(t, err) assert.Len(t, groupedMetrics, 2) @@ -220,6 +249,8 @@ func TestAddToGroupedMetric(t *testing.T) { }) t.Run("Duplicate metric names", func(t *testing.T) { + emfCalcs := setupEmfCalculators() + defer require.NoError(t, shutdownEmfCalculators(emfCalcs)) groupedMetrics := make(map[interface{}]*groupedMetric) generateMetrics := []pmetric.Metrics{ generateTestGaugeMetric("foo", "int"), @@ -237,7 +268,14 @@ func TestAddToGroupedMetric(t *testing.T) { obsLogger := zap.New(obs) for i := 0; i < metrics.Len(); i++ { - err := addToGroupedMetric(metrics.At(i), groupedMetrics, generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(i).Type()), true, obsLogger, nil, testCfg) + err := addToGroupedMetric(metrics.At(i), + groupedMetrics, + generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(i).Type()), + true, obsLogger, + nil, + testCfg, + emfCalcs, + ) assert.Nil(t, err) } assert.Equal(t, 1, len(groupedMetrics)) @@ -261,6 +299,8 @@ func TestAddToGroupedMetric(t *testing.T) { }) t.Run("Unhandled metric type", func(t *testing.T) { + emfCalcs := setupEmfCalculators() + defer 
require.NoError(t, shutdownEmfCalculators(emfCalcs)) groupedMetrics := make(map[interface{}]*groupedMetric) md := pmetric.NewMetrics() rms := md.ResourceMetrics() @@ -270,7 +310,15 @@ func TestAddToGroupedMetric(t *testing.T) { obs, logs := observer.New(zap.WarnLevel) obsLogger := zap.New(obs) - err := addToGroupedMetric(metric, groupedMetrics, generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, pmetric.MetricTypeEmpty), true, obsLogger, nil, testCfg) + err := addToGroupedMetric(metric, + groupedMetrics, + generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, pmetric.MetricTypeEmpty), + true, + obsLogger, + nil, + testCfg, + emfCalcs, + ) assert.Nil(t, err) assert.Equal(t, 0, len(groupedMetrics)) @@ -323,6 +371,8 @@ func TestAddKubernetesWrapper(t *testing.T) { } func BenchmarkAddToGroupedMetric(b *testing.B) { + emfCalcs := setupEmfCalculators() + defer require.NoError(b, shutdownEmfCalculators(emfCalcs)) generateMetrics := []pmetric.Metrics{ generateTestGaugeMetric("int-gauge", intValueType), generateTestGaugeMetric("int-gauge", doubleValueType), @@ -344,7 +394,7 @@ func BenchmarkAddToGroupedMetric(b *testing.B) { groupedMetrics := make(map[interface{}]*groupedMetric) for i := 0; i < numMetrics; i++ { metadata := generateTestMetricMetadata("namespace", int64(1596151098037), "log-group", "log-stream", "cloudwatch-otel", metrics.At(i).Type()) - err := addToGroupedMetric(metrics.At(i), groupedMetrics, metadata, true, logger, nil, testCfg) + err := addToGroupedMetric(metrics.At(i), groupedMetrics, metadata, true, logger, nil, testCfg, emfCalcs) assert.Nil(b, err) } } diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index e1fd70a5bdf6..70d71b7796da 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -10,9 +10,11 @@ import ( "time" 
"go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" + aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" ) const ( @@ -85,6 +87,7 @@ type cWMetricMetadata struct { type metricTranslator struct { metricDescriptor map[string]MetricDescriptor + calculators *emfCalculators } func newMetricTranslator(config Config) metricTranslator { @@ -94,9 +97,20 @@ func newMetricTranslator(config Config) metricTranslator { } return metricTranslator{ metricDescriptor: mt, + calculators: &emfCalculators{ + delta: aws.NewFloat64DeltaCalculator(), + summary: aws.NewMetricCalculator(calculateSummaryDelta), + }, } } +func (mt metricTranslator) Shutdown() error { + var errs error + errs = multierr.Append(errs, mt.calculators.delta.Shutdown()) + errs = multierr.Append(errs, mt.calculators.summary.Shutdown()) + return errs +} + // translateOTelToGroupedMetric converts OT metrics to Grouped Metric format. 
func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetrics, groupedMetrics map[interface{}]*groupedMetric, config *Config) error { timestamp := time.Now().UnixNano() / int64(time.Millisecond) @@ -129,7 +143,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri instrumentationScopeName: instrumentationScopeName, receiver: metricReceiver, } - err := addToGroupedMetric(metric, groupedMetrics, metadata, patternReplaceSucceeded, config.logger, mt.metricDescriptor, config) + err := addToGroupedMetric(metric, groupedMetrics, metadata, patternReplaceSucceeded, config.logger, mt.metricDescriptor, config, mt.calculators) if err != nil { return err } diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 75da81d1f5a6..d09ba1d8eaef 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" @@ -256,6 +257,7 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { logger: zap.NewNop(), } translator := newMetricTranslator(*config) + defer require.NoError(t, translator.Shutdown()) noInstrLibMetric := createTestResourceMetrics() instrLibMetric := createTestResourceMetrics() @@ -345,7 +347,6 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - setupDataPointCache() groupedMetrics := make(map[interface{}]*groupedMetric) err := translator.translateOTelToGroupedMetric(tc.metric, groupedMetrics, config) @@ -1976,6 +1977,7 @@ func BenchmarkTranslateOtToGroupedMetricWithInstrLibrary(b *testing.B) { logger: zap.NewNop(), } translator := newMetricTranslator(*config) + defer 
require.NoError(b, translator.Shutdown()) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -1998,6 +2000,7 @@ func BenchmarkTranslateOtToGroupedMetricWithoutConfigReplacePattern(b *testing.B logger: zap.NewNop(), } translator := newMetricTranslator(*config) + defer require.NoError(b, translator.Shutdown()) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -2020,6 +2023,7 @@ func BenchmarkTranslateOtToGroupedMetricWithConfigReplaceWithResource(b *testing logger: zap.NewNop(), } translator := newMetricTranslator(*config) + defer require.NoError(b, translator.Shutdown()) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -2042,6 +2046,7 @@ func BenchmarkTranslateOtToGroupedMetricWithConfigReplaceWithLabel(b *testing.B) logger: zap.NewNop(), } translator := newMetricTranslator(*config) + defer require.NoError(b, translator.Shutdown()) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -2059,6 +2064,7 @@ func BenchmarkTranslateOtToGroupedMetricWithoutInstrLibrary(b *testing.B) { logger: zap.NewNop(), } translator := newMetricTranslator(*config) + defer require.NoError(b, translator.Shutdown()) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -2318,6 +2324,7 @@ func TestTranslateOtToGroupedMetricForLogGroupAndStream(t *testing.T) { } translator := newMetricTranslator(*config) + defer require.NoError(t, translator.Shutdown()) groupedMetrics := make(map[interface{}]*groupedMetric) diff --git a/internal/aws/metrics/metric_calculator.go b/internal/aws/metrics/metric_calculator.go index 0f9866cb8e55..205ae5d39bae 100644 --- a/internal/aws/metrics/metric_calculator.go +++ b/internal/aws/metrics/metric_calculator.go @@ -4,6 +4,7 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" import ( + "errors" "sync" "time" @@ -34,6 +35,7 @@ func calculateDelta(prev *MetricValue, val interface{}, _ time.Time) (interface{ } // MetricCalculator is a calculator used to adjust metric values based on its previous record. 
+// Shutdown() must be called to clean up goroutines before program exit. type MetricCalculator struct { // lock on write lock sync.Mutex @@ -43,6 +45,7 @@ type MetricCalculator struct { calculateFunc CalculateFunc } +// NewMetricCalculator creates a metric calculator that enforces a five-minute time to live on cache entries. func NewMetricCalculator(calculateFunc CalculateFunc) MetricCalculator { return MetricCalculator{ cache: NewMapWithExpiry(cleanInterval), @@ -63,6 +66,11 @@ func (rm *MetricCalculator) Calculate(mKey Key, value interface{}, timestamp tim rm.lock.Lock() defer rm.lock.Unlock() + // need to also lock the cache to avoid the cleanup from removing entries while they are being processed. + // This is only likely to happen when data points come in close to the expiration date. + rm.cache.Lock() + defer rm.cache.Unlock() + prev, exists := cacheStore.Get(mKey) result, done = rm.calculateFunc(prev, value, timestamp) if !exists || done { @@ -74,6 +82,10 @@ func (rm *MetricCalculator) Calculate(mKey Key, value interface{}, timestamp tim return result, done } +func (rm *MetricCalculator) Shutdown() error { + return rm.cache.Shutdown() +} + type Key struct { MetricMetadata interface{} MetricLabels attribute.Distinct @@ -99,15 +111,21 @@ type MetricValue struct { Timestamp time.Time } -// MapWithExpiry act like a map which provide a method to clean up expired entries +// MapWithExpiry acts like a map which provides a method to clean up expired entries. +// MapWithExpiry is not thread safe and locks must be managed by the owner of the Map by the use of Lock() and Unlock() type MapWithExpiry struct { - lock *sync.Mutex - ttl time.Duration - entries map[interface{}]*MetricValue + lock *sync.Mutex + ttl time.Duration + entries map[interface{}]*MetricValue + doneChan chan struct{} } +// NewMapWithExpiry automatically starts a sweeper goroutine to enforce the map's TTL. Shutdown() must be called to ensure +// that this goroutine is properly cleaned up.
func NewMapWithExpiry(ttl time.Duration) *MapWithExpiry { - return &MapWithExpiry{lock: &sync.Mutex{}, ttl: ttl, entries: make(map[interface{}]*MetricValue)} + m := &MapWithExpiry{lock: &sync.Mutex{}, ttl: ttl, entries: make(map[interface{}]*MetricValue), doneChan: make(chan struct{})} + go m.sweep(m.CleanUp) + return m } func (m *MapWithExpiry) Get(key Key) (*MetricValue, bool) { @@ -119,6 +137,32 @@ func (m *MapWithExpiry) Set(key Key, value MetricValue) { m.entries[key] = &value } +func (m *MapWithExpiry) sweep(removeFunc func(time2 time.Time)) { + ticker := time.NewTicker(m.ttl) + for { + select { + case currentTime := <-ticker.C: + m.lock.Lock() + removeFunc(currentTime) + m.lock.Unlock() + case <-m.doneChan: + ticker.Stop() + return + } + } +} + +func (m *MapWithExpiry) Shutdown() error { + select { + case <-m.doneChan: + return errors.New("shutdown called on an already closed channel") + default: + close(m.doneChan) + + } + return nil +} + func (m *MapWithExpiry) CleanUp(now time.Time) { for k, v := range m.entries { if now.Sub(v.Timestamp) >= m.ttl { diff --git a/internal/aws/metrics/metric_calculator_test.go b/internal/aws/metrics/metric_calculator_test.go index 50234d337e7a..015b3032feb9 100644 --- a/internal/aws/metrics/metric_calculator_test.go +++ b/internal/aws/metrics/metric_calculator_test.go @@ -6,10 +6,12 @@ package metrics import ( "math/rand" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestFloat64RateCalculator(t *testing.T) { @@ -24,6 +26,7 @@ func TestFloat64RateCalculator(t *testing.T) { r, ok = c.Calculate(mKey, float64(100), nextTime) assert.True(t, ok) assert.InDelta(t, 0.5, r, 0.1) + require.NoError(t, c.Shutdown()) } func TestFloat64RateCalculatorWithTooFrequentUpdate(t *testing.T) { @@ -46,6 +49,7 @@ func TestFloat64RateCalculatorWithTooFrequentUpdate(t *testing.T) { r, ok = c.Calculate(mKey, float64(105), nextTime) assert.True(t, ok) assert.InDelta(t, 1, r, 
0.1) + require.NoError(t, c.Shutdown()) } func newFloat64RateCalculator() MetricCalculator { @@ -76,6 +80,7 @@ func TestFloat64DeltaCalculator(t *testing.T) { assert.InDelta(t, f-testCases[i-1], r, f/10) } } + require.NoError(t, c.Shutdown()) } func TestFloat64DeltaCalculatorWithDecreasingValues(t *testing.T) { @@ -91,43 +96,55 @@ func TestFloat64DeltaCalculatorWithDecreasingValues(t *testing.T) { assert.Equal(t, testCases[i]-testCases[i-1], r) } } + require.NoError(t, c.Shutdown()) } func TestMapWithExpiryAdd(t *testing.T) { store := NewMapWithExpiry(time.Second) value1 := rand.Float64() + store.Lock() store.Set(Key{MetricMetadata: "key1"}, MetricValue{RawValue: value1}) val, ok := store.Get(Key{MetricMetadata: "key1"}) + store.Unlock() assert.Equal(t, true, ok) assert.Equal(t, value1, val.RawValue) + store.Lock() + defer store.Unlock() val, ok = store.Get(Key{MetricMetadata: "key2"}) assert.Equal(t, false, ok) assert.True(t, val == nil) + require.NoError(t, store.Shutdown()) } func TestMapWithExpiryCleanup(t *testing.T) { store := NewMapWithExpiry(time.Second) value1 := rand.Float64() + store.Lock() store.Set(Key{MetricMetadata: "key1"}, MetricValue{RawValue: value1, Timestamp: time.Now()}) - store.CleanUp(time.Now()) val, ok := store.Get(Key{MetricMetadata: "key1"}) + assert.Equal(t, true, ok) assert.Equal(t, value1, val.RawValue.(float64)) assert.Equal(t, 1, store.Size()) + store.Unlock() - time.Sleep(time.Second) - store.CleanUp(time.Now()) + time.Sleep(time.Second + time.Millisecond) + store.Lock() val, ok = store.Get(Key{MetricMetadata: "key1"}) assert.Equal(t, false, ok) assert.True(t, val == nil) assert.Equal(t, 0, store.Size()) + store.Unlock() + require.NoError(t, store.Shutdown()) } func TestMapWithExpiryConcurrency(t *testing.T) { store := NewMapWithExpiry(time.Second) + store.Lock() store.Set(Key{MetricMetadata: "sum"}, MetricValue{RawValue: 0}) + store.Unlock() var wg sync.WaitGroup wg.Add(2) @@ -159,6 +176,7 @@ func TestMapWithExpiryConcurrency(t 
*testing.T) { wg.Wait() sum, _ := store.Get(Key{MetricMetadata: "sum"}) assert.Equal(t, 0, sum.RawValue.(int)) + require.NoError(t, store.Shutdown()) } type mockKey struct { @@ -219,3 +237,39 @@ func TestMapKeyNotEqualOnName(t *testing.T) { }, labelMap2) assert.NotEqual(t, mKey1, mKey2) } + +func TestSweep(t *testing.T) { + sweepEvent := make(chan time.Time) + closed := &atomic.Bool{} + + onSweep := func(now time.Time) { + sweepEvent <- now + } + + mwe := &MapWithExpiry{ + ttl: 1 * time.Millisecond, + lock: &sync.Mutex{}, + doneChan: make(chan struct{}), + } + + start := time.Now() + go func() { + mwe.sweep(onSweep) + closed.Store(true) + close(sweepEvent) + }() + + for i := 1; i <= 2; i++ { + sweepTime := <-sweepEvent + tickTime := time.Since(start) + mwe.ttl*time.Duration(i) + require.False(t, closed.Load()) + assert.LessOrEqual(t, mwe.ttl, tickTime) + assert.LessOrEqual(t, time.Since(sweepTime), mwe.ttl) + } + require.NoError(t, mwe.Shutdown()) + for range sweepEvent { // nolint + } + if !closed.Load() { + t.Errorf("Sweeper did not terminate.") + } +} diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index 6aa3427085e7..6334309d7fa5 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -19,6 +19,7 @@ require ( go.opentelemetry.io/collector/consumer v0.82.0 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 go.opentelemetry.io/collector/receiver v0.82.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.25.0 k8s.io/api v0.27.4 k8s.io/apimachinery v0.27.4 @@ -120,7 +121,6 @@ require ( go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/oauth2 v0.10.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git 
a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go index 43471e84b0dd..cd3b022b865f 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go @@ -23,6 +23,7 @@ import ( "github.com/google/cadvisor/manager" "github.com/google/cadvisor/utils/sysfs" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "go.uber.org/zap" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" @@ -110,6 +111,7 @@ type EcsInfo interface { type Decorator interface { Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric + Shutdown() error } type Cadvisor struct { @@ -164,6 +166,18 @@ func GetMetricsExtractors() []extractors.MetricExtractor { return metricsExtractors } +func (c *Cadvisor) Shutdown() error { + var errs error + for _, ext := range metricsExtractors { + errs = multierr.Append(errs, ext.Shutdown()) + } + + if c.k8sDecorator != nil { + errs = multierr.Append(errs, c.k8sDecorator.Shutdown()) + } + return errs +} + func (c *Cadvisor) addEbsVolumeInfo(tags map[string]string, ebsVolumeIdsUsedAsPV map[string]string) { deviceName, ok := tags[ci.DiskDev] if !ok { diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go index 48e8828374d5..8494572c6137 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go @@ -77,6 +77,10 @@ func (m *MockK8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extracto return metric } +func (m *MockK8sDecorator) Shutdown() error { + return nil +} + func TestGetMetrics(t *testing.T) { t.Setenv("HOST_NAME", "host") hostInfo := 
testutils.MockHostInfo{ClusterName: "cluster"} diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go index 224df6287594..4e7871dbb9f4 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go @@ -27,6 +27,7 @@ type Cadvisor struct { type Decorator interface { Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric + Shutdown() error } // Option is a function that can be used to configure Cadvisor struct @@ -54,3 +55,7 @@ func New(_ string, _ HostInfo, _ *zap.Logger, _ ...Option) (*Cadvisor, error) { func (c *Cadvisor) GetMetrics() []pmetric.Metrics { return []pmetric.Metrics{} } + +func (c *Cadvisor) Shutdown() error { + return nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go index 7b2381bb0727..e2b8851d6740 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go @@ -53,6 +53,10 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf return metrics } +func (c *CPUMetricExtractor) Shutdown() error { + return c.rateCalculator.Shutdown() +} + func NewCPUMetricExtractor(logger *zap.Logger) *CPUMetricExtractor { return &CPUMetricExtractor{ logger: logger, diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor_test.go index 6447df404801..11d16b83c831 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor_test.go +++ 
b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor_test.go @@ -6,6 +6,8 @@ package extractors import ( "testing" + "github.com/stretchr/testify/require" + . "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" ) @@ -36,6 +38,7 @@ func TestCPUStats(t *testing.T) { // test node type containerType = TypeNode + require.NoError(t, extractor.Shutdown()) extractor = NewCPUMetricExtractor(nil) if extractor.HasValue(result[0]) { @@ -54,6 +57,7 @@ func TestCPUStats(t *testing.T) { // test instance type containerType = TypeInstance + require.NoError(t, extractor.Shutdown()) extractor = NewCPUMetricExtractor(nil) if extractor.HasValue(result[0]) { @@ -69,4 +73,5 @@ func TestCPUStats(t *testing.T) { AssertContainsTaggedFloat(t, cMetrics[0], "instance_cpu_usage_system", 10, 0) AssertContainsTaggedFloat(t, cMetrics[0], "instance_cpu_utilization", 0.5, 0) AssertContainsTaggedInt(t, cMetrics[0], "instance_cpu_limit", 2000) + require.NoError(t, extractor.Shutdown()) } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go index 9bb0d1e9c92f..384acf0e5e4f 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go @@ -56,6 +56,10 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat return metrics } +func (d *DiskIOMetricExtractor) Shutdown() error { + return d.rateCalculator.Shutdown() +} + func ioMetricName(prefix, key string) string { return prefix + strings.ToLower(key) } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor_test.go 
b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor_test.go index 67a0de417213..96bb73d022d2 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor_test.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" . "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" @@ -52,6 +53,7 @@ func TestDiskIOStats(t *testing.T) { // for ecs node-level metrics containerType = TypeInstance + require.NoError(t, extractor.Shutdown()) extractor = NewDiskIOMetricExtractor(nil) if extractor.HasValue(result[0]) { @@ -85,8 +87,9 @@ func TestDiskIOStats(t *testing.T) { // for non supported type containerType = TypeContainerDiskIO + require.NoError(t, extractor.Shutdown()) extractor = NewDiskIOMetricExtractor(nil) - + defer require.NoError(t, extractor.Shutdown()) if extractor.HasValue(result[0]) { cMetrics = extractor.GetValue(result[0], nil, containerType) } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go index 81377adde71b..559ee5481f81 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go @@ -30,6 +30,7 @@ type CPUMemInfoProvider interface { type MetricExtractor interface { HasValue(*cinfo.ContainerInfo) bool GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInfoProvider, containerType string) []*CAdvisorMetric + Shutdown() error } type CAdvisorMetric struct { diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go 
b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go index 559e4af41ec3..6e2f888b461b 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go @@ -63,6 +63,10 @@ func (f *FileSystemMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMem return metrics } +func (f *FileSystemMetricExtractor) Shutdown() error { + return nil +} + func NewFileSystemMetricExtractor(logger *zap.Logger) *FileSystemMetricExtractor { fse := &FileSystemMetricExtractor{ logger: logger, diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go index 40751481ac1a..e5205c3cc323 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go @@ -64,6 +64,10 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf return metrics } +func (m *MemMetricExtractor) Shutdown() error { + return m.rateCalculator.Shutdown() +} + func NewMemMetricExtractor(logger *zap.Logger) *MemMetricExtractor { return &MemMetricExtractor{ logger: logger, diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor_test.go index ca4052affecb..92ba85dbcc9c 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor_test.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor_test.go @@ -6,6 +6,8 @@ package extractors import ( "testing" + "github.com/stretchr/testify/require" + . 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" ) @@ -43,6 +45,7 @@ func TestMemStats(t *testing.T) { // for node type containerType = TypeNode + require.NoError(t, extractor.Shutdown()) extractor = NewMemMetricExtractor(nil) if extractor.HasValue(result[0]) { @@ -71,6 +74,7 @@ func TestMemStats(t *testing.T) { // for instance type containerType = TypeInstance + require.NoError(t, extractor.Shutdown()) extractor = NewMemMetricExtractor(nil) if extractor.HasValue(result[0]) { @@ -96,5 +100,5 @@ func TestMemStats(t *testing.T) { AssertContainsTaggedFloat(t, cMetrics[0], "instance_memory_pgmajfault", 10, 0) AssertContainsTaggedFloat(t, cMetrics[0], "instance_memory_hierarchical_pgmajfault", 10, 0) AssertContainsTaggedFloat(t, cMetrics[0], "instance_memory_utilization", 2.68630981, 1.0e-8) - + require.NoError(t, extractor.Shutdown()) } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go index 3b66db8fe53a..35df5aea6b4c 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go @@ -91,6 +91,10 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro return metrics } +func (n *NetMetricExtractor) Shutdown() error { + return n.rateCalculator.Shutdown() +} + func NewNetMetricExtractor(logger *zap.Logger) *NetMetricExtractor { return &NetMetricExtractor{ logger: logger, diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor_test.go index 9766769b3444..6cf6ff341bc7 100644 --- 
a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor_test.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" @@ -155,4 +156,5 @@ func TestNetStats(t *testing.T) { for i := range expectedFields { AssertContainsTaggedField(t, cMetrics[i], expectedFields[i], expectedTags[i]) } + require.NoError(t, extractor.Shutdown()) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 007c76101d41..6462c5cc59c8 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -238,10 +238,11 @@ func (k *K8sAPIServer) init() error { } // Shutdown stops the k8sApiServer -func (k *K8sAPIServer) Shutdown() { +func (k *K8sAPIServer) Shutdown() error { if k.cancel != nil { k.cancel() } + return nil } func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourcelock.Interface) { diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index cd2d0f3c6921..f5d36c93ddd8 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pmetric" 
"go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -222,7 +223,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { } } - k8sAPIServer.Shutdown() + require.NoError(t, k8sAPIServer.Shutdown()) } func TestK8sAPIServer_init(t *testing.T) { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index 53c410ac43b7..0fca19d55498 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "go.uber.org/multierr" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -59,6 +60,8 @@ type mapWithExpiry struct { } func (m *mapWithExpiry) Get(key string) (interface{}, bool) { + m.MapWithExpiry.Lock() + defer m.MapWithExpiry.Unlock() if val, ok := m.MapWithExpiry.Get(awsmetrics.NewKey(key, nil)); ok { return val.RawValue, ok } @@ -67,6 +70,8 @@ func (m *mapWithExpiry) Get(key string) (interface{}, bool) { } func (m *mapWithExpiry) Set(key string, content interface{}) { + m.MapWithExpiry.Lock() + defer m.MapWithExpiry.Unlock() val := awsmetrics.MetricValue{ RawValue: content, Timestamp: time.Now(), @@ -131,6 +136,17 @@ func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel return podStore, nil } +func (p *PodStore) Shutdown() error { + var errs error + errs = p.cache.Shutdown() + for _, maps := range p.prevMeasurements { + if prevMeasErr := maps.Shutdown(); prevMeasErr != nil { + errs = multierr.Append(errs, prevMeasErr) + } + } + return errs +} + func (p *PodStore) getPrevMeasurement(metricType, metricKey string) (interface{}, bool) { prevMeasurement, ok := p.prevMeasurements[metricType] if !ok { @@ -164,8 +180,6 @@ func (p *PodStore) RefreshTick(ctx context.Context) { now := time.Now() if now.Sub(p.lastRefreshed) >= refreshInterval { p.refresh(ctx, now) - // call cleanup every refresh cycle - p.cleanup(now) p.lastRefreshed = now } } @@ -239,16 
+253,6 @@ func (p *PodStore) refresh(ctx context.Context, now time.Time) { p.refreshInternal(now, podList) } -func (p *PodStore) cleanup(now time.Time) { - for _, prevMeasurement := range p.prevMeasurements { - prevMeasurement.CleanUp(now) - } - - p.Lock() - defer p.Unlock() - p.cache.CleanUp(now) -} - func (p *PodStore) refreshInternal(now time.Time, podList []corev1.Pod) { var podCount int var containerCount int diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go index c54a03feefa0..447ceb91f12a 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -206,6 +207,7 @@ func generateMetric(fields map[string]interface{}, tags map[string]string) CIMet func TestPodStore_decorateCpu(t *testing.T) { podStore := getPodStore() + defer require.NoError(t, podStore.Shutdown()) pod := getBaseTestPodInfo() @@ -235,6 +237,7 @@ func TestPodStore_decorateCpu(t *testing.T) { func TestPodStore_decorateMem(t *testing.T) { podStore := getPodStore() + defer require.NoError(t, podStore.Shutdown()) pod := getBaseTestPodInfo() tags := map[string]string{ci.MetricType: ci.TypePod} @@ -285,6 +288,7 @@ func TestPodStore_addStatus(t *testing.T) { tags := map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} fields := map[string]interface{}{ci.MetricName(ci.TypePod, ci.CPUTotal): float64(1)} podStore := getPodStore() + defer require.NoError(t, podStore.Shutdown()) metric := generateMetric(fields, tags) podStore.addStatus(metric, pod) @@ -575,8 +579,8 @@ func (m *mockPodClient) ListPods() ([]corev1.Pod, error) { } func TestPodStore_RefreshTick(t *testing.T) { - podStore := getPodStore() 
+ defer require.NoError(t, podStore.Shutdown()) podStore.podClient = &mockPodClient{} podStore.lastRefreshed = time.Now().Add(-time.Minute) podStore.RefreshTick(context.Background()) @@ -591,8 +595,8 @@ func TestPodStore_RefreshTick(t *testing.T) { func TestPodStore_decorateNode(t *testing.T) { pod := getBaseTestPodInfo() podList := []corev1.Pod{*pod} - podStore := getPodStore() + defer require.NoError(t, podStore.Shutdown()) podStore.refreshInternal(time.Now(), podList) tags := map[string]string{ci.MetricType: ci.TypeNode} @@ -626,11 +630,12 @@ func TestPodStore_Decorate(t *testing.T) { metric := &mockCIMetric{ tags: tags, } - + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() podStore := getPodStore() + defer require.NoError(t, podStore.Shutdown()) podStore.podClient = &mockPodClient{} kubernetesBlob := map[string]interface{}{} - ctx := context.Background() ok := podStore.Decorate(ctx, metric, kubernetesBlob) assert.True(t, ok) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/store.go b/receiver/awscontainerinsightreceiver/internal/stores/store.go index 890d07e0a62e..2e4754660665 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/store.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/store.go @@ -40,6 +40,8 @@ type K8sDecorator struct { // The K8sStore (e.g. podstore) does network request in Decorate function, thus needs to take a context // object for canceling the request ctx context.Context + // the pod store needs to be saved here because it is stateful and needs to be shut down.
+ podStore *PodStore } func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, logger *zap.Logger) (*K8sDecorator, error) { @@ -53,9 +55,11 @@ func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, } podstore, err := NewPodStore(hostIP, prefFullPodName, addFullPodNameMetricLabel, logger) + if err != nil { return nil, err } + k.podStore = podstore k.stores = append(k.stores, podstore) if tagService { @@ -97,3 +101,7 @@ func (k *K8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.C TagMetricSource(metric) return metric } + +func (k *K8sDecorator) Shutdown() error { + return k.podStore.Shutdown() +} diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 0c3b01e3a6a2..fbde4b19c7f0 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" + "go.uber.org/multierr" "go.uber.org/zap" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" @@ -26,6 +27,7 @@ var _ receiver.Metrics = (*awsContainerInsightReceiver)(nil) type metricsProvider interface { GetMetrics() []pmetric.Metrics + Shutdown() error } // awsContainerInsightReceiver implements the receiver.Metrics @@ -125,7 +127,18 @@ func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { return nil } acir.cancel() - return nil + + var errs error + + if acir.k8sapiserver != nil { + errs = multierr.Append(errs, acir.k8sapiserver.Shutdown()) + } + if acir.cadvisor != nil { + errs = multierr.Append(errs, acir.cadvisor.Shutdown()) + } + + return errs + } // collectData collects container stats from Amazon ECS Task Metadata Endpoint diff --git a/receiver/awscontainerinsightreceiver/receiver_test.go 
b/receiver/awscontainerinsightreceiver/receiver_test.go index 9aedcfa8c415..27686d194f1b 100644 --- a/receiver/awscontainerinsightreceiver/receiver_test.go +++ b/receiver/awscontainerinsightreceiver/receiver_test.go @@ -25,10 +25,18 @@ func (c *mockCadvisor) GetMetrics() []pmetric.Metrics { return []pmetric.Metrics{md} } +func (c *mockCadvisor) Shutdown() error { + return nil +} + // Mock k8sapiserver type mockK8sAPIServer struct { } +func (m *mockK8sAPIServer) Shutdown() error { + return nil +} + func (m *mockK8sAPIServer) GetMetrics() []pmetric.Metrics { md := pmetric.NewMetrics() return []pmetric.Metrics{md}