Skip to content

Commit

Permalink
removes log lines, fixes unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dchappa committed Sep 9, 2024
1 parent c87246b commit dc69644
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 106 deletions.
13 changes: 2 additions & 11 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e

for _, groupedMetric := range groupedMetrics {
putLogEvent, err := translateGroupedMetricToEmf(groupedMetric, emf.config, defaultLogStream)
emf.config.logger.Info("putLogEvent", zap.Any("putLogEvent", putLogEvent))
if err != nil {
emf.config.logger.Error("error with translating grouped metric to emf")
if errors.Is(err, errMissingMetricsForEnhancedContainerInsights) {
emf.config.logger.Debug("Dropping empty putLogEvents for enhanced container insights", zap.Error(err))
continue
Expand All @@ -154,33 +152,27 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
if emfPusher != nil {
returnError := emfPusher.AddLogEntry(putLogEvent)
if returnError != nil {
emf.config.logger.Error("could not get the emf pusher", zap.Error(returnError))
return wrapErrorIfBadRequest(returnError)
}
emf.config.logger.Info("did not crash when adding the log entry!")
}
}
}

if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
for num, emfPusher := range emf.listPushers() {
emf.config.logger.Info("force flushing emfpusher #" + string(rune(num)))
emf.config.logger.Info("entity is ")
for _, emfPusher := range emf.listPushers() {
returnError := emfPusher.ForceFlush()
if returnError != nil {
emf.config.logger.Info("error with force flushing")
// TODO now we only have one logPusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(returnError)
if err != nil {
emf.config.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err))
}
return err
}
emf.config.logger.Info("succeeded in force flushing")
}
}

emf.config.logger.Info("Finish processing resource metrics", zap.Any("labels", labels))
emf.config.logger.Debug("Finish processing resource metrics", zap.Any("labels", labels))

return nil
}
Expand All @@ -191,7 +183,6 @@ func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
if _, ok = emf.pusherMap[hash]; !ok {
emf.pusherMap[hash] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
}
emf.config.logger.Info("The hash is " + hash)
return emf.pusherMap[hash]
}

Expand Down
94 changes: 42 additions & 52 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package awsemfexporter
import (
"context"
"errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/entity"
"os"
"testing"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -34,18 +35,6 @@ func init() {
os.Setenv("AWS_SECRET_ACCESS_KEY", "test")
}

var entity = cloudwatchlogs.Entity{
Attributes: map[string]*string{
"PlatformType": aws.String("AWS::EC2"),
"InstanceId": aws.String("i-123456789"),
"AutoScalingGroup": aws.String("test-group"),
},
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
}

type mockPusher struct {
mock.Mock
}
Expand Down Expand Up @@ -209,17 +198,12 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
streamKey := &cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
Entity: &cloudwatchlogs.Entity{Attributes: map[string]*string{
"PlatformType": nil,
"InstanceId": nil,
"AutoScalingGroup": nil,
}, KeyAttributes: map[string]*string{
"Name": nil,
"Environment": nil,
Entity: &cloudwatchlogs.Entity{KeyAttributes: map[string]*string{
entity.EntityType: aws.String(entity.Service),
}},
}
streamKeyHash := streamKey.Hash()
pusherMap, ok := exp.pusherMap[streamKeyHash]
expectedStreamKeyHash := streamKey.Hash()
pusherMap, ok := exp.pusherMap[expectedStreamKeyHash]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -241,23 +225,27 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
entity.KeyAttributeEntityServiceName: "myService",
entity.KeyAttributeEntityDeploymentEnvironment: "myEnvironment",
entity.AttributeEntityCluster: "test-cluster-name",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
streamKey := &cwlogs.StreamKey{
LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
LogStreamName: "test-task-id",
Entity: &cloudwatchlogs.Entity{Attributes: map[string]*string{
"PlatformType": nil,
"InstanceId": nil,
"AutoScalingGroup": nil,
}, KeyAttributes: map[string]*string{
"Name": nil,
"Environment": nil,
}},
Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
KeyAttributes: map[string]*string{
"Type": aws.String(entity.Service),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
}},
}
pusherMap, ok := exp.pusherMap[streamKey.Hash()]
assert.True(t, ok)
Expand All @@ -277,12 +265,8 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, exp)
var entity = &cloudwatchlogs.Entity{
Attributes: map[string]*string{
"PlatformType": aws.String("AWS::EC2"),
"AutoScalingGroup": aws.String("test-group"),
"InstanceId": aws.String("i-123456789"),
},
KeyAttributes: map[string]*string{
"Type": aws.String(entity.Service),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Expand All @@ -292,10 +276,10 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
entity.KeyAttributeEntityServiceName: "myService",
entity.KeyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
Expand Down Expand Up @@ -327,23 +311,24 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
entity.KeyAttributeEntityServiceName: "myService",
entity.KeyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
Entity: &cloudwatchlogs.Entity{Attributes: map[string]*string{
"PlatformType": nil,
"InstanceId": nil,
"AutoScalingGroup": nil,
}, KeyAttributes: map[string]*string{
"Name": nil,
"Environment": nil,
}},
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String(entity.Service),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
},
}
pusherMap, ok := exp.pusherMap[streamKey.Hash()]
assert.True(t, ok)
Expand Down Expand Up @@ -373,6 +358,11 @@ func TestPushMetricsDataWithErr(t *testing.T) {
streamKey := cwlogs.StreamKey{
LogGroupName: "test-logGroupName",
LogStreamName: "test-logStreamName",
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String(entity.Service),
},
},
}
exp.pusherMap[streamKey.Hash()] = logPusher

Expand Down
6 changes: 0 additions & 6 deletions exporter/awsemfexporter/grouped_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func addToGroupedMetric(
) error {

dps := getDataPoints(pmd, metadata, config.logger)
config.logger.Info("got datapoints")
if dps == nil || dps.Len() == 0 {
return nil
}
Expand All @@ -54,14 +53,12 @@ func addToGroupedMetric(
continue
}
dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators)
config.logger.Info("Calculated Delta Datapoints")
if !retained {
continue
}
config.logger.Info("Retained")

for _, dp := range dps {
config.logger.Info("Traversing through datapoints")
labels := dp.labels

if metricType, ok := labels["Type"]; ok {
Expand All @@ -73,15 +70,13 @@ func addToGroupedMetric(
// if patterns were found in config file and weren't replaced by resource attributes, replace those patterns with metric labels.
// if patterns are provided for a valid key and that key doesn't exist in the resource attributes, it is replaced with `undefined`.
if !patternReplaceSucceeded {
config.logger.Info("patternReplaceSucceeded is false")
if strings.Contains(metadata.logGroup, "undefined") {
metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger)
}
if strings.Contains(metadata.logStream, "undefined") {
metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger)
}
}
config.logger.Info("replaced the patterns for lg and ls")

metric := &metricInfo{
value: dp.value,
Expand All @@ -96,7 +91,6 @@ func addToGroupedMetric(

// Extra params to use when grouping metrics
groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
//config.logger.Info("entity in metadata key:" + groupKey.MetricMetadata.(cWMetricMetadata).entity.GoString())
if _, ok := groupedMetrics[groupKey]; ok {
// if MetricName already exists in metrics map, print warning log
if _, ok := groupedMetrics[groupKey].metrics[dp.name]; ok {
Expand Down
26 changes: 6 additions & 20 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
"go.opentelemetry.io/collector/pdata/pcommon"
"reflect"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)

Expand Down Expand Up @@ -172,9 +172,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
}
}

config.logger.Info("resourceAttributes", zap.Any("resourceAttributes", resourceAttributes.AsRaw()))
entity, resourceAttributes := fetchEntityFields(resourceAttributes)
config.logger.Info("fetched entity:" + entity.GoString())

for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
Expand All @@ -198,10 +196,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
instrumentationScopeName: instrumentationScopeName,
receiver: metricReceiver,
}
config.logger.Info("entity after metadata creation:" + metadata.groupedMetricMetadata.entity.GoString())
err := addToGroupedMetric(metric, groupedMetrics, metadata, patternReplaceSucceeded, mt.metricDescriptor, config, mt.calculators)
if err != nil {
config.logger.Info("error with adding to grouped metric:" + err.Error())
return err
}
}
Expand All @@ -210,6 +206,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
}

func fetchEntityFields(resourceAttributes pcommon.Map) (cloudwatchlogs.Entity, pcommon.Map) {
//the original resourceAttributes map is immutable, so we need to create a mutable copy
//to remove the entity fields from the attributes
mutableResourceAttributes := pcommon.NewMap()
resourceAttributes.CopyTo(mutableResourceAttributes)
serviceKeyAttr := map[string]*string{
Expand Down Expand Up @@ -440,7 +438,7 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) (*cwlogs.Event,
var f any
err := json.Unmarshal([]byte(val), &f)
if err != nil {
config.logger.Info(
config.logger.Debug(
"Failed to parse json-encoded string",
zap.String("label key", key),
zap.String("label value", val),
Expand Down Expand Up @@ -555,7 +553,6 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, config)
event, err := translateCWMetricToEMF(cWMetric, config)
if err != nil {
config.logger.Info("error with translating CW Metric to EMF", zap.Error(err))
return nil, err
}
// Drop a nil putLogEvent for EnhancedContainerInsights
Expand All @@ -577,14 +574,3 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d

return event, nil
}

func processAttributes(entityMap map[string]string, targetMap map[string]*string, mutableResourceAttributes pcommon.Map) {
for entityField, shortName := range entityMap {
if val, ok := mutableResourceAttributes.Get(entityField); ok {
if strVal := val.Str(); strVal != "" {
targetMap[shortName] = aws.String(strVal)
}
mutableResourceAttributes.Remove(entityField)
}
}
}
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package awsemfexporter
import (
"errors"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/entity"
"math"
"reflect"
"sort"
Expand Down Expand Up @@ -39,7 +40,7 @@ func createTestResourceMetricsHelper(numMetrics int) pmetric.ResourceMetrics {
rm.Resource().Attributes().PutStr("ClusterName", "myCluster")
rm.Resource().Attributes().PutStr("PodName", "myPod")
rm.Resource().Attributes().PutStr(attributeReceiver, prometheusReceiver)
rm.Resource().Attributes().PutStr(keyAttributeEntityServiceName, "myServiceName")
rm.Resource().Attributes().PutStr(entity.KeyAttributeEntityServiceName, "myServiceName")
sm := rm.ScopeMetrics().AppendEmpty()

m := sm.Metrics().AppendEmpty()
Expand Down
12 changes: 12 additions & 0 deletions exporter/awsemfexporter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -172,3 +173,14 @@ func attrMaptoStringMap(attrMap pcommon.Map) map[string]string {
})
return strMap
}

func processAttributes(entityMap map[string]string, targetMap map[string]*string, mutableResourceAttributes pcommon.Map) {
for entityField, shortName := range entityMap {
if val, ok := mutableResourceAttributes.Get(entityField); ok {
if strVal := val.Str(); strVal != "" {
targetMap[shortName] = aws.String(strVal)
}
mutableResourceAttributes.Remove(entityField)
}
}
}
Loading

0 comments on commit dc69644

Please sign in to comment.