From 132b116d2f5e63574861d91f8be2ea5d5746779f Mon Sep 17 00:00:00 2001 From: Khanh Nguyen <91758108+khanhntd@users.noreply.github.com> Date: Tue, 25 Apr 2023 12:36:32 -0400 Subject: [PATCH] Add support AWS Embedded Metric Format Version 0 (#20314) Adding support for EMF version v0 and a flag for customers to make a difference between Version 0 and Version 1 (e.g EMF v0) --- .chloggen/add-support-emf-version-0.yaml | 16 +++ exporter/awsemfexporter/README.md | 7 +- exporter/awsemfexporter/config.go | 12 +- exporter/awsemfexporter/config_test.go | 3 + exporter/awsemfexporter/emf_exporter.go | 76 ++++------- exporter/awsemfexporter/emf_exporter_test.go | 27 ++-- exporter/awsemfexporter/factory.go | 26 +++- exporter/awsemfexporter/metric_declaration.go | 5 +- exporter/awsemfexporter/metric_translator.go | 50 +++++++- .../awsemfexporter/metric_translator_test.go | 120 ++++++++++-------- exporter/awsemfexporter/testdata/config.yaml | 3 + .../testdata/testTranslateCWMetricToEMF.json | 1 - 12 files changed, 205 insertions(+), 141 deletions(-) create mode 100644 .chloggen/add-support-emf-version-0.yaml delete mode 100644 exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json diff --git a/.chloggen/add-support-emf-version-0.yaml b/.chloggen/add-support-emf-version-0.yaml new file mode 100644 index 000000000000..242b8c50ee3c --- /dev/null +++ b/.chloggen/add-support-emf-version-0.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsemfexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support AWS Embedded Metric Format Version 0 + +# One or more tracking issues related to the change +issues: [20314] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index 3bbf632f8e00..02e9e4d77db7 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -32,12 +32,13 @@ The following exporter configuration parameters are supported. | `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 | | `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` | "ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup) | | `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` | -| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` | -| `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` | +| `output_destination` | Specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` | +| `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` | +| `version` | Send metrics to CloudWatchLogs with Embedded Metric Format in selected version [(e.g version 1 with _aws)](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure), version 0 without _aws) | `1` | | `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] | | [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] | | [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ] | -| `retain_initial_value_of_delta_metric` | This option specifies how the first value of a metric is handled. AWS EMF expects metric values to only contain deltas to the previous value. In the default case the first received value is therefor not sent to AWS but only used as a baseline for follow up changes to this metric. This is fine for high throughput metrics with stable labels (e.g. `requests{code=200}`). In this case it does not matter if the first value of this metric is discarded. However when your metric describes infrequent events or events with high label cardinality, then the exporter in default configuration would still drop the first occurrence of this metric. With this configuration value set to `true` the first value of all metrics will instead be send to AWS. | false | +| `retain_initial_value_of_delta_metric` | Specify how the first value of a metric is handled. AWS EMF expects metric values to only contain deltas to the previous value. In the default case the first received value is therefor not sent to AWS but only used as a baseline for follow up changes to this metric. This is fine for high throughput metrics with stable labels (e.g. `requests{code=200}`). In this case it does not matter if the first value of this metric is discarded. However when your metric describes infrequent events or events with high label cardinality, then the exporter in default configuration would still drop the first occurrence of this metric. With this configuration value set to `true` the first value of all metrics will instead be send to AWS. | false | ### metric_declaration A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' labels and metric names. diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go index 24ff3b8ebc1f..be1ac32bb50f 100644 --- a/exporter/awsemfexporter/config.go +++ b/exporter/awsemfexporter/config.go @@ -17,6 +17,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "errors" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" @@ -79,15 +80,20 @@ type Config struct { // Note that at the moment in order to use this feature the value "kubernetes" must also be added to the ParseJSONEncodedAttributeValues array in order to be used EKSFargateContainerInsightsEnabled bool `mapstructure:"eks_fargate_container_insights_enabled"` - // ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes. + // ResourceToTelemetrySettings is an option for converting resource attrihutes to telemetry attributes. // "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"` - // DetailedMetrics is the options for retaining detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, + // DetailedMetrics is an option for retaining detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, // preserve the quantile's population) DetailedMetrics bool `mapstructure:"detailed_metrics"` + // Version is an option for sending metrics to CloudWatchLogs with Embedded Metric Format in selected version (with "_aws") + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure + // Otherwise, sending metrics as Embedded Metric Format version 0 (without "_aws") + Version string `mapstructure:"version"` + // logger is the Logger used for writing error/warning logs logger *zap.Logger } @@ -102,6 +108,8 @@ type MetricDescriptor struct { Overwrite bool `mapstructure:"overwrite"` } +var _ component.Config = (*Config)(nil) + // Validate filters out invalid metricDeclarations and metricDescriptors func (config *Config) Validate() error { var validDeclarations []*MetricDeclaration diff --git a/exporter/awsemfexporter/config_test.go b/exporter/awsemfexporter/config_test.go index dc8475c9b8cc..3ddccdc88e39 100644 --- a/exporter/awsemfexporter/config_test.go +++ b/exporter/awsemfexporter/config_test.go @@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) { LogStreamName: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", OutputDestination: "cloudwatch", + Version: "1", logger: zap.NewNop(), }, }, @@ -79,6 +80,7 @@ func TestLoadConfig(t *testing.T) { LogStreamName: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", OutputDestination: "cloudwatch", + Version: "1", ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true}, logger: zap.NewNop(), }, @@ -100,6 +102,7 @@ func TestLoadConfig(t *testing.T) { LogStreamName: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", OutputDestination: "cloudwatch", + Version: "1", MetricDescriptors: []MetricDescriptor{{ MetricName: "memcached_current_items", Unit: "Count", diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index aeaa03ad32bb..0ecb5079438d 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -23,18 +23,14 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) const ( @@ -46,8 +42,7 @@ const ( type emfExporter struct { pusherMap map[cwlogs.PusherKey]cwlogs.Pusher svcStructuredLog *cwlogs.Client - config component.Config - logger *zap.Logger + config *Config metricTranslator metricTranslator @@ -56,66 +51,40 @@ type emfExporter struct { collectorID string } -// newEmfPusher func creates an EMF Exporter instance with data push callback func -func newEmfPusher( - config component.Config, - params exporter.CreateSettings, -) (*emfExporter, error) { +// newEmfExporter creates a new exporter using exporterhelper +func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, error) { if config == nil { return nil, errors.New("emf exporter config is nil") } - logger := params.Logger - expConfig := config.(*Config) - expConfig.logger = logger + config.logger = set.Logger // create AWS session - awsConfig, session, err := awsutil.GetAWSConfigSession(logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings) + awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings) if err != nil { return nil, err } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session) - collectorIdentifier, _ := uuid.NewRandom() + svcStructuredLog := cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, session) + collectorIdentifier, err := uuid.NewRandom() + + if err != nil { + return nil, err + } emfExporter := &emfExporter{ svcStructuredLog: svcStructuredLog, config: config, - metricTranslator: newMetricTranslator(*expConfig), + metricTranslator: newMetricTranslator(*config), retryCnt: *awsConfig.MaxRetries, - logger: logger, collectorID: collectorIdentifier.String(), + pusherMap: map[cwlogs.PusherKey]cwlogs.Pusher{}, } - emfExporter.pusherMap = map[cwlogs.PusherKey]cwlogs.Pusher{} return emfExporter, nil } -// newEmfExporter creates a new exporter using exporterhelper -func newEmfExporter( - config component.Config, - set exporter.CreateSettings, -) (exporter.Metrics, error) { - emfPusher, err := newEmfPusher(config, set) - if err != nil { - return nil, err - } - - exporter, err := exporterhelper.NewMetricsExporter( - context.TODO(), - set, - config, - emfPusher.pushMetricsData, - exporterhelper.WithShutdown(emfPusher.shutdown), - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - ) - if err != nil { - return nil, err - } - return resourcetotelemetry.WrapMetricsExporter(config.(*Config).ResourceToTelemetrySettings, exporter), nil -} - func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error { rms := md.ResourceMetrics() labels := map[string]string{} @@ -129,23 +98,22 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e }) } } - emf.logger.Info("Start processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Info("Start processing resource metrics", zap.Any("labels", labels)) groupedMetrics := make(map[interface{}]*groupedMetric) - expConfig := emf.config.(*Config) defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID) - outputDestination := expConfig.OutputDestination + outputDestination := emf.config.OutputDestination for i := 0; i < rms.Len(); i++ { - err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, expConfig) + err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, emf.config) if err != nil { return err } } for _, groupedMetric := range groupedMetrics { - cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig) - putLogEvent := translateCWMetricToEMF(cWMetric, expConfig) + cWMetric := translateGroupedMetricToCWMetric(groupedMetric, emf.config) + putLogEvent := translateCWMetricToEMF(cWMetric, emf.config) // Currently we only support two options for "OutputDestination". if strings.EqualFold(outputDestination, outputDestinationStdout) { fmt.Println(*putLogEvent.InputLogEvent.Message) @@ -176,14 +144,14 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e // TODO now we only have one logPusher, so it's ok to return after first error occurred err := wrapErrorIfBadRequest(returnError) if err != nil { - emf.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err)) + emf.config.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err)) } return err } } } - emf.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) return nil } @@ -192,7 +160,7 @@ func (emf *emfExporter) getPusher(key cwlogs.PusherKey) cwlogs.Pusher { var ok bool if _, ok = emf.pusherMap[key]; !ok { - emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.logger) + emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) } return emf.pusherMap[key] } @@ -215,7 +183,7 @@ func (emf *emfExporter) shutdown(ctx context.Context) error { if returnError != nil { err := wrapErrorIfBadRequest(returnError) if err != nil { - emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next logPusher.", zap.Error(err)) + emf.config.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next logPusher.", zap.Error(err)) } } } diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 2674ffade881..8759d44b8eec 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -75,7 +75,7 @@ func TestConsumeMetrics(t *testing.T) { expCfg := factory.CreateDefaultConfig().(*Config) expCfg.Region = "us-west-2" expCfg.MaxRetries = 0 - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -137,7 +137,7 @@ func TestConsumeMetricsWithOutputDestination(t *testing.T) { expCfg.Region = "us-west-2" expCfg.MaxRetries = 0 expCfg.OutputDestination = "stdout" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -198,7 +198,7 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "test-logStreamName" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -268,7 +268,7 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "/aws/ecs/containerinsights/{ClusterName}/performance" expCfg.LogStreamName = "{TaskId}" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -338,7 +338,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "{TaskId}" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -408,7 +408,7 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "{WrongKey}" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -478,7 +478,7 @@ func TestPushMetricsDataWithErr(t *testing.T) { expCfg.MaxRetries = 0 expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "test-logStreamName" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -550,7 +550,7 @@ func TestNewExporterWithoutConfig(t *testing.T) { settings := exportertest.NewNopCreateSettings() t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake") - exp, err := newEmfPusher(expCfg, settings) + exp, err := newEmfExporter(expCfg, settings) assert.NotNil(t, err) assert.Nil(t, exp) assert.Equal(t, settings.Logger, expCfg.logger) @@ -587,17 +587,16 @@ func TestNewExporterWithMetricDeclarations(t *testing.T) { params := exportertest.NewNopCreateSettings() params.Logger = zap.New(obs) - exp, err := newEmfPusher(expCfg, params) + exp, err := newEmfExporter(expCfg, params) assert.Nil(t, err) assert.NotNil(t, exp) err = expCfg.Validate() assert.Nil(t, err) - config := exp.config.(*Config) // Invalid metric declaration should be filtered out - assert.Equal(t, 3, len(config.MetricDeclarations)) + assert.Equal(t, 3, len(exp.config.MetricDeclarations)) // Invalid dimensions (> 10 dims) should be filtered out - assert.Equal(t, 1, len(config.MetricDeclarations[2].Dimensions)) + assert.Equal(t, 1, len(exp.config.MetricDeclarations[2].Dimensions)) // Test output warning logs expectedLogs := []observer.LoggedEntry{ @@ -615,7 +614,7 @@ func TestNewExporterWithMetricDeclarations(t *testing.T) { } func TestNewExporterWithoutSession(t *testing.T) { - exp, err := newEmfPusher(nil, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(nil, exportertest.NewNopCreateSettings()) assert.NotNil(t, err) assert.Nil(t, exp) } @@ -629,7 +628,7 @@ func TestWrapErrorIfBadRequest(t *testing.T) { assert.False(t, consumererror.IsPermanent(err)) } -// This test verifies that if func newEmfPusher() returns an error then newEmfExporter() +// This test verifies that if func newEmfExporter() returns an error then newEmfExporter() // will do so. func TestNewEmfExporterWithoutConfig(t *testing.T) { factory := NewFactory() diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index e73eab57e592..fbcfdd0f3b55 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -19,9 +19,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) const ( @@ -47,6 +49,7 @@ func createDefaultConfig() component.Config { LogStreamName: "", Namespace: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", + Version: "1", RetainInitialValueOfDeltaMetric: false, OutputDestination: "cloudwatch", logger: zap.NewNop(), @@ -54,11 +57,24 @@ func createDefaultConfig() component.Config { } // createMetricsExporter creates a metrics exporter based on this config. -func createMetricsExporter(_ context.Context, - params exporter.CreateSettings, - config component.Config) (exporter.Metrics, error) { - +func createMetricsExporter(ctx context.Context, params exporter.CreateSettings, config component.Config) (exporter.Metrics, error) { expCfg := config.(*Config) - return newEmfExporter(expCfg, params) + emfExp, err := newEmfExporter(expCfg, params) + if err != nil { + return nil, err + } + + exporter, err := exporterhelper.NewMetricsExporter( + ctx, + params, + config, + emfExp.pushMetricsData, + exporterhelper.WithShutdown(emfExp.shutdown), + ) + if err != nil { + return nil, err + } + + return resourcetotelemetry.WrapMetricsExporter(expCfg.ResourceToTelemetrySettings, exporter), nil } diff --git a/exporter/awsemfexporter/metric_declaration.go b/exporter/awsemfexporter/metric_declaration.go index af3343e48173..edc8fbd74452 100644 --- a/exporter/awsemfexporter/metric_declaration.go +++ b/exporter/awsemfexporter/metric_declaration.go @@ -186,8 +186,9 @@ func (lm *LabelMatcher) init() (err error) { if len(lm.Separator) == 0 { lm.Separator = ";" } - lm.compiledRegex = regexp.MustCompile(lm.Regex) - return + + lm.compiledRegex, err = regexp.Compile(lm.Regex) + return err } // Matches returns true if given set of labels matches the LabelMatcher's rules. diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index af705d607add..5d138fbb2dae 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -338,7 +338,6 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf // translateCWMetricToEMF converts CloudWatch Metric format to EMF. func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) *cwlogs.Event { // convert CWMetric into map format for compatible with PLE input - cWMetricMap := make(map[string]interface{}) fieldMap := cWMetric.fields // restore the json objects that are stored as string in attributes @@ -369,12 +368,51 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) *cwlogs.Event { } } - // Create `_aws` section only if there are measurements + // Create EMF metrics if there are measurements + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure if len(cWMetric.measurements) > 0 { - // Create `_aws` section only if there are measurements - cWMetricMap["CloudWatchMetrics"] = cWMetric.measurements - cWMetricMap["Timestamp"] = cWMetric.timestampMs - fieldMap["_aws"] = cWMetricMap + if config.Version == "1" { + /* EMF V1 + "Version": "1", + "_aws": { + "CloudWatchMetrics": [ + { + "Namespace": "ECS", + "Dimensions": [ ["ClusterName"] ], + "Metrics": [{"Name": "memcached_commands_total"}] + } + ], + "Timestamp": 1668387032641 + } + */ + fieldMap["Version"] = "1" + fieldMap["_aws"] = map[string]interface{}{ + "CloudWatchMetrics": cWMetric.measurements, + "Timestamp": cWMetric.timestampMs, + } + + } + } + + if config.Version == "0" { + fieldMap["Timestamp"] = fmt.Sprint(cWMetric.timestampMs) + if len(cWMetric.measurements) > 0 { + /* EMF V0 + { + "Version": "0", + "CloudWatchMetrics": [ + { + "Namespace": "ECS", + "Dimensions": [ ["ClusterName"] ], + "Metrics": [{"Name": "memcached_commands_total"}] + } + ], + "Timestamp": "1668387032641" + } + */ + fieldMap["Version"] = "0" + fieldMap["CloudWatchMetrics"] = cWMetric.measurements + } } pleMsg, err := json.Marshal(fieldMap) diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 2d47f1a0240b..8b8278868530 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -16,7 +16,6 @@ package awsemfexporter import ( "fmt" - "os" "reflect" "sort" "testing" @@ -39,15 +38,6 @@ import ( internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" ) -func readFromFile(filename string) string { - data, err := os.ReadFile(filename) - if err != nil { - panic(err) - } - str := string(data) - return str -} - func createMetricTestData() *agentmetricspb.ExportMetricsServiceRequest { request := &agentmetricspb.ExportMetricsServiceRequest{ Node: &commonpb.Node{ @@ -576,37 +566,77 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { } func TestTranslateCWMetricToEMF(t *testing.T) { - cwMeasurement := cWMeasurement{ - Namespace: "test-emf", - Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, - Metrics: []map[string]string{{ - "Name": "spanCounter", - "Unit": "Count", - }}, + testCases := map[string]struct { + emfVersion string + measurements []cWMeasurement + expectedEMFLogEvent string + }{ + "WithMeasurementAndEMFV1": { + emfVersion: "1", + measurements: []cWMeasurement{{ + Namespace: "test-emf", + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, + Metrics: []map[string]string{{ + "Name": "spanCounter", + "Unit": "Count", + }}, + }}, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Version\":\"1\",\"_aws\":{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"Timestamp\":1596151098037},\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithMeasurementAndEMFV0": { + emfVersion: "0", + measurements: []cWMeasurement{{ + Namespace: "test-emf", + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, + Metrics: []map[string]string{{ + "Name": "spanCounter", + "Unit": "Count", + }}, + }}, + expectedEMFLogEvent: "{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Timestamp\":\"1596151098037\",\"Version\":\"0\",\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithNoMeasurementAndEMFV1": { + emfVersion: "1", + measurements: nil, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithNoMeasurementAndEMFV0": { + emfVersion: "0", + measurements: nil, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Timestamp\":\"1596151098037\",\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, } - timestamp := int64(1596151098037) - fields := make(map[string]interface{}) - fields[oTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = 0 - // add stringified json as attribute values - fields["kubernetes"] = "{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}" - fields["Sources"] = "[\"cadvisor\",\"pod\",\"calculated\"]" - config := &Config{ - // include valid json string, a non-existing key, and keys whose value are not json/string - ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"}, - logger: zap.NewNop(), - } + for name, tc := range testCases { + t.Run(name, func(_ *testing.T) { + config := &Config{ - met := &cWMetrics{ - timestampMs: timestamp, - fields: fields, - measurements: []cWMeasurement{cwMeasurement}, + // include valid json string, a non-existing key, and keys whose value are not json/string + ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"}, + Version: tc.emfVersion, + logger: zap.NewNop(), + } + + fields := map[string]interface{}{ + oTellibDimensionKey: "cloudwatch-otel", + "spanName": "test", + "spanCounter": 0, + "kubernetes": "{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}", + "Sources": "[\"cadvisor\",\"pod\",\"calculated\"]", + } + + cloudwatchMetric := &cWMetrics{ + timestampMs: int64(1596151098037), + fields: fields, + measurements: tc.measurements, + } + + emfLogEvent := translateCWMetricToEMF(cloudwatchMetric, config) + + assert.Equal(t, tc.expectedEMFLogEvent, *emfLogEvent.InputLogEvent.Message) + }) } - inputLogEvent := translateCWMetricToEMF(met, config) - assert.Equal(t, readFromFile("testdata/testTranslateCWMetricToEMF.json"), *inputLogEvent.InputLogEvent.Message, "Expect to be equal") } func TestTranslateGroupedMetricToCWMetric(t *testing.T) { @@ -2114,24 +2144,6 @@ func TestGroupedMetricToCWMeasurementsWithFilters(t *testing.T) { } } -func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) { - timestamp := int64(1596151098037) - fields := make(map[string]interface{}) - fields[oTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = 0 - - met := &cWMetrics{ - timestampMs: timestamp, - fields: fields, - measurements: nil, - } - inputLogEvent := translateCWMetricToEMF(met, &Config{}) - expected := "{\"OTelLib\":\"cloudwatch-otel\",\"spanCounter\":0,\"spanName\":\"test\"}" - - assert.Equal(t, expected, *inputLogEvent.InputLogEvent.Message) -} - func BenchmarkTranslateOtToGroupedMetricWithInstrLibrary(b *testing.B) { oc := createMetricTestData() rm := internaldata.OCToMetrics(oc.Node, oc.Resource, oc.Metrics).ResourceMetrics().At(0) diff --git a/exporter/awsemfexporter/testdata/config.yaml b/exporter/awsemfexporter/testdata/config.yaml index e132056e0f34..2349a1d1fde1 100644 --- a/exporter/awsemfexporter/testdata/config.yaml +++ b/exporter/awsemfexporter/testdata/config.yaml @@ -2,6 +2,9 @@ awsemf: awsemf/1: region: 'us-west-2' role_arn: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole" + detailed_metrics: false + version: "1" + awsemf/resource_attr_to_label: resource_to_telemetry_conversion: enabled: true diff --git a/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json b/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json deleted file mode 100644 index c3f4fb9e32fb..000000000000 --- a/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json +++ /dev/null @@ -1 +0,0 @@ -{"OTelLib":"cloudwatch-otel","Sources":["cadvisor","pod","calculated"],"_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"kubernetes":{"container_name":"cloudwatch-agent","docker":{"container_id":"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca"},"host":"ip-192-168-58-245.ec2.internal","labels":{"controller-revision-hash":"5bdbf497dc","name":"cloudwatch-agent","pod-template-generation":"1"},"namespace_name":"amazon-cloudwatch","pod_id":"e23f3413-af2e-4a98-89e0-5df2251e7f05","pod_name":"cloudwatch-agent-26bl6","pod_owners":[{"owner_kind":"DaemonSet","owner_name":"cloudwatch-agent"}]},"spanCounter":0,"spanName":"test"} \ No newline at end of file