diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 964704164877..68d55e2ad62a 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -132,9 +132,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e for _, groupedMetric := range groupedMetrics { putLogEvent, err := translateGroupedMetricToEmf(groupedMetric, emf.config, defaultLogStream) - emf.config.logger.Info("putLogEvent", zap.Any("putLogEvent", putLogEvent)) if err != nil { - emf.config.logger.Error("error with translating grouped metric to emf") if errors.Is(err, errMissingMetricsForEnhancedContainerInsights) { emf.config.logger.Debug("Dropping empty putLogEvents for enhanced container insights", zap.Error(err)) continue @@ -154,21 +152,16 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e if emfPusher != nil { returnError := emfPusher.AddLogEntry(putLogEvent) if returnError != nil { - emf.config.logger.Error("could not get the emf pusher", zap.Error(returnError)) return wrapErrorIfBadRequest(returnError) } - emf.config.logger.Info("did not crash when adding the log entry!") } } } if strings.EqualFold(outputDestination, outputDestinationCloudWatch) { - for num, emfPusher := range emf.listPushers() { - emf.config.logger.Info("force flushing emfpusher #" + string(rune(num))) - emf.config.logger.Info("entity is ") + for _, emfPusher := range emf.listPushers() { returnError := emfPusher.ForceFlush() if returnError != nil { - emf.config.logger.Info("error with force flushing") // TODO now we only have one logPusher, so it's ok to return after first error occurred err := wrapErrorIfBadRequest(returnError) if err != nil { @@ -176,11 +169,10 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e } return err } - emf.config.logger.Info("succeeded in force flushing") } } - emf.config.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Debug("Finish processing resource metrics", zap.Any("labels", labels)) return nil } @@ -191,7 +183,6 @@ func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher { if _, ok = emf.pusherMap[hash]; !ok { emf.pusherMap[hash] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) } - emf.config.logger.Info("The hash is " + hash) return emf.pusherMap[hash] } diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index d3584b169c9b..02984e817f3d 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -6,13 +6,14 @@ package awsemfexporter import ( "context" "errors" - "github.com/aws/aws-sdk-go/aws" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/entity" "os" "testing" "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -34,18 +35,6 @@ func init() { os.Setenv("AWS_SECRET_ACCESS_KEY", "test") } -var entity = cloudwatchlogs.Entity{ - Attributes: map[string]*string{ - "PlatformType": aws.String("AWS::EC2"), - "InstanceId": aws.String("i-123456789"), - "AutoScalingGroup": aws.String("test-group"), - }, - KeyAttributes: map[string]*string{ - "Name": aws.String("myService"), - "Environment": aws.String("myEnvironment"), - }, -} - type mockPusher struct { mock.Mock } @@ -209,17 +198,12 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { streamKey := &cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, - Entity: &cloudwatchlogs.Entity{Attributes: map[string]*string{ - "PlatformType": nil, - "InstanceId": nil, - "AutoScalingGroup": nil, - }, KeyAttributes: map[string]*string{ - "Name": nil, - "Environment": nil, + Entity: &cloudwatchlogs.Entity{KeyAttributes: map[string]*string{ + entity.EntityType: aws.String(entity.Service), }}, } - streamKeyHash := streamKey.Hash() - pusherMap, ok := exp.pusherMap[streamKeyHash] + expectedStreamKeyHash := streamKey.Hash() + pusherMap, ok := exp.pusherMap[expectedStreamKeyHash] assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -241,8 +225,11 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, resourceAttributeMap: map[string]any{ - "aws.ecs.cluster.name": "test-cluster-name", - "aws.ecs.task.id": "test-task-id", + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + entity.KeyAttributeEntityServiceName: "myService", + entity.KeyAttributeEntityDeploymentEnvironment: "myEnvironment", + entity.AttributeEntityCluster: "test-cluster-name", }, }) require.Error(t, exp.pushMetricsData(ctx, md)) @@ -250,14 +237,15 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { streamKey := &cwlogs.StreamKey{ LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance", LogStreamName: "test-task-id", - Entity: &cloudwatchlogs.Entity{Attributes: map[string]*string{ - "PlatformType": nil, - "InstanceId": nil, - "AutoScalingGroup": nil, - }, KeyAttributes: map[string]*string{ - "Name": nil, - "Environment": nil, - }}, + Entity: &cloudwatchlogs.Entity{ + Attributes: map[string]*string{ + "Cluster": aws.String("test-cluster-name"), + }, + KeyAttributes: map[string]*string{ + "Type": aws.String(entity.Service), + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + }}, } pusherMap, ok := exp.pusherMap[streamKey.Hash()] assert.True(t, ok) @@ -277,12 +265,8 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, exp) var entity = &cloudwatchlogs.Entity{ - Attributes: map[string]*string{ - "PlatformType": aws.String("AWS::EC2"), - "AutoScalingGroup": aws.String("test-group"), - "InstanceId": aws.String("i-123456789"), - }, KeyAttributes: map[string]*string{ + "Type": aws.String(entity.Service), "Name": aws.String("myService"), "Environment": aws.String("myEnvironment"), }, @@ -292,10 +276,10 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, resourceAttributeMap: map[string]any{ - "aws.ecs.cluster.name": "test-cluster-name", - "aws.ecs.task.id": "test-task-id", - keyAttributeEntityServiceName: "myService", - keyAttributeEntityDeploymentEnvironment: "myEnvironment", + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + entity.KeyAttributeEntityServiceName: "myService", + entity.KeyAttributeEntityDeploymentEnvironment: "myEnvironment", }, }) require.Error(t, exp.pushMetricsData(ctx, md)) @@ -327,8 +311,10 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, resourceAttributeMap: map[string]any{ - "aws.ecs.cluster.name": "test-cluster-name", - "aws.ecs.task.id": "test-task-id", + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + entity.KeyAttributeEntityServiceName: "myService", + entity.KeyAttributeEntityDeploymentEnvironment: "myEnvironment", }, }) require.Error(t, exp.pushMetricsData(ctx, md)) @@ -336,14 +322,13 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { streamKey := cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, - Entity: &cloudwatchlogs.Entity{Attributes: map[string]*string{ - "PlatformType": nil, - "InstanceId": nil, - "AutoScalingGroup": nil, - }, KeyAttributes: map[string]*string{ - "Name": nil, - "Environment": nil, - }}, + Entity: &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{ + "Type": aws.String(entity.Service), + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + }, + }, } pusherMap, ok := exp.pusherMap[streamKey.Hash()] assert.True(t, ok) @@ -373,6 +358,11 @@ func TestPushMetricsDataWithErr(t *testing.T) { streamKey := cwlogs.StreamKey{ LogGroupName: "test-logGroupName", LogStreamName: "test-logStreamName", + Entity: &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{ + "Type": aws.String(entity.Service), + }, + }, } exp.pusherMap[streamKey.Hash()] = logPusher diff --git a/exporter/awsemfexporter/grouped_metric.go b/exporter/awsemfexporter/grouped_metric.go index 37cfa94a311e..c95186f88129 100644 --- a/exporter/awsemfexporter/grouped_metric.go +++ b/exporter/awsemfexporter/grouped_metric.go @@ -38,7 +38,6 @@ func addToGroupedMetric( ) error { dps := getDataPoints(pmd, metadata, config.logger) - config.logger.Info("got datapoints") if dps == nil || dps.Len() == 0 { return nil } @@ -54,14 +53,12 @@ func addToGroupedMetric( continue } dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators) - config.logger.Info("Calculated Delta Datapoints") if !retained { continue } config.logger.Info("Retained") for _, dp := range dps { - config.logger.Info("Traversing through datapoints") labels := dp.labels if metricType, ok := labels["Type"]; ok { @@ -73,7 +70,6 @@ func addToGroupedMetric( // if patterns were found in config file and weren't replaced by resource attributes, replace those patterns with metric labels. // if patterns are provided for a valid key and that key doesn't exist in the resource attributes, it is replaced with `undefined`. if !patternReplaceSucceeded { - config.logger.Info("patternReplaceSucceeded is false") if strings.Contains(metadata.logGroup, "undefined") { metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger) } @@ -81,7 +77,6 @@ func addToGroupedMetric( metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger) } } - config.logger.Info("replaced the patterns for lg and ls") metric := &metricInfo{ value: dp.value, @@ -96,7 +91,6 @@ func addToGroupedMetric( // Extra params to use when grouping metrics groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels) - //config.logger.Info("entity in metadata key:" + groupKey.MetricMetadata.(cWMetricMetadata).entity.GoString()) if _, ok := groupedMetrics[groupKey]; ok { // if MetricName already exists in metrics map, print warning log if _, ok := groupedMetrics[groupKey].metrics[dp.name]; ok { diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index d21bcb0b74cf..e54abb078258 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -7,18 +7,18 @@ import ( "encoding/json" "errors" "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" - "go.opentelemetry.io/collector/pdata/pcommon" "reflect" "strings" "time" + "github.com/aws/aws-sdk-go/aws" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" ) @@ -172,9 +172,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri } } - config.logger.Info("resourceAttributes", zap.Any("resourceAttributes", resourceAttributes.AsRaw())) entity, resourceAttributes := fetchEntityFields(resourceAttributes) - config.logger.Info("fetched entity:" + entity.GoString()) for j := 0; j < ilms.Len(); j++ { ilm := ilms.At(j) @@ -198,10 +196,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri instrumentationScopeName: instrumentationScopeName, receiver: metricReceiver, } - config.logger.Info("entity after metadata creation:" + metadata.groupedMetricMetadata.entity.GoString()) err := addToGroupedMetric(metric, groupedMetrics, metadata, patternReplaceSucceeded, mt.metricDescriptor, config, mt.calculators) if err != nil { - config.logger.Info("error with adding to grouped metric:" + err.Error()) return err } } @@ -210,6 +206,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri } func fetchEntityFields(resourceAttributes pcommon.Map) (cloudwatchlogs.Entity, pcommon.Map) { + //the original resourceAttributes map is immutable, so we need to create a mutable copy + //to remove the entity fields from the attributes mutableResourceAttributes := pcommon.NewMap() resourceAttributes.CopyTo(mutableResourceAttributes) serviceKeyAttr := map[string]*string{ @@ -440,7 +438,7 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) (*cwlogs.Event, var f any err := json.Unmarshal([]byte(val), &f) if err != nil { - config.logger.Info( + config.logger.Debug( "Failed to parse json-encoded string", zap.String("label key", key), zap.String("label value", val), @@ -555,7 +553,6 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d cWMetric := translateGroupedMetricToCWMetric(groupedMetric, config) event, err := translateCWMetricToEMF(cWMetric, config) if err != nil { - config.logger.Info("error with translating CW Metric to EMF", zap.Error(err)) return nil, err } // Drop a nil putLogEvent for EnhancedContainerInsights @@ -577,14 +574,3 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d return event, nil } - -func processAttributes(entityMap map[string]string, targetMap map[string]*string, mutableResourceAttributes pcommon.Map) { - for entityField, shortName := range entityMap { - if val, ok := mutableResourceAttributes.Get(entityField); ok { - if strVal := val.Str(); strVal != "" { - targetMap[shortName] = aws.String(strVal) - } - mutableResourceAttributes.Remove(entityField) - } - } -} diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 60479c88a769..867fa727ad03 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -6,6 +6,7 @@ package awsemfexporter import ( "errors" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/entity" "math" "reflect" "sort" @@ -39,7 +40,7 @@ func createTestResourceMetricsHelper(numMetrics int) pmetric.ResourceMetrics { rm.Resource().Attributes().PutStr("ClusterName", "myCluster") rm.Resource().Attributes().PutStr("PodName", "myPod") rm.Resource().Attributes().PutStr(attributeReceiver, prometheusReceiver) - rm.Resource().Attributes().PutStr(keyAttributeEntityServiceName, "myServiceName") + rm.Resource().Attributes().PutStr(entity.KeyAttributeEntityServiceName, "myServiceName") sm := rm.ScopeMetrics().AppendEmpty() m := sm.Metrics().AppendEmpty() diff --git a/exporter/awsemfexporter/util.go b/exporter/awsemfexporter/util.go index a820fd4d1ed6..1fadc1ea493e 100644 --- a/exporter/awsemfexporter/util.go +++ b/exporter/awsemfexporter/util.go @@ -5,6 +5,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "fmt" + "github.com/aws/aws-sdk-go/aws" "sort" "strings" "time" @@ -172,3 +173,14 @@ func attrMaptoStringMap(attrMap pcommon.Map) map[string]string { }) return strMap } + +func processAttributes(entityMap map[string]string, targetMap map[string]*string, mutableResourceAttributes pcommon.Map) { + for entityField, shortName := range entityMap { + if val, ok := mutableResourceAttributes.Get(entityField); ok { + if strVal := val.Str(); strVal != "" { + targetMap[shortName] = aws.String(strVal) + } + mutableResourceAttributes.Remove(entityField) + } + } +} diff --git a/exporter/awsemfexporter/util_test.go b/exporter/awsemfexporter/util_test.go index dc5a3dc3735b..bfc70aca783b 100644 --- a/exporter/awsemfexporter/util_test.go +++ b/exporter/awsemfexporter/util_test.go @@ -366,3 +366,39 @@ func TestGetLogInfo(t *testing.T) { } } + +func TestProcessAttributes(t *testing.T) { + testCases := []struct { + name string + key string + value string + expected string + }{ + { + name: "Non-empty string value", + key: "testKey", + value: "testValue", + expected: "testValue", + }, + { + name: "Empty string value", + key: "emptyKey", + value: "", + expected: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + attrs := pcommon.NewMap() + attrs.PutStr(tc.key, tc.value) + + processAttributes(make(map[string]string), make(map[string]*string), attrs) + + val, ok := attrs.Get(tc.key) + assert.True(t, ok, "Key should exist in the map") + actualVal := val.Str() + assert.Equal(t, tc.expected, actualVal, "Value should match the expected value") + }) + } +} diff --git a/internal/aws/cwlogs/pusher.go b/internal/aws/cwlogs/pusher.go index ea2da8f02f31..039ee6dd68a4 100644 --- a/internal/aws/cwlogs/pusher.go +++ b/internal/aws/cwlogs/pusher.go @@ -136,12 +136,7 @@ type eventBatch struct { } // Create a new log event batch if needed. -func newEventBatch(key StreamKey, logger *zap.Logger) *eventBatch { - logger.Info("in newEventBatch function") - logger.Info("logGroupName: " + key.LogGroupName) - logger.Info("logStreamName: " + key.LogStreamName) - logger.Info("p.entity type: " + fmt.Sprintf("%T", key.Entity)) - logger.Info("entity: " + key.Entity.GoString()) +func newEventBatch(key StreamKey) *eventBatch { return &eventBatch{ putLogEventsInput: &cloudwatchlogs.PutLogEventsInput{ @@ -252,7 +247,7 @@ func newLogPusher(streamKey StreamKey, svcStructuredLog: svcStructuredLog, logger: logger, } - pusher.logEventBatch = newEventBatch(streamKey, logger) + pusher.logEventBatch = newEventBatch(streamKey) return pusher } @@ -279,9 +274,7 @@ func (p *logPusher) AddLogEntry(logEvent *Event) error { } func (p *logPusher) ForceFlush() error { - p.logger.Info("in force flush method for the log pusher") prevBatch := p.renewEventBatch() - p.logger.Info("renewedEventBatch. The entity here is " + prevBatch.putLogEventsInput.Entity.GoString()) if prevBatch != nil { return p.pushEventBatch(prevBatch) } @@ -328,7 +321,7 @@ func (p *logPusher) addLogEvent(logEvent *Event) *eventBatch { LogGroupName: *p.logGroupName, LogStreamName: *p.logStreamName, Entity: p.entity, - }, p.logger) + }) } currentBatch.append(logEvent) p.logEventBatch = currentBatch @@ -339,17 +332,13 @@ func (p *logPusher) addLogEvent(logEvent *Event) *eventBatch { func (p *logPusher) renewEventBatch() *eventBatch { var prevBatch *eventBatch - p.logger.Info("renewing EventBatch, just before the if statement") if len(p.logEventBatch.putLogEventsInput.LogEvents) > 0 { - p.logger.Info("renewing EventBatch. The entity here is " + p.entity.GoString()) - p.logger.Info("p.entity type: " + fmt.Sprintf("%T", p.entity)) prevBatch = p.logEventBatch p.logEventBatch = newEventBatch(StreamKey{ LogGroupName: *p.logGroupName, LogStreamName: *p.logStreamName, Entity: p.entity, - }, p.logger) - p.logger.Info("renewed EventBatch. The entity here is " + p.logEventBatch.putLogEventsInput.Entity.GoString()) + }) } return prevBatch diff --git a/internal/aws/cwlogs/pusher_test.go b/internal/aws/cwlogs/pusher_test.go index df32a762142b..4aaab00dbef4 100644 --- a/internal/aws/cwlogs/pusher_test.go +++ b/internal/aws/cwlogs/pusher_test.go @@ -132,7 +132,7 @@ func TestPusher_newLogEventBatch(t *testing.T) { LogGroupName: logGroup, LogStreamName: logStreamName, Entity: &entity, - }, zap.NewExample()) + }) assert.Equal(t, int64(0), logEventBatch.maxTimestampMs) assert.Equal(t, int64(0), logEventBatch.minTimestampMs) assert.Equal(t, 0, logEventBatch.byteTotal)