From 2aa2a480b3f2bea028a19118ac68379f98d04ad5 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 30 Mar 2020 15:45:22 -0600 Subject: [PATCH 1/4] Combine metrics with no dimension --- .../metricbeat/module/aws/_meta/docs.asciidoc | 1 + .../module/aws/cloudwatch/cloudwatch.go | 48 +++++++++---------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc index 8f3358a965f9..32afafebf86d 100644 --- a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc @@ -112,6 +112,7 @@ image::./images/metricbeat-aws-elb-overview.png[] [float] === `lambda` When an invocation completes, Lambda sends a set of metrics to CloudWatch for that invocation. +The default period in the aws module configuration is set to `5m` for the lambda metricset. The lambda metricset comes with a predefined dashboard: image::./images/metricbeat-aws-lambda-overview.png[] diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index b674be9989fc..29babfc3d8ec 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -144,12 +144,12 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { svcResourceAPI := resourcegroupstaggingapi.New(awscommon.EnrichAWSConfigWithEndpoint( m.Endpoint, "tagging", regionName, awsConfig)) - eventsWithIdentifier, eventsNoIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, listMetricDetailTotal.metricsWithStats, listMetricDetailTotal.resourceTypeFilters, regionName, startTime, endTime) + eventsWithIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, listMetricDetailTotal.metricsWithStats, listMetricDetailTotal.resourceTypeFilters, regionName, startTime, endTime) if err != nil { return errors.Wrap(err, "createEvents failed for region "+regionName) } - err = 
reportEvents(eventsWithIdentifier, eventsNoIdentifier, report) + err = reportEvents(eventsWithIdentifier, report) if err != nil { return errors.Wrap(err, "reportEvents failed") } @@ -183,12 +183,12 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // get resource type filters and tags filters for each namespace resourceTypeTagFilters := constructTagsFilters(namespaceDetails) - eventsWithIdentifier, eventsNoIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, filteredMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) + eventsWithIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, filteredMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) if err != nil { return errors.Wrap(err, "createEvents failed for region "+regionName) } - err = reportEvents(eventsWithIdentifier, eventsNoIdentifier, report) + err = reportEvents(eventsWithIdentifier, report) if err != nil { return errors.Wrap(err, "reportEvents failed") } @@ -427,23 +427,20 @@ func insertRootFields(event mb.Event, metricValue float64, labels []string) mb.E return event } -func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcResourceAPI resourcegroupstaggingapiiface.ClientAPI, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, []mb.Event, error) { +func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcResourceAPI resourcegroupstaggingapiiface.ClientAPI, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, error) { // Initialize events for each identifier. events := map[string]mb.Event{} - // Initialize events for the ones without identifiers. 
- var eventsNoIdentifier []mb.Event - // Construct metricDataQueries metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.Period) if len(metricDataQueries) == 0 { - return events, eventsNoIdentifier, nil + return events, nil } // Use metricDataQueries to make GetMetricData API calls metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime) if err != nil { - return events, eventsNoIdentifier, errors.Wrap(err, "GetMetricDataResults failed") + return events, errors.Wrap(err, "GetMetricDataResults failed") } // Find a timestamp for all metrics in output @@ -459,9 +456,12 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes if exists { labels := strings.Split(*output.Label, labelSeperator) if len(labels) != 5 { - eventNew := aws.InitEvent(regionName, m.AccountName, m.AccountID) - eventNew = insertRootFields(eventNew, output.Values[timestampIdx], labels) - eventsNoIdentifier = append(eventsNoIdentifier, eventNew) + // when there is no identifier value in label, use region+accountID+namespace instead + identifier := regionName + m.AccountID + labels[namespaceIdx] + if _, ok := events[identifier]; !ok { + events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID) + } + events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels) continue } @@ -473,7 +473,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes } } } - return events, eventsNoIdentifier, nil + return events, nil } // Get tags @@ -509,9 +509,13 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes if len(tagsFilter) != 0 { continue } - eventNew := aws.InitEvent(regionName, m.AccountName, m.AccountID) - eventNew = insertRootFields(eventNew, output.Values[timestampIdx], labels) - eventsNoIdentifier = append(eventsNoIdentifier, eventNew) + + // when there is no identifier value in label, use 
region+accountID+namespace instead + identifier := regionName + m.AccountID + labels[namespaceIdx] + if _, ok := events[identifier]; !ok { + events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID) + } + events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels) continue } @@ -534,21 +538,15 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes } } } - return events, eventsNoIdentifier, nil + return events, nil } -func reportEvents(eventsWithIdentifier map[string]mb.Event, eventsNoIdentifier []mb.Event, report mb.ReporterV2) error { +func reportEvents(eventsWithIdentifier map[string]mb.Event, report mb.ReporterV2) error { for _, event := range eventsWithIdentifier { if reported := report.Event(event); !reported { return nil } } - - for _, event := range eventsNoIdentifier { - if reported := report.Event(event); !reported { - return nil - } - } return nil } From dbe6815ec5c45efeeb198fae177db1125a80f9c2 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 30 Mar 2020 15:56:45 -0600 Subject: [PATCH 2/4] update changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 36a2ef964321..ad972dbe9158 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Use max in k8s overview dashboard aggregations. {pull}17015[17015] - Fix Disk Used and Disk Usage visualizations in the Metricbeat System dashboards. {issue}12435[12435] {pull}17272[17272] - Fix missing Accept header for Prometheus and OpenMetrics module. {issue}16870[16870] {pull}17291[17291] +- Combine cloudwatch aggregated metrics into a single event. 
{pull}17345[17345] *Packetbeat* From 9d4685dc266d6c3da144e891d6d3ae143c440e99 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 31 Mar 2020 07:01:57 -0600 Subject: [PATCH 3/4] run mage fmt update --- metricbeat/docs/modules/aws.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/metricbeat/docs/modules/aws.asciidoc b/metricbeat/docs/modules/aws.asciidoc index 53997ba5bddc..feed9a4b79d7 100644 --- a/metricbeat/docs/modules/aws.asciidoc +++ b/metricbeat/docs/modules/aws.asciidoc @@ -120,6 +120,7 @@ image::./images/metricbeat-aws-elb-overview.png[] [float] === `lambda` When an invocation completes, Lambda sends a set of metrics to CloudWatch for that invocation. +The default period in the aws module configuration is set to `5m` for the lambda metricset. The lambda metricset comes with a predefined dashboard: image::./images/metricbeat-aws-lambda-overview.png[] From d6cf8611e1b21a837e5c412d3ad2a830b7e2b464 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 31 Mar 2020 12:54:10 -0600 Subject: [PATCH 4/4] Add unit tests for CreateEvents function w/o dimensions --- .../module/aws/cloudwatch/cloudwatch_test.go | 231 ++++++++++++++++++ 1 file changed, 231 insertions(+) diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go index 513165104b80..35fd7d603503 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go @@ -7,10 +7,15 @@ package cloudwatch import ( + "net/http" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" + "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi" + "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -18,6 +23,21 @@ import ( ) var ( + regionName = 
"us-west-1" + timestamp = time.Now() + accountID = "123456789012" + + id1 = "cpu" + value1 = 0.25 + label1 = "CPUUtilization|AWS/EC2|Average|InstanceId|i-1" + + id2 = "disk" + value2 = 5.0 + label2 = "DiskReadOps|AWS/EC2|Average|InstanceId|i-1" + + label3 = "CPUUtilization|AWS/EC2|Average" + label4 = "DiskReadOps|AWS/EC2|Average" + instanceID1 = "i-1" instanceID2 = "i-2" namespace = "AWS/EC2" @@ -1075,3 +1095,214 @@ func TestCheckStatistics(t *testing.T) { }) } } + +// MockCloudWatchClient struct is used for unit tests. +type MockCloudWatchClient struct { + cloudwatchiface.ClientAPI +} + +// MockCloudWatchClientWithoutDim struct is used for unit tests. +type MockCloudWatchClientWithoutDim struct { + cloudwatchiface.ClientAPI +} + +// MockResourceGroupsTaggingClient is used for unit tests. +type MockResourceGroupsTaggingClient struct { + resourcegroupstaggingapiiface.ClientAPI +} + +func (m *MockCloudWatchClient) ListMetricsRequest(input *cloudwatch.ListMetricsInput) cloudwatch.ListMetricsRequest { + dim := cloudwatch.Dimension{ + Name: &dimName, + Value: &instanceID1, + } + httpReq, _ := http.NewRequest("", "", nil) + return cloudwatch.ListMetricsRequest{ + Request: &awssdk.Request{ + Data: &cloudwatch.ListMetricsOutput{ + Metrics: []cloudwatch.Metric{ + { + MetricName: &metricName1, + Namespace: &namespace, + Dimensions: []cloudwatch.Dimension{dim}, + }, + }, + }, + HTTPRequest: httpReq, + }, + } +} + +func (m *MockCloudWatchClient) GetMetricDataRequest(input *cloudwatch.GetMetricDataInput) cloudwatch.GetMetricDataRequest { + httpReq, _ := http.NewRequest("", "", nil) + + return cloudwatch.GetMetricDataRequest{ + Request: &awssdk.Request{ + Data: &cloudwatch.GetMetricDataOutput{ + MetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + Values: []float64{value1}, + Timestamps: []time.Time{timestamp}, + }, + { + Id: &id2, + Label: &label2, + Values: []float64{value2}, + Timestamps: []time.Time{timestamp}, + }, + }, + }, + HTTPRequest: 
httpReq, + }, + } +} + +func (m *MockCloudWatchClientWithoutDim) ListMetricsRequest(input *cloudwatch.ListMetricsInput) cloudwatch.ListMetricsRequest { + httpReq, _ := http.NewRequest("", "", nil) + return cloudwatch.ListMetricsRequest{ + Request: &awssdk.Request{ + Data: &cloudwatch.ListMetricsOutput{ + Metrics: []cloudwatch.Metric{ + { + MetricName: &metricName1, + Namespace: &namespace, + }, + }, + }, + HTTPRequest: httpReq, + }, + } +} + +func (m *MockCloudWatchClientWithoutDim) GetMetricDataRequest(input *cloudwatch.GetMetricDataInput) cloudwatch.GetMetricDataRequest { + httpReq, _ := http.NewRequest("", "", nil) + + return cloudwatch.GetMetricDataRequest{ + Request: &awssdk.Request{ + Data: &cloudwatch.GetMetricDataOutput{ + MetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label3, + Values: []float64{value1}, + Timestamps: []time.Time{timestamp}, + }, + { + Id: &id2, + Label: &label4, + Values: []float64{value2}, + Timestamps: []time.Time{timestamp}, + }, + }, + }, + HTTPRequest: httpReq, + }, + } +} + +func (m *MockResourceGroupsTaggingClient) GetResourcesRequest(input *resourcegroupstaggingapi.GetResourcesInput) resourcegroupstaggingapi.GetResourcesRequest { + httpReq, _ := http.NewRequest("", "", nil) + return resourcegroupstaggingapi.GetResourcesRequest{ + Request: &awssdk.Request{ + Data: &resourcegroupstaggingapi.GetResourcesOutput{ + PaginationToken: awssdk.String(""), + ResourceTagMappingList: []resourcegroupstaggingapi.ResourceTagMapping{ + { + ResourceARN: awssdk.String("arn:aws:ec2:us-west-1:123456789012:instance:i-1"), + Tags: []resourcegroupstaggingapi.Tag{ + { + Key: awssdk.String("name"), + Value: awssdk.String("test-ec2"), + }, + }, + }, + }, + }, + HTTPRequest: httpReq, + }, + } +} + +func TestCreateEventsWithIdentifier(t *testing.T) { + m := MetricSet{} + m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}} + m.MetricSet = &aws.MetricSet{Period: 5} + + mockTaggingSvc := 
&MockResourceGroupsTaggingClient{} + mockCloudwatchSvc := &MockCloudWatchClient{} + listMetricWithStatsTotal := []metricsWithStatistics{{ + cloudwatch.Metric{ + Dimensions: []cloudwatch.Dimension{{ + Name: awssdk.String("InstanceId"), + Value: awssdk.String("i-1"), + }}, + MetricName: awssdk.String("CPUUtilization"), + Namespace: awssdk.String("AWS/EC2"), + }, + []string{"Average"}, + nil, + }} + resourceTypeTagFilters := map[string][]aws.Tag{} + resourceTypeTagFilters["ec2:instance"] = []aws.Tag{ + { + Key: "name", + Value: "test-ec2", + }, + } + startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period) + + events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) + assert.NoError(t, err) + + metricValue, err := events["i-1"].RootFields.GetValue("aws.ec2.metrics.CPUUtilization.avg") + assert.NoError(t, err) + assert.Equal(t, value1, metricValue) + + dimension, err := events["i-1"].RootFields.GetValue("aws.dimensions.InstanceId") + assert.NoError(t, err) + assert.Equal(t, instanceID1, dimension) +} + +func TestCreateEventsWithoutIdentifier(t *testing.T) { + m := MetricSet{} + m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}} + m.MetricSet = &aws.MetricSet{Period: 5, AccountID: accountID} + + mockTaggingSvc := &MockResourceGroupsTaggingClient{} + mockCloudwatchSvc := &MockCloudWatchClientWithoutDim{} + listMetricWithStatsTotal := []metricsWithStatistics{ + { + cloudwatch.Metric{ + MetricName: awssdk.String("CPUUtilization"), + Namespace: awssdk.String("AWS/EC2"), + }, + []string{"Average"}, + nil, + }, + { + cloudwatch.Metric{ + MetricName: awssdk.String("DiskReadOps"), + Namespace: awssdk.String("AWS/EC2"), + }, + []string{"Average"}, + nil, + }, + } + + resourceTypeTagFilters := map[string][]aws.Tag{} + startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period) + + events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, 
listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) + assert.NoError(t, err) + + expectedID := regionName + accountID + namespace + metricValue, err := events[expectedID].RootFields.GetValue("aws.ec2.metrics.CPUUtilization.avg") + assert.NoError(t, err) + assert.Equal(t, value1, metricValue) + + dimension, err := events[expectedID].RootFields.GetValue("aws.ec2.metrics.DiskReadOps.avg") + assert.NoError(t, err) + assert.Equal(t, value2, dimension) +}